[FLINK-4454] always display JobManager address using LeaderRetrievalService

This closes #2406


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72064558
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72064558
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72064558

Branch: refs/heads/flip-6
Commit: 720645587bc58a22db6a8d948f91384da2ecb7b7
Parents: 844c874
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Aug 22 18:11:45 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Aug 24 11:29:30 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  4 ++--
 .../flink/client/program/ClusterClient.java     | 23 +++++---------------
 .../client/program/StandaloneClusterClient.java |  4 ++--
 3 files changed, 10 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 15e1362..c90bc29 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -845,7 +845,7 @@ public class CliFrontend {
                CustomCommandLine customCLI = 
getActiveCustomCommandLine(options.getCommandLine());
                try {
                        ClusterClient client = 
customCLI.retrieveCluster(options.getCommandLine(), config);
-                       logAndSysout("Using address " + 
client.getJobManagerAddressFromConfig() + " to connect to JobManager.");
+                       logAndSysout("Using address " + 
client.getJobManagerAddress() + " to connect to JobManager.");
                        return client;
                } catch (Exception e) {
                        LOG.error("Couldn't retrieve {} cluster.", 
customCLI.getId(), e);
@@ -896,7 +896,7 @@ public class CliFrontend {
                }
 
                // Avoid resolving the JobManager Gateway here to prevent 
blocking until we invoke the user's program.
-               final InetSocketAddress jobManagerAddress = 
client.getJobManagerAddressFromConfig();
+               final InetSocketAddress jobManagerAddress = 
client.getJobManagerAddress();
                logAndSysout("Using address " + 
jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to 
connect to JobManager.");
                logAndSysout("JobManager web interface address " + 
client.getWebInterfaceURL());
                return client;

http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 2e6a9cc..c3c666b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -57,6 +56,7 @@ import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -232,27 +232,16 @@ public abstract class ClusterClient {
        }
 
        /**
-        * Gets the current JobManager address from the Flink configuration 
(may change in case of a HA setup).
-        * @return The address (host and port) of the leading JobManager when 
it was last retrieved (may be outdated)
-        */
-       public InetSocketAddress getJobManagerAddressFromConfig() {
-               try {
-                       String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-                       int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-                       return new InetSocketAddress(hostName, port);
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to retrieve 
JobManager address", e);
-               }
-       }
-
-       /**
         * Gets the current JobManager address (may change in case of a HA 
setup).
         * @return The address (host and port) of the leading JobManager
         */
        public InetSocketAddress getJobManagerAddress() {
                try {
-                       final ActorRef jmActor = getJobManagerGateway().actor();
-                       return 
AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat());
+                       LeaderConnectionInfo leaderConnectionInfo =
+                               
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+                                       
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout);
+
+                       return 
AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
                } catch (Exception e) {
                        throw new RuntimeException("Failed to retrieve 
JobManager address", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 2c6e101..d25c9d1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient {
 
        @Override
        public String getWebInterfaceURL() {
-               String host = 
this.getJobManagerAddressFromConfig().getHostString();
+               String host = this.getJobManagerAddress().getHostString();
                int port = 
getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                        ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
                return "http://" +  host + ":" + port;
@@ -75,7 +75,7 @@ public class StandaloneClusterClient extends ClusterClient {
        @Override
        public String getClusterIdentifier() {
                // Avoid blocking here by getting the address from the config 
without resolving the address
-               return "Standalone cluster with JobManager at " + 
this.getJobManagerAddressFromConfig();
+               return "Standalone cluster with JobManager at " + 
this.getJobManagerAddress();
        }
 
        @Override

Reply via email to