[FLINK-5238] [minicluster] MiniCluster uses local-only communication when only one TaskManager is started
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62e8e33f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62e8e33f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62e8e33f Branch: refs/heads/flip-6 Commit: 62e8e33f341e95b70e090a6d0f7d5e75b9c4d4c9 Parents: 6b3283e Author: Stephan Ewen <[email protected]> Authored: Fri Dec 2 17:00:25 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/minicluster/MiniCluster.java | 4 +++- .../runtime/taskexecutor/TaskManagerRunner.java | 22 +++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1b9f265..29a6e59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -525,6 +525,7 @@ public class MiniCluster { RpcService[] taskManagerRpcServices) throws Exception { final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers]; + final boolean localCommunication = numTaskManagers == 1; for (int i = 0; i < numTaskManagers; i++) { taskManagerRunners[i] = new TaskManagerRunner( @@ -532,7 +533,8 @@ public class MiniCluster { new ResourceID(UUID.randomUUID().toString()), taskManagerRpcServices[i], haServices, - metricRegistry); + metricRegistry, + localCommunication); taskManagerRunners[i].start(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index a18ff40..1145a46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -66,11 +66,22 @@ public class TaskManagerRunner implements FatalErrorHandler { private final TaskExecutor taskManager; public TaskManagerRunner( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + MetricRegistry metricRegistry) throws Exception { + + this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false); + } + + public TaskManagerRunner( Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - MetricRegistry metricRegistry) throws Exception { + MetricRegistry metricRegistry, + boolean localCommunicationOnly) throws Exception { this.configuration = Preconditions.checkNotNull(configuration); this.resourceID = Preconditions.checkNotNull(resourceID); @@ -80,10 +91,11 @@ public class TaskManagerRunner implements FatalErrorHandler { InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); - TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( - configuration, - remoteAddress, - false); + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration( + configuration, + remoteAddress, + localCommunicationOnly); 
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration,
