[FLINK-3667] delay connection to JobManager until job execution

- lazily initialize ActorSystem
- make sure it is not created before job execution
- print connection information on the CLI

This closes #2189


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d589623
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d589623
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d589623

Branch: refs/heads/master
Commit: 8d589623d2c2d039b014bc8783bef25351ec36ce
Parents: b674fd5
Author: Maximilian Michels <[email protected]>
Authored: Thu Jun 30 17:41:42 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jul 1 15:22:31 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  18 ++-
 .../deployment/StandaloneClusterDescriptor.java |   2 +-
 .../flink/client/program/ClusterClient.java     |  94 ++++++++----
 .../client/program/StandaloneClusterClient.java |   5 +-
 .../apache/flink/client/CliFrontendRunTest.java |  11 +-
 .../TestingClusterClientWithoutActorSystem.java |  55 -------
 ...CliFrontendYarnAddressConfigurationTest.java |   4 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  10 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 149 +++++++++++--------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   7 +-
 10 files changed, 177 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 94f5cdb..1322f23 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -154,7 +154,6 @@ public class CliFrontend {
                LOG.info("Trying to load configuration file");
                
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
                System.setProperty(ENV_CONFIG_DIRECTORY, 
configDirectory.getAbsolutePath());
-
                this.config = GlobalConfiguration.getConfiguration();
 
                try {
@@ -234,7 +233,7 @@ public class CliFrontend {
                ClusterClient client = null;
                try {
 
-                       client = getClient(options, program.getMainClassName());
+                       client = createClient(options, 
program.getMainClassName());
                        
client.setPrintStatusDuringExecution(options.getStdoutLogging());
                        client.setDetached(options.getDetachedMode());
                        LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
@@ -810,7 +809,7 @@ public class CliFrontend {
                CustomCommandLine customCLI = 
getActiveCustomCommandLine(options.getCommandLine());
                try {
                        ClusterClient client = 
customCLI.retrieveCluster(options.getCommandLine(), config);
-                       LOG.info("Using address {} to connect to JobManager.", 
client.getJobManagerAddressFromConfig());
+                       logAndSysout("Using address " + 
client.getJobManagerAddressFromConfig() + " to connect to JobManager.");
                        return client;
                } catch (Exception e) {
                        LOG.error("Couldn't retrieve {} cluster.", 
customCLI.getId(), e);
@@ -827,6 +826,7 @@ public class CliFrontend {
         * @throws Exception
         */
        protected ActorGateway getJobManagerGateway(CommandLineOptions options) 
throws Exception {
+               logAndSysout("Retrieving JobManager.");
                return retrieveClient(options).getJobManagerGateway();
        }
 
@@ -836,7 +836,7 @@ public class CliFrontend {
         * @param programName Program name
         * @throws Exception
         */
-       protected ClusterClient getClient(
+       protected ClusterClient createClient(
                        CommandLineOptions options,
                        String programName) throws Exception {
 
@@ -846,12 +846,12 @@ public class CliFrontend {
                ClusterClient client;
                try {
                        client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config);
-                       logAndSysout("Cluster retrieved");
+                       logAndSysout("Cluster retrieved: " + 
client.getClusterIdentifier());
                } catch (UnsupportedOperationException e) {
                        try {
                                String applicationName = "Flink Application: " 
+ programName;
                                client = 
activeCommandLine.createCluster(applicationName, options.getCommandLine(), 
config);
-                               logAndSysout("Cluster started");
+                               logAndSysout("Cluster started: " + 
client.getClusterIdentifier());
                        } catch (UnsupportedOperationException e2) {
                                throw new IllegalConfigurationException(
                                        "The JobManager address is neither 
provided at the command-line, " +
@@ -859,7 +859,9 @@ public class CliFrontend {
                        }
                }
 
-               logAndSysout("Using address " + client.getJobManagerAddress() + 
" to connect to JobManager.");
+               // Avoid resolving the JobManager Gateway here to prevent 
blocking until we invoke the user's program.
+               final InetSocketAddress jobManagerAddress = 
client.getJobManagerAddressFromConfig();
+               logAndSysout("Using address " + 
jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to 
connect to JobManager.");
                logAndSysout("JobManager web interface address " + 
client.getWebInterfaceURL());
                return client;
        }
@@ -1054,7 +1056,7 @@ public class CliFrontend {
         * @param config The config to write to
         */
        public static void setJobManagerAddressInConfig(Configuration config, 
InetSocketAddress address) {
-               config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
address.getHostName());
+               config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
address.getHostString());
                config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
address.getPort());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 57ccc47..7a3d4d4 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -50,7 +50,7 @@ public class StandaloneClusterDescriptor implements 
ClusterDescriptor<Standalone
        }
 
        @Override
-       public StandaloneClusterClient deploy() {
+       public StandaloneClusterClient deploy() throws 
UnsupportedOperationException {
                throw new UnsupportedOperationException("Can't deploy a 
standalone cluster.");
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index d5057b8..6cb5abb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -81,8 +81,8 @@ public abstract class ClusterClient {
        /** The optimizer used in the optimization of batch programs */
        final Optimizer compiler;
 
-       /** The actor system used to communicate with the JobManager */
-       protected final ActorSystem actorSystem;
+       /** The actor system used to communicate with the JobManager. Lazily 
initialized upon first use */
+       protected final LazyActorSystemLoader actorSystemLoader;
 
        /** Configuration of the client */
        protected final Configuration flinkConfig;
@@ -127,39 +127,74 @@ public abstract class ClusterClient {
                this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
                this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
-               this.actorSystem = createActorSystem();
+               this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, 
LOG);
        }
 
        // 
------------------------------------------------------------------------
        //  Startup & Shutdown
        // 
------------------------------------------------------------------------
 
-       /**
-        * Method to create the ActorSystem of the Client. May be overriden in 
subclasses.
-        * @return ActorSystem
-        * @throws IOException
-        */
-       protected ActorSystem createActorSystem() throws IOException {
+       protected static class LazyActorSystemLoader {
+
+               private final Logger LOG;
+
+               private final Configuration flinkConfig;
+
+               private ActorSystem actorSystem;
 
-               if (actorSystem != null) {
-                       throw new RuntimeException("This method may only be 
called once.");
+               private LazyActorSystemLoader(Configuration flinkConfig, Logger 
LOG) {
+                       this.flinkConfig = flinkConfig;
+                       this.LOG = LOG;
                }
 
-               // start actor system
-               LOG.info("Starting client actor system.");
+               /**
+                * Indicates whether the ActorSystem has already been 
instantiated.
+                * @return boolean True if it exists, False otherwise
+                */
+               public boolean isLoaded() {
+                       return actorSystem != null;
+               }
 
-               String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-               int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-               if (hostName == null || port == -1) {
-                       throw new IOException("The initial JobManager address 
has not been set correctly.");
+               public void shutdown() {
+                       if (isLoaded()) {
+                               actorSystem.shutdown();
+                               actorSystem.awaitTermination();
+                               actorSystem = null;
+                       }
+               }
+
+               /**
+                * Creates a new ActorSystem or returns an existing one.
+                * @return ActorSystem
+                */
+               public ActorSystem get() {
+
+                       if (!isLoaded()) {
+                               // start actor system
+                               LOG.info("Starting client actor system.");
+
+                               String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+                               int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+                               if (hostName == null || port == -1) {
+                                       throw new RuntimeException("The initial 
JobManager address has not been set correctly.");
+                               }
+                               InetSocketAddress initialJobManagerAddress = 
new InetSocketAddress(hostName, port);
+
+                               // find name of own public interface, able to 
connect to the JM
+                               // try to find address for 2 seconds. log after 
400 ms.
+                               InetAddress ownHostname;
+                               try {
+                                       ownHostname = 
ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Failed to 
resolve JobManager address at " + initialJobManagerAddress, e);
+                               }
+                               actorSystem = 
AkkaUtils.createActorSystem(flinkConfig,
+                                       new Some<>(new Tuple2<String, 
Object>(ownHostname.getCanonicalHostName(), 0)));
+                       }
+
+                       return actorSystem;
                }
-               InetSocketAddress initialJobManagerAddress = new 
InetSocketAddress(hostName, port);
 
-               // find name of own public interface, able to connect to the JM
-               // try to find address for 2 seconds. log after 400 ms.
-               InetAddress ownHostname = 
ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
-               return AkkaUtils.createActorSystem(flinkConfig,
-                       new Some<>(new Tuple2<String, 
Object>(ownHostname.getCanonicalHostName(), 0)));
        }
 
        /**
@@ -170,10 +205,7 @@ public abstract class ClusterClient {
                        try {
                                finalizeCluster();
                        } finally {
-                               if (!this.actorSystem.isTerminated()) {
-                                       this.actorSystem.shutdown();
-                                       this.actorSystem.awaitTermination();
-                               }
+                               this.actorSystemLoader.shutdown();
                        }
                }
        }
@@ -201,7 +233,7 @@ public abstract class ClusterClient {
 
        /**
         * Gets the current JobManager address from the Flink configuration 
(may change in case of a HA setup).
-        * @return The address (host and port) of the leading JobManager
+        * @return The address (host and port) of the leading JobManager when 
it was last retrieved (may be outdated)
         */
        public InetSocketAddress getJobManagerAddressFromConfig() {
                try {
@@ -375,7 +407,7 @@ public abstract class ClusterClient {
                try {
                        logAndSysout("Submitting job with JobID: " + 
jobGraph.getJobID() + ". Waiting for job completion.");
                        this.lastJobID = jobGraph.getJobID();
-                       return JobClient.submitJobAndWait(actorSystem,
+                       return 
JobClient.submitJobAndWait(actorSystemLoader.get(),
                                leaderRetrievalService, jobGraph, timeout, 
printStatusDuringExecution, classLoader);
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);
@@ -614,7 +646,7 @@ public abstract class ClusterClient {
 
                return LeaderRetrievalUtils.retrieveLeaderGateway(
                        
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
-                       actorSystem,
+                       actorSystemLoader.get(),
                        lookupTimeout);
        }
 
@@ -652,7 +684,7 @@ public abstract class ClusterClient {
        /**
         * Returns a string representation of the cluster.
         */
-       protected abstract String getClusterIdentifier();
+       public abstract String getClusterIdentifier();
 
        /**
         * Request the cluster to shut down or disconnect.

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 82f350a..2c6e101 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient {
 
        @Override
        public String getWebInterfaceURL() {
-               String host = this.getJobManagerAddress().getHostName();
+               String host = 
this.getJobManagerAddressFromConfig().getHostString();
                int port = 
getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                        ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
                return "http://"; +  host + ":" + port;
@@ -74,7 +74,8 @@ public class StandaloneClusterClient extends ClusterClient {
 
        @Override
        public String getClusterIdentifier() {
-               return "Standalone cluster with JobManager running at " + 
this.getJobManagerAddress();
+               // Avoid blocking here by getting the address from the config 
without resolving the address
+               return "Standalone cluster with JobManager at " + 
this.getJobManagerAddressFromConfig();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index fa554c6..f710d8e 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -118,13 +118,13 @@ public class CliFrontendRunTest {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static final class RunTestingCliFrontend extends CliFrontend {
-               
+
                private final int expectedParallelism;
                private final boolean sysoutLogging;
                private final boolean isDetached;
-               
+
                public RunTestingCliFrontend(int expectedParallelism, boolean 
logging, boolean isDetached) throws Exception {
                        super(CliFrontendTestUtils.getConfigDir());
                        this.expectedParallelism = expectedParallelism;
@@ -139,10 +139,5 @@ public class CliFrontendRunTest {
                        assertEquals(expectedParallelism, parallelism);
                        return 0;
                }
-
-               @Override
-               protected ClusterClient getClient(CommandLineOptions options, 
String programName) throws Exception {
-                       return TestingClusterClientWithoutActorSystem.create();
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
 
b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
deleted file mode 100644
index ab608cb..0000000
--- 
a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.client;
-
-import akka.actor.ActorSystem;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-
-/**
- * A client to use in tests which does not instantiate an ActorSystem.
- */
-public class TestingClusterClientWithoutActorSystem extends 
StandaloneClusterClient {
-
-       private TestingClusterClientWithoutActorSystem() throws IOException {
-               super(new Configuration());
-       }
-
-       /**
-        * Do not instantiate the Actor System to save resources.
-        * @return Mocked ActorSystem
-        * @throws IOException
-        */
-       @Override
-       protected ActorSystem createActorSystem() throws IOException {
-               return Mockito.mock(ActorSystem.class);
-       }
-
-       public static ClusterClient create() {
-               try {
-                       return new TestingClusterClientWithoutActorSystem();
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not create 
TestingClientWithoutActorSystem.", e);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 5c10de8..323c10b 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -303,8 +303,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 
                @Override
                // make method public
-               public ClusterClient getClient(CommandLineOptions options, 
String programName) throws Exception {
-                       return super.getClient(options, programName);
+               public ClusterClient createClient(CommandLineOptions options, 
String programName) throws Exception {
+                       return super.createClient(options, programName);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 2d08bee..ba249c2 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -134,6 +134,7 @@ public class FlinkYarnSessionCliTest {
                Assert.assertEquals(6, client.getMaxSlots());
        }
 
+
        private static class TestCLI extends FlinkYarnSessionCli {
 
                public TestCLI(String shortPrefix, String longPrefix) {
@@ -143,7 +144,7 @@ public class FlinkYarnSessionCliTest {
                private static class JarAgnosticClusterDescriptor extends 
YarnClusterDescriptor {
                        @Override
                        public void setLocalJarPath(Path localJarPath) {
-//                             setLocalJarPath("/tmp");
+                               // add nothing
                        }
                }
 
@@ -160,12 +161,7 @@ public class FlinkYarnSessionCliTest {
                                Mockito.mock(YarnClient.class),
                                Mockito.mock(ApplicationReport.class),
                                config,
-                               new Path("/tmp"), true);
-               }
-
-               @Override
-               protected ActorSystem createActorSystem() throws IOException {
-                       return Mockito.mock(ActorSystem.class);
+                               new Path("/tmp"), false);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 9518f75..dfc71e0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -81,14 +81,14 @@ public class YarnClusterClient extends ClusterClient {
        //---------- Class internal fields -------------------
 
        private final AbstractYarnClusterDescriptor clusterDescriptor;
-       private final ActorRef applicationClient;
+       private final LazApplicationClientLoader applicationClient;
        private final FiniteDuration akkaDuration;
        private final Timeout akkaTimeout;
-       private final ApplicationReport applicationId;
+       private final ApplicationReport appReport;
        private final ApplicationId appId;
        private final String trackingURL;
 
-       private boolean isConnected = false;
+       private boolean isConnected = true;
 
        private final boolean perJobCluster;
 
@@ -120,63 +120,18 @@ public class YarnClusterClient extends ClusterClient {
                this.yarnClient = yarnClient;
                this.hadoopConfig = yarnClient.getConfig();
                this.sessionFilesDir = sessionFilesDir;
-               this.applicationId = appReport;
+               this.appReport = appReport;
                this.appId = appReport.getApplicationId();
                this.trackingURL = appReport.getTrackingUrl();
                this.perJobCluster = perJobCluster;
 
-               /* The leader retrieval service for connecting to the cluster 
and finding the active leader. */
-               LeaderRetrievalService leaderRetrievalService;
-               try {
-                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
-               } catch (Exception e) {
-                       throw new IOException("Could not create the leader 
retrieval service.", e);
-               }
-
-               // start application client
-               LOG.info("Start application client.");
-
-               applicationClient = actorSystem.actorOf(
-                       Props.create(
-                               ApplicationClient.class,
-                               flinkConfig,
-                               leaderRetrievalService),
-                       "applicationClient");
+               this.applicationClient = new LazApplicationClientLoader();
 
                pollingRunner = new PollingThread(yarnClient, appId);
                pollingRunner.setDaemon(true);
                pollingRunner.start();
 
                Runtime.getRuntime().addShutdownHook(clientShutdownHook);
-
-               isConnected = true;
-
-               if (perJobCluster) {
-
-                       logAndSysout("Waiting until all TaskManagers have 
connected");
-
-                       for (GetClusterStatusResponse currentStatus, lastStatus 
= null;; lastStatus = currentStatus) {
-                               currentStatus = getClusterStatus();
-                               if (currentStatus != null && 
!currentStatus.equals(lastStatus)) {
-                                       logAndSysout("TaskManager status (" + 
currentStatus.numRegisteredTaskManagers() + "/"
-                                               + 
clusterDescriptor.getTaskManagerCount() + ")");
-                                       if 
(currentStatus.numRegisteredTaskManagers() >= 
clusterDescriptor.getTaskManagerCount()) {
-                                               logAndSysout("All TaskManagers 
are connected");
-                                               break;
-                                       }
-                               } else if (lastStatus == null) {
-                                       logAndSysout("No status updates from 
the YARN cluster received so far. Waiting ...");
-                               }
-
-                               try {
-                                       Thread.sleep(250);
-                               } catch (InterruptedException e) {
-                                       LOG.error("Interrupted while waiting 
for TaskManagers");
-                                       System.err.println("Thread is 
interrupted");
-                                       throw new IOException("Interrupted 
while waiting for TaskManagers", e);
-                               }
-                       }
-               }
        }
 
        /**
@@ -219,7 +174,10 @@ public class YarnClusterClient extends ClusterClient {
         */
        private void stopAfterJob(JobID jobID) {
                Preconditions.checkNotNull(jobID, "The job id must not be 
null");
-               Future<Object> messageReceived = ask(applicationClient, new 
YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
+               Future<Object> messageReceived =
+                       ask(
+                               applicationClient.get(),
+                               new YarnMessages.LocalStopAMAfterJob(jobID), 
akkaTimeout);
                try {
                        Await.result(messageReceived, akkaDuration);
                } catch (Exception e) {
@@ -263,7 +221,7 @@ public class YarnClusterClient extends ClusterClient {
 
        @Override
        public String getClusterIdentifier() {
-               return applicationId.getApplicationId().toString();
+               return "Yarn cluster with application id " + 
appReport.getApplicationId();
        }
 
        /**
@@ -278,7 +236,11 @@ public class YarnClusterClient extends ClusterClient {
                        return null;
                }
 
-               Future<Object> clusterStatusOption = ask(applicationClient, 
YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
+               Future<Object> clusterStatusOption =
+                       ask(
+                               applicationClient.get(),
+                               YarnMessages.getLocalGetyarnClusterStatus(),
+                               akkaTimeout);
                Object clusterStatus;
                try {
                        clusterStatus = Await.result(clusterStatusOption, 
akkaDuration);
@@ -338,9 +300,11 @@ public class YarnClusterClient extends ClusterClient {
                while(true) {
                        Object result;
                        try {
-                               Future<Object> response = 
Patterns.ask(applicationClient,
-                                               
YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
-
+                               Future<Object> response =
+                                       Patterns.ask(
+                                               applicationClient.get(),
+                                               
YarnMessages.getLocalGetYarnMessage(),
+                                               new Timeout(akkaDuration));
                                result = Await.result(response, akkaDuration);
                        } catch(Exception ioe) {
                                LOG.warn("Error retrieving the YARN messages 
locally", ioe);
@@ -406,11 +370,12 @@ public class YarnClusterClient extends ClusterClient {
                        // we are already in the shutdown hook
                }
 
-               if(actorSystem != null){
+               if(actorSystemLoader.isLoaded()){
                        LOG.info("Sending shutdown request to the Application 
Master");
-                       if(applicationClient != ActorRef.noSender()) {
+                       if(applicationClient.get() != ActorRef.noSender()) {
                                try {
-                                       Future<Object> response = 
Patterns.ask(applicationClient,
+                                       Future<Object> response =
+                                               
Patterns.ask(applicationClient.get(),
                                                        new 
YarnMessages.LocalStopYarnSession(getApplicationStatus(),
                                                                        "Flink 
YARN Client requested shutdown"),
                                                        new 
Timeout(akkaDuration));
@@ -467,7 +432,7 @@ public class YarnClusterClient extends ClusterClient {
                                LOG.warn("Application failed. Diagnostics " + 
appReport.getDiagnostics());
                                LOG.warn("If log aggregation is activated in 
the Hadoop cluster, we recommend to retrieve "
                                        + "the full application log using this 
command:\n"
-                                       + "\tyarn logs -applicationId " + 
appReport.getApplicationId() + "\n"
+                                       + "\tyarn logs -appReport " + 
appReport.getApplicationId() + "\n"
                                        + "(It sometimes takes a few seconds 
until the logs are aggregated)");
                        }
                } catch (Exception e) {
@@ -553,4 +518,68 @@ public class YarnClusterClient extends ClusterClient {
        public boolean isDetached() {
+               // Detached if either the base client reports detached or the cluster descriptor is in detached mode.
                return super.isDetached() || clusterDescriptor.isDetachedMode();
        }
+
+       /**
+        * Returns the YARN {@link ApplicationId} stored in this client's {@code appId} field.
+        * Unlike {@link #getClusterIdentifier()}, this is the bare id, suitable for YARN CLI
+        * commands such as {@code yarn application -kill}.
+        *
+        * @return the YARN application id
+        */
+       public ApplicationId getApplicationId() {
+               return appId;
+       }
+
+       /**
+        * Lazily creates the {@code ApplicationClient} actor on first use, so that the actor (and,
+        * through {@code actorSystemLoader}, possibly the ActorSystem) is only started when needed.
+        *
+        * <p>NOTE(review): {@link #get()} is not synchronized and assumes single-threaded use.
+        * Because it creates the actor on demand, calling it merely to check for an existing client
+        * (e.g. on a shutdown path) will create one and, for per-job clusters, block until all
+        * TaskManagers have connected; consider checking whether the actor exists first.
+        */
+       protected class LazApplicationClientLoader {
+
+               /** The created actor, or {@code null} until {@link #get()} has been called. */
+               private ActorRef applicationClient;
+
+               /**
+                * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem.
+                *
+                * <p>For per-job clusters, the first call additionally blocks until the number of registered
+                * TaskManagers reaches {@code clusterDescriptor.getTaskManagerCount()}.
+                *
+                * @return the ApplicationClient actor reference
+                * @throws RuntimeException if the leader retrieval service cannot be created, or if the
+                *         thread is interrupted while waiting for TaskManagers (the interrupt flag is restored)
+                */
+               public ActorRef get() {
+                       if (applicationClient == null) {
+                               /* The leader retrieval service for connecting to the cluster and finding the active leader. */
+                               LeaderRetrievalService leaderRetrievalService;
+                               try {
+                                       leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+                               } catch (Exception e) {
+                                       throw new RuntimeException("Could not create the leader retrieval service.", e);
+                               }
+
+                               // start application client
+                               LOG.info("Start application client.");
+
+                               // Assign the field BEFORE waiting for TaskManagers: the cluster-status query used
+                               // below obtains the actor via applicationClient.get(), so leaving the field null
+                               // here would make that call re-enter actor creation.
+                               applicationClient = actorSystemLoader.get().actorOf(
+                                       Props.create(
+                                               ApplicationClient.class,
+                                               flinkConfig,
+                                               leaderRetrievalService),
+                                       "applicationClient");
+
+                               if (perJobCluster) {
+                                       awaitTaskManagerRegistration();
+                               }
+                       }
+
+                       return applicationClient;
+               }
+
+               /** Polls the cluster status every 250 ms until all expected TaskManagers have registered. */
+               private void awaitTaskManagerRegistration() {
+                       logAndSysout("Waiting until all TaskManagers have connected");
+
+                       for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
+                               currentStatus = getClusterStatus();
+                               if (currentStatus != null && !currentStatus.equals(lastStatus)) {
+                                       logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+                                               + clusterDescriptor.getTaskManagerCount() + ")");
+                                       if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
+                                               logAndSysout("All TaskManagers are connected");
+                                               break;
+                                       }
+                               } else if (lastStatus == null) {
+                                       logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
+                               }
+
+                               try {
+                                       Thread.sleep(250);
+                               } catch (InterruptedException e) {
+                                       // Restore the interrupt status so code further up the stack can still observe it.
+                                       Thread.currentThread().interrupt();
+                                       LOG.error("Interrupted while waiting for TaskManagers");
+                                       System.err.println("Thread is interrupted");
+                                       throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index c0ad27e..a0225a7 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -113,7 +112,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        private final boolean acceptInteractiveInput;
        
        //------------------------------------ Internal fields 
-------------------------
-       private YarnClusterClient yarnCluster = null;
+       private YarnClusterClient yarnCluster;
        private boolean detachedMode = false;
 
        public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
@@ -555,7 +554,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        if (detachedMode) {
                                LOG.info("The Flink YARN client has been 
started in detached mode. In order to stop " +
                                        "Flink on YARN, use the following 
command or a YARN web interface to stop it:\n" +
-                                       "yarn application -kill "+yarnCluster.getClusterIdentifier());
+                                       "yarn application -kill " + yarnCluster.getApplicationId());
                                yarnCluster.disconnect();
                        } else {
                                runInteractiveCli(yarnCluster, true);
@@ -608,7 +607,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                                // print info and quit:
                                LOG.info("The Flink YARN client has been 
started in detached mode. In order to stop " +
                                                "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
-                                               "yarn application -kill " + 
yarnCluster.getClusterIdentifier() + "\n" +
+                                               "yarn application -kill " + 
yarnCluster.getApplicationId() + System.lineSeparator() +
                                                "Please also note that the 
temporary files of the YARN session in {} will not be removed.",
                                                
yarnDescriptor.getSessionFilesDir());
                                yarnCluster.disconnect();

Reply via email to