Repository: flink Updated Branches: refs/heads/master 8111177cf -> a51c02f6e
[FLINK-1585] [tests] Fix mini clusters to respect memory and buffer configurations and improve relative memory computation. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1104491 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1104491 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1104491 Branch: refs/heads/master Commit: a1104491b88ff3293142992900e69253206736ac Parents: 8111177 Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 19 14:34:14 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 19 18:53:42 2015 +0100 ---------------------------------------------------------------------- .../minicluster/LocalFlinkMiniCluster.scala | 77 +++++++++++++------- .../test/util/ForkableFlinkMiniCluster.scala | 9 +++ 2 files changed, 60 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a1104491/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 88006ac..66ff4f3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -154,40 +154,65 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: } def setMemory(config: Configuration): Unit = { - var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS * - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE - val numTaskManager = config.getInteger(ConfigConstants - .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) - val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS - memorySize = memorySize - (bufferMem * numTaskManager) - memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong - memorySize >>>= 20 - memorySize /= numTaskManager - config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize) + // set this only if no memory was preconfigured + if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) { + + val bufferMem: Long = + config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * + config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) + + val numTaskManager = config.getInteger( + ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) + + val memoryFraction = config.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) + + // full memory size + var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag + + // compute the memory size per task manager. we assume equally much memory for + // each TaskManagers and each JobManager + memorySize /= numTaskManager + 1 // the +1 is the job manager + + // for each TaskManager, subtract the memory needed for memory buffers + memorySize -= bufferMem; + memorySize = (memorySize * memoryFraction).toLong + memorySize >>>= 20 // bytes to megabytes + config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize) + } } def getDefaultConfig: Configuration = { - val config: Configuration = new Configuration + val config: Configuration = new Configuration() + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME) - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants - .DEFAULT_JOB_MANAGER_IPC_PORT) - config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_IPC_PORT) - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_DATA_PORT) - config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION) - config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants - .DEFAULT_JOBCLIENT_POLLING_INTERVAL) - config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants - .DEFAULT_FILESYSTEM_OVERWRITE) + + config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + + config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) + + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) + + config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION) + + config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, + ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE) + config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY) + config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, ConfigConstants - .DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS) + + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS) // Reduce number of threads for local execution config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1) http://git-wip-us.apache.org/repos/asf/flink/blob/a1104491/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 23975e2..5b22895 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -24,6 +24,15 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingTaskManager +/** + * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution + * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and + * uses it to avoid port conflicts. + * + * @param userConfiguration Configuration object with the user provided configuration values + * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the + * same [[ActorSystem]], otherwise false. + */ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean) extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {