Repository: flink
Updated Branches:
  refs/heads/master d2211ed53 -> e2c2cf414


[FLINK-8465] [flip6] Retrieve correct leader component address in ClusterClient

Rename ClusterClient#getJobManagerAddress into #getClusterConnectionInfo. The
returned LeaderConnectionInfo contains the address of the leading cluster
component. In the old code this is the JobManager whereas in Flip-6 it is the
Dispatcher.

This closes #5321.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f53f8465
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f53f8465
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f53f8465

Branch: refs/heads/master
Commit: f53f846578c42f4c5248a3dde31df3ae4d69959b
Parents: d2211ed
Author: Till Rohrmann <[email protected]>
Authored: Mon Jan 29 11:30:48 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Mon Jan 29 15:55:17 2018 +0100

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 22 ++++-----
 .../client/program/StandaloneClusterClient.java | 12 ++++-
 .../client/program/rest/RestClusterClient.java  | 10 ++++
 .../apache/flink/client/cli/DefaultCLITest.java | 17 ++++---
 .../LeaderRetrievalException.java               |  4 +-
 .../runtime/util/LeaderConnectionInfo.java      | 49 +++++++++++++++++++-
 .../runtime/util/LeaderRetrievalUtils.java      | 27 +++++++----
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 29 +++++++-----
 .../org/apache/flink/api/scala/FlinkShell.scala | 15 ++++--
 .../flink/yarn/YARNSessionFIFOITCase.java       |  2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 34 ++++++++++----
 11 files changed, 162 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 1326ff1..0ef54b2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -74,7 +74,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -294,20 +293,15 @@ public abstract class ClusterClient<T> {
        }
 
        /**
-        * Gets the current JobManager address (may change in case of a HA 
setup).
-        * @return The address (host and port) of the leading JobManager
+        * Gets the current cluster connection info (may change in case of a HA 
setup).
+        *
+        * @return The connection info to the leader component of the 
cluster
+        * @throws LeaderRetrievalException if the leader could not be retrieved
         */
-       public InetSocketAddress getJobManagerAddress() {
-               try {
-                       LeaderConnectionInfo leaderConnectionInfo =
-                               
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-                                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                       timeout);
-
-                       return 
AkkaUtils.getInetSocketAddressFromAkkaURL(leaderConnectionInfo.getAddress());
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to retrieve 
JobManager address", e);
-               }
+       public LeaderConnectionInfo getClusterConnectionInfo() throws 
LeaderRetrievalException {
+               return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                       timeout);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index df08c30..0b91ed4 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -22,12 +22,14 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
@@ -54,7 +56,15 @@ public class StandaloneClusterClient extends 
ClusterClient<StandaloneClusterId>
 
        @Override
        public String getWebInterfaceURL() {
-               String host = getJobManagerAddress().getHostString();
+               final InetSocketAddress inetSocketAddressFromAkkaURL;
+
+               try {
+                       inetSocketAddressFromAkkaURL = 
AkkaUtils.getInetSocketAddressFromAkkaURL(getClusterConnectionInfo().getAddress());
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not retrieve leader 
retrieval information.", e);
+               }
+
+               String host = inetSocketAddressFromAkkaURL.getHostName();
                int port = getFlinkConfiguration().getInteger(WebOptions.PORT);
                return "http://" +  host + ":" + port;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 141af71..564990f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
@@ -70,6 +71,8 @@ import 
org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResourc
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
@@ -365,6 +368,13 @@ public class RestClusterClient<T> extends ClusterClient<T> 
{
                return clusterId;
        }
 
+       @Override
+       public LeaderConnectionInfo getClusterConnectionInfo() throws 
LeaderRetrievalException {
+               return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+                       highAvailabilityServices.getDispatcherLeaderRetriever(),
+                       timeout);
+       }
+
        /**
         * Creates a {@code CompletableFuture} that polls a {@code 
AsynchronouslyCreatedResource} until
         * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} 
becomes

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java 
b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index aaca798..d89e988 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -22,15 +22,16 @@ import 
org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.cli.CommandLine;
-import org.junit.Assert;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the {@link DefaultCLI}.
@@ -60,13 +61,14 @@ public class DefaultCLITest extends TestLogger {
 
                CommandLine commandLine = 
defaultCLI.parseCommandLineOptions(args, false);
 
-               final InetSocketAddress expectedAddress = new 
InetSocketAddress(localhost, port);
-
                final StandaloneClusterDescriptor clusterDescriptor = 
defaultCLI.createClusterDescriptor(commandLine);
 
                final ClusterClient<?> clusterClient = 
clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
 
-               Assert.assertEquals(expectedAddress, 
clusterClient.getJobManagerAddress());
+               final LeaderConnectionInfo clusterConnectionInfo = 
clusterClient.getClusterConnectionInfo();
+
+               assertThat(clusterConnectionInfo.getHostname(), 
Matchers.equalTo(localhost));
+               assertThat(clusterConnectionInfo.getPort(), 
Matchers.equalTo(port));
        }
 
        /**
@@ -93,9 +95,10 @@ public class DefaultCLITest extends TestLogger {
 
                final ClusterClient<?> clusterClient = 
clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
 
-               final InetSocketAddress expectedAddress = new 
InetSocketAddress(manualHostname, manualPort);
+               final LeaderConnectionInfo clusterConnectionInfo = 
clusterClient.getClusterConnectionInfo();
 
-               Assert.assertEquals(expectedAddress, 
clusterClient.getJobManagerAddress());
+               assertThat(clusterConnectionInfo.getHostname(), 
Matchers.equalTo(manualHostname));
+               assertThat(clusterConnectionInfo.getPort(), 
Matchers.equalTo(manualPort));
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
index 8d2a9b5..ff1c5fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
+import org.apache.flink.util.FlinkException;
+
 /**
  * This exception is thrown by the {@link 
org.apache.flink.runtime.util.LeaderRetrievalUtils} when
  * the method retrieveLeaderGateway fails to retrieve the current leader's 
gateway.
  */
