Repository: flink
Updated Branches:
  refs/heads/master 140faa945 -> 0ccd1fd5b
[runtime] Enables remote communication for local standalone cluster mode which is required by the JobClient.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dacff90b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dacff90b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dacff90b

Branch: refs/heads/master
Commit: dacff90b6a06df0c8a5b1acf563cfdd5784b42cc
Parents: 140faa9
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Feb 5 17:22:57 2015 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 5 17:31:00 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dacff90b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c9a60b4..4ee16da 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -547,14 +547,13 @@ object JobManager {
     EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
     val (configuration, executionMode, listeningAddress) = parseArgs(args)
 
-
     val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress)
 
     startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
 
     if(executionMode.equals(LOCAL)){
       TaskManager.startActorWithConfiguration("", configuration,
-        localAkkaCommunication = true, localTaskManagerCommunication = true)(jobManagerSystem)
+        localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
     }
 
     jobManagerSystem.awaitTermination()
@@ -593,17 +592,12 @@ object JobManager {
         configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
       }
 
-      val listeningAddress = if(config.executionMode.equals(LOCAL)){
-        // All communication happens within the same actor system
-        None
-      }else{
-        val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+      val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+      val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        // Listening address on which the actor system listens for remote messages
-        Some((hostname, port))
-      }
+      // Listening address on which the actor system listens for remote messages
+      val listeningAddress = Some((hostname, port))
 
       (configuration, config.executionMode, listeningAddress)
     } getOrElse {