Repository: flink
Updated Branches:
  refs/heads/master 8111177cf -> a51c02f6e


[FLINK-1585] [tests] Fix mini clusters to respect memory and buffer 
configurations and improve relative memory computation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1104491
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1104491
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1104491

Branch: refs/heads/master
Commit: a1104491b88ff3293142992900e69253206736ac
Parents: 8111177
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 19 14:34:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 19 18:53:42 2015 +0100

----------------------------------------------------------------------
 .../minicluster/LocalFlinkMiniCluster.scala     | 77 +++++++++++++-------
 .../test/util/ForkableFlinkMiniCluster.scala    |  9 +++
 2 files changed, 60 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1104491/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 88006ac..66ff4f3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -154,40 +154,65 @@ class LocalFlinkMiniCluster(userConfiguration: 
Configuration, singleActorSystem:
   }
 
   def setMemory(config: Configuration): Unit = {
-    var memorySize: Long = 
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
-    val bufferMem: Long = 
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
-    val numTaskManager = config.getInteger(ConfigConstants
-      .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
-    val taskManagerNumSlots: Int = 
ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
-    memorySize = memorySize - (bufferMem * numTaskManager)
-    memorySize = (memorySize * 
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
-    memorySize >>>= 20
-    memorySize /= numTaskManager
-    config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
+    // set this only if no memory was preconfigured
+    if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == 
-1) {
+
+      val bufferMem: Long =
+            
config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+                           
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) *
+            
config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+                           
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
+
+      val numTaskManager = config.getInteger(
+        ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+
+      val memoryFraction = 
config.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                                           
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+
+      // full memory size
+      var memorySize: Long = 
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
+
+      // compute the memory size per task manager. we assume equally much 
memory for
+      // each TaskManagers and each JobManager
+      memorySize /= numTaskManager + 1 // the +1 is the job manager
+
+      // for each TaskManager, subtract the memory needed for memory buffers
+      memorySize -= bufferMem;
+      memorySize = (memorySize * memoryFraction).toLong
+      memorySize >>>= 20  // bytes to megabytes
+      config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
+    }
   }
 
   def getDefaultConfig: Configuration = {
-    val config: Configuration = new Configuration
+    val config: Configuration = new Configuration()
+
     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
-    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
-      .DEFAULT_JOB_MANAGER_IPC_PORT)
-    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 
ConfigConstants
-      .DEFAULT_TASK_MANAGER_IPC_PORT)
-    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 
ConfigConstants
-      .DEFAULT_TASK_MANAGER_DATA_PORT)
-    config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, 
ConfigConstants
-      .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
-    config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, 
ConfigConstants
-      .DEFAULT_JOBCLIENT_POLLING_INTERVAL)
-    config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, 
ConfigConstants
-      .DEFAULT_FILESYSTEM_OVERWRITE)
+
+    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+
+    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+
+    config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
+
+    config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
+      ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE)
+
     
config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
       ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
+
     config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1)
+
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
ConfigConstants
-      .DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS)
+
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+      ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS)
 
     // Reduce number of threads for local execution
     config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/a1104491/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 23975e2..5b22895 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -24,6 +24,15 @@ import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingTaskManager
 
+/**
+ * A forkable mini cluster is a special case of the mini cluster, used for 
parallel test execution
+ * on build servers. If multiple tests run in parallel, the cluster picks up 
the fork number and
+ * uses it to avoid port conflicts.
+ *
+ * @param userConfiguration Configuration object with the user provided 
configuration values
+ * @param singleActorSystem true, if all actors (JobManager and TaskManager) 
shall be run in the
+ *                          same [[ActorSystem]], otherwise false.
+ */
 class ForkableFlinkMiniCluster(userConfiguration: Configuration, 
singleActorSystem: Boolean)
   extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
 

Reply via email to