[FLINK-4454] always display JobManager address using LeaderRetrievalService
This closes #2406 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72064558 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72064558 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72064558 Branch: refs/heads/flip-6 Commit: 720645587bc58a22db6a8d948f91384da2ecb7b7 Parents: 844c874 Author: Maximilian Michels <m...@apache.org> Authored: Mon Aug 22 18:11:45 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Aug 24 11:29:30 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 4 ++-- .../flink/client/program/ClusterClient.java | 23 +++++--------------- .../client/program/StandaloneClusterClient.java | 4 ++-- 3 files changed, 10 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 15e1362..c90bc29 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -845,7 +845,7 @@ public class CliFrontend { CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine()); try { ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config); - logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect to JobManager."); + logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager."); return client; } catch (Exception e) { LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e); @@ -896,7 +896,7 @@ public class CliFrontend { } // Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program. - final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig(); + final InetSocketAddress jobManagerAddress = client.getJobManagerAddress(); logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager."); logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); return client; http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 2e6a9cc..c3c666b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import akka.actor.ActorRef; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; @@ -57,6 +56,7 @@ import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.net.ConnectionUtils; +import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -232,27 +232,16 @@ public abstract class ClusterClient { } /** - * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). - * @return The address (host and port) of the leading JobManager when it was last retrieved (may be outdated) - */ - public InetSocketAddress getJobManagerAddressFromConfig() { - try { - String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - return new InetSocketAddress(hostName, port); - } catch (Exception e) { - throw new RuntimeException("Failed to retrieve JobManager address", e); - } - } - - /** * Gets the current JobManager address (may change in case of a HA setup). * @return The address (host and port) of the leading JobManager */ public InetSocketAddress getJobManagerAddress() { try { - final ActorRef jmActor = getJobManagerGateway().actor(); - return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat()); + LeaderConnectionInfo leaderConnectionInfo = + LeaderRetrievalUtils.retrieveLeaderConnectionInfo( + LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout); + + return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress()); } catch (Exception e) { throw new RuntimeException("Failed to retrieve JobManager address", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 2c6e101..d25c9d1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getWebInterfaceURL() { - String host = this.getJobManagerAddressFromConfig().getHostString(); + String host = this.getJobManagerAddress().getHostString(); int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); return "http://" + host + ":" + port; @@ -75,7 +75,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getClusterIdentifier() { // Avoid blocking here by getting the address from the config without resolving the address - return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig(); + return "Standalone cluster with JobManager at " + this.getJobManagerAddress(); } @Override