[FLINK-8666] [test] Use testDispatcherConfig in MiniCluster

Using the AkkaUtils#testDispatcherConfig reduces the number of started threads.
This effectively decreases the resource foot print of the MiniCluster.

This closes #5499.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65081ac7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65081ac7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65081ac7

Branch: refs/heads/master
Commit: 65081ac72d18f610db210c5fe030805141a9b2e5
Parents: e29ec0f
Author: Till Rohrmann <[email protected]>
Authored: Thu Feb 15 18:07:49 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 23 18:22:09 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/minicluster/MiniCluster.java   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65081ac7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index c98387d..01be01d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -65,6 +65,7 @@ import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 
 import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -634,13 +635,18 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                        boolean remoteEnabled,
                        String bindAddress) {
 
-               ActorSystem actorSystem;
+               final Config akkaConfig;
+
                if (remoteEnabled) {
-                       actorSystem = 
AkkaUtils.createActorSystem(configuration, bindAddress, 0);
+                       akkaConfig = AkkaUtils.getAkkaConfig(configuration, 
bindAddress, 0);
                } else {
-                       actorSystem = 
AkkaUtils.createLocalActorSystem(configuration);
+                       akkaConfig = AkkaUtils.getAkkaConfig(configuration);
                }
 
+               final Config effectiveAkkaConfig = 
AkkaUtils.testDispatcherConfig().withFallback(akkaConfig);
+
+               final ActorSystem actorSystem = 
AkkaUtils.createActorSystem(effectiveAkkaConfig);
+
                return new AkkaRpcService(actorSystem, askTimeout);
        }
 

Reply via email to