[FLINK-5238] [minicluster] MiniCluster starts local communication if only one TaskManager is used


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62e8e33f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62e8e33f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62e8e33f

Branch: refs/heads/flip-6
Commit: 62e8e33f341e95b70e090a6d0f7d5e75b9c4d4c9
Parents: 6b3283e
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 2 17:00:25 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +++-
 .../runtime/taskexecutor/TaskManagerRunner.java | 22 +++++++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1b9f265..29a6e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -525,6 +525,7 @@ public class MiniCluster {
                        RpcService[] taskManagerRpcServices) throws Exception {
 
                final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+               final boolean localCommunication = numTaskManagers == 1;
 
                for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRunners[i] = new TaskManagerRunner(
@@ -532,7 +533,8 @@ public class MiniCluster {
                                new ResourceID(UUID.randomUUID().toString()),
                                taskManagerRpcServices[i],
                                haServices,
-                               metricRegistry);
+                               metricRegistry,
+                               localCommunication);
 
                        taskManagerRunners[i].start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/62e8e33f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a18ff40..1145a46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,11 +66,22 @@ public class TaskManagerRunner implements FatalErrorHandler {
        private final TaskExecutor taskManager;
 
        public TaskManagerRunner(
+                       Configuration configuration,
+                       ResourceID resourceID,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       MetricRegistry metricRegistry) throws Exception {
+
+               this(configuration, resourceID, rpcService, highAvailabilityServices, metricRegistry, false);
+       }
+
+       public TaskManagerRunner(
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
-               MetricRegistry metricRegistry) throws Exception {
+               MetricRegistry metricRegistry,
+               boolean localCommunicationOnly) throws Exception {
 
                this.configuration = Preconditions.checkNotNull(configuration);
                this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -80,10 +91,11 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
                InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
 
-               TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
-                       configuration,
-                       remoteAddress,
-                       false);
+               TaskManagerServicesConfiguration taskManagerServicesConfiguration =
+                               TaskManagerServicesConfiguration.fromConfiguration(
+                                               configuration,
+                                               remoteAddress,
+                                               localCommunicationOnly);
 
                TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
                        taskManagerServicesConfiguration,

Reply via email to