Repository: flink
Updated Branches:
  refs/heads/master 140faa945 -> 0ccd1fd5b


[runtime] Enables remote communication for local standalone cluster mode which 
is required by the JobClient.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dacff90b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dacff90b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dacff90b

Branch: refs/heads/master
Commit: dacff90b6a06df0c8a5b1acf563cfdd5784b42cc
Parents: 140faa9
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Feb 5 17:22:57 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 5 17:31:00 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala     | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dacff90b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c9a60b4..4ee16da 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -547,14 +547,13 @@ object JobManager {
     EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
     val (configuration, executionMode, listeningAddress) = parseArgs(args)
 
-
     val jobManagerSystem = AkkaUtils.createActorSystem(configuration, 
listeningAddress)
 
     startActor(Props(new JobManager(configuration) with 
WithWebServer))(jobManagerSystem)
 
     if(executionMode.equals(LOCAL)){
       TaskManager.startActorWithConfiguration("", configuration,
-        localAkkaCommunication = true, localTaskManagerCommunication = 
true)(jobManagerSystem)
+        localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
     }
 
     jobManagerSystem.awaitTermination()
@@ -593,17 +592,12 @@ object JobManager {
           configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
config.configDir + "/..")
         }
 
-        val listeningAddress = if(config.executionMode.equals(LOCAL)){
-          // All communication happens within the same actor system
-          None
-        }else{
-          val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-          val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-            ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+        val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+        val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-          // Listening address on which the actor system listens for remote 
messages
-          Some((hostname, port))
-        }
+        // Listening address on which the actor system listens for remote 
messages
+        val listeningAddress = Some((hostname, port))
 
         (configuration, config.executionMode, listeningAddress)
     } getOrElse {

Reply via email to