[FLINK-8666] [test] Use testDispatcherConfig in MiniCluster Using the AkkaUtils#testDispatcherConfig reduces the number of started threads. This effectively decreases the resource foot print of the MiniCluster.
This closes #5499. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65081ac7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65081ac7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65081ac7 Branch: refs/heads/master Commit: 65081ac72d18f610db210c5fe030805141a9b2e5 Parents: e29ec0f Author: Till Rohrmann <[email protected]> Authored: Thu Feb 15 18:07:49 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 23 18:22:09 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/minicluster/MiniCluster.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/65081ac7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index c98387d..01be01d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -65,6 +65,7 @@ import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import akka.actor.ActorSystem; +import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -634,13 +635,18 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { boolean remoteEnabled, String bindAddress) { - ActorSystem actorSystem; + final Config akkaConfig; + if (remoteEnabled) { - actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0); + akkaConfig = AkkaUtils.getAkkaConfig(configuration, bindAddress, 0); } else { - actorSystem = AkkaUtils.createLocalActorSystem(configuration); + akkaConfig = AkkaUtils.getAkkaConfig(configuration); } + final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig); + + final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig); + return new AkkaRpcService(actorSystem, askTimeout); }