-public class LeaderRetrievalException extends Exception {
+public class LeaderRetrievalException extends FlinkException {
 
        private static final long serialVersionUID = 42;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
index aee023a..2c94c43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.Address;
+
+import java.net.MalformedURLException;
 import java.util.UUID;
 
 /**
@@ -29,9 +35,34 @@ public class LeaderConnectionInfo {
 
        private final UUID leaderSessionID;
 
-       public LeaderConnectionInfo(String address, UUID leaderSessionID) {
+       private final String hostname;
+
+       private final int port;
+
+       public LeaderConnectionInfo(String address, UUID leaderSessionID) 
throws FlinkException {
                this.address = address;
                this.leaderSessionID = leaderSessionID;
+
+               final Address akkaAddress;
+               // this only works as long as the address is Akka based
+               try {
+                       akkaAddress = AkkaUtils.getAddressFromAkkaURL(address);
+               } catch (MalformedURLException e) {
+                       throw new FlinkException("Could not extract the 
hostname from the given address \'" +
+                               address + "\'.", e);
+               }
+
+               if (akkaAddress.host().isDefined()) {
+                       hostname = akkaAddress.host().get();
+               } else {
+                       hostname = "localhost";
+               }
+
+               if (akkaAddress.port().isDefined()) {
+                       port = (int) akkaAddress.port().get();
+               } else {
+                       port = -1;
+               }
        }
 
        public String getAddress() {
@@ -41,4 +72,20 @@ public class LeaderConnectionInfo {
        public UUID getLeaderSessionID() {
                return leaderSessionID;
        }
+
+       public String getHostname() {
+               return hostname;
+       }
+
+       public int getPort() {
+               return port;
+       }
+
+       @Override
+       public String toString() {
+               return "LeaderConnectionInfo{" +
+                       "address='" + address + '\'' +
+                       ", leaderSessionID=" + leaderSessionID +
+                       '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 6b861a3..aeaa2b9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.util;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -34,16 +30,23 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.net.InetAddress;
-import java.util.UUID;
-
 /**
  * Utility class to work with {@link LeaderRetrievalService} class.
  */
@@ -241,8 +244,14 @@ public class LeaderRetrievalUtils {
 
                @Override
                public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
-                       if(leaderAddress != null && !leaderAddress.equals("") 
&& !connectionInfo.isCompleted()) {
-                               connectionInfo.success(new 
LeaderConnectionInfo(leaderAddress, leaderSessionID));
+                       if (leaderAddress != null && !leaderAddress.equals("") 
&& !connectionInfo.isCompleted()) {
+                               try {
+                                       final LeaderConnectionInfo 
leaderConnectionInfo = new LeaderConnectionInfo(leaderAddress, leaderSessionID);
+                                       
connectionInfo.success(leaderConnectionInfo);
+                               } catch (FlinkException e) {
+                                       connectionInfo.failure(e);
+                               }
+
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f4b8069..0c6be5d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -697,18 +697,12 @@ object AkkaUtils {
   def getInetSocketAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
     // AkkaURLs have the form schema://systemName@host:port/.... if it's a 
remote Akka URL
     try {
-      // we need to manually strip the protocol, because "akka.tcp" is not
-      // a valid protocol for Java's URL class
-      val protocolonPos = akkaURL.indexOf("://")
-      if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) {
-        throw new MalformedURLException()
-      }
-      
-      val url = new URL("http://" + akkaURL.substring(protocolonPos + 3))
-      if (url.getHost == null || url.getPort == -1) {
-        throw new MalformedURLException()
+      val address = getAddressFromAkkaURL(akkaURL)
+
+      (address.host, address.port) match {
+        case (Some(hostname), Some(portValue)) => new 
InetSocketAddress(hostname, portValue)
+        case _ => throw new MalformedURLException()
       }
-      new InetSocketAddress(url.getHost, url.getPort)
     }
     catch {
       case _ : MalformedURLException =>
@@ -716,6 +710,19 @@ object AkkaUtils {
     }
   }
 
+  /**
+    * Extracts the [[Address]] from the given akka URL.
+    *
+    * @param akkaURL to extract the [[Address]] from
+    * @throws java.net.MalformedURLException if the [[Address]] could not be 
parsed from
+    *                                        the given akka URL
+    * @return Extracted [[Address]] from the given akka URL
+    */
+  @throws(classOf[MalformedURLException])
+  def getAddressFromAkkaURL(akkaURL: String): Address = {
+    AddressFromURIString(akkaURL)
+  }
+
   def formatDurationParingErrorMessage: String = {
     "Duration format must be \"val unit\", where 'val' is a number and 'unit' 
is " +
       "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 6fa6aa5..54ed05e 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -20,10 +20,11 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser, RunOptions}
-import org.apache.flink.client.deployment.{ClusterDescriptor, 
StandaloneClusterId}
+import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
+import org.apache.flink.client.deployment.ClusterDescriptor
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.{Configuration, GlobalConfiguration, 
JobManagerOptions}
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
 
 import scala.collection.mutable.ArrayBuffer
@@ -270,8 +271,11 @@ object FlinkShell {
 
     val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification)
 
-    val address = cluster.getJobManagerAddress.getAddress.getHostAddress
-    val port = cluster.getJobManagerAddress.getPort
+    val inetSocketAddress = AkkaUtils.getInetSocketAddressFromAkkaURL(
+      cluster.getClusterConnectionInfo.getAddress)
+
+    val address = inetSocketAddress.getAddress.getHostAddress
+    val port = inetSocketAddress.getPort
 
     (address, port, Some(Right(cluster)))
   }
@@ -307,7 +311,8 @@ object FlinkShell {
       throw new RuntimeException("Yarn Cluster could not be retrieved.")
     }
 
-    val jobManager = cluster.getJobManagerAddress
+    val jobManager = AkkaUtils.getInetSocketAddressFromAkkaURL(
+      cluster.getClusterConnectionInfo.getAddress)
 
     (jobManager.getHostString, jobManager.getPort, None)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index ec6c105..975dd28 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -272,7 +272,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                        }
 
                        // use the cluster
-                       
Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+                       
Assert.assertNotNull(yarnCluster.getClusterConnectionInfo());
                        Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
 
                        LOG.info("Shutting down cluster. All tests passed");

http://git-wip-us.apache.org/repos/asf/flink/blob/f53f8465/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index cc7f4c1..3ab8de7 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -596,17 +597,32 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                                        //------------------ ClusterClient 
deployed, handle connection details
                                        yarnApplicationId = 
clusterClient.getClusterId();
 
-                                       String jobManagerAddress =
-                                               
clusterClient.getJobManagerAddress().getAddress().getHostName() +
-                                                       ':' + 
clusterClient.getJobManagerAddress().getPort();
+                                       try {
+                                               final LeaderConnectionInfo 
connectionInfo = clusterClient.getClusterConnectionInfo();
 
-                                       System.out.println("Flink JobManager is 
now running on " + jobManagerAddress);
-                                       System.out.println("JobManager Web 
Interface: " + clusterClient.getWebInterfaceURL());
+                                               System.out.println("Flink 
JobManager is now running on " + connectionInfo.getHostname() +
+                                                       ':' + 
connectionInfo.getPort() + " with leader id " + 
connectionInfo.getLeaderSessionID() + '.');
+                                               System.out.println("JobManager 
Web Interface: " + clusterClient.getWebInterfaceURL());
 
-                                       writeYarnPropertiesFile(
-                                               yarnApplicationId,
-                                               
clusterSpecification.getNumberTaskManagers() * 
clusterSpecification.getSlotsPerTaskManager(),
-                                               
yarnClusterDescriptor.getDynamicPropertiesEncoded());
+                                               writeYarnPropertiesFile(
+                                                       yarnApplicationId,
+                                                       
clusterSpecification.getNumberTaskManagers() * 
clusterSpecification.getSlotsPerTaskManager(),
+                                                       
yarnClusterDescriptor.getDynamicPropertiesEncoded());
+                                       } catch (Exception e) {
+                                               try {
+                                                       
clusterClient.shutdown();
+                                               } catch (Exception ex) {
+                                                       LOG.info("Could not 
properly shutdown cluster client.", ex);
+                                               }
+
+                                               try {
+                                                       
yarnClusterDescriptor.terminateCluster(yarnApplicationId);
+                                               } catch (FlinkException fe) {
+                                                       LOG.info("Could not 
properly terminate the Flink cluster.", fe);
+                                               }
+
+                                               throw new FlinkException("Could 
not write the Yarn connection information.", e);
+                                       }
                                }
 
                                if (detachedMode) {

Reply via email to