[FLINK-3667] delay connection to JobManager until job execution

- lazily initialize ActorSystem
- make sure it is not created before job execution
- print connection information on the CLI
This closes #2189 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d589623 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d589623 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d589623 Branch: refs/heads/master Commit: 8d589623d2c2d039b014bc8783bef25351ec36ce Parents: b674fd5 Author: Maximilian Michels <[email protected]> Authored: Thu Jun 30 17:41:42 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Jul 1 15:22:31 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 18 ++- .../deployment/StandaloneClusterDescriptor.java | 2 +- .../flink/client/program/ClusterClient.java | 94 ++++++++---- .../client/program/StandaloneClusterClient.java | 5 +- .../apache/flink/client/CliFrontendRunTest.java | 11 +- .../TestingClusterClientWithoutActorSystem.java | 55 ------- ...CliFrontendYarnAddressConfigurationTest.java | 4 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 10 +- .../apache/flink/yarn/YarnClusterClient.java | 149 +++++++++++-------- .../flink/yarn/cli/FlinkYarnSessionCli.java | 7 +- 10 files changed, 177 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 94f5cdb..1322f23 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -154,7 +154,6 @@ public class CliFrontend { LOG.info("Trying to load configuration file"); 
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath()); - this.config = GlobalConfiguration.getConfiguration(); try { @@ -234,7 +233,7 @@ public class CliFrontend { ClusterClient client = null; try { - client = getClient(options, program.getMainClassName()); + client = createClient(options, program.getMainClassName()); client.setPrintStatusDuringExecution(options.getStdoutLogging()); client.setDetached(options.getDetachedMode()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); @@ -810,7 +809,7 @@ public class CliFrontend { CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine()); try { ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config); - LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig()); + logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect to JobManager."); return client; } catch (Exception e) { LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e); @@ -827,6 +826,7 @@ public class CliFrontend { * @throws Exception */ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception { + logAndSysout("Retrieving JobManager."); return retrieveClient(options).getJobManagerGateway(); } @@ -836,7 +836,7 @@ public class CliFrontend { * @param programName Program name * @throws Exception */ - protected ClusterClient getClient( + protected ClusterClient createClient( CommandLineOptions options, String programName) throws Exception { @@ -846,12 +846,12 @@ public class CliFrontend { ClusterClient client; try { client = activeCommandLine.retrieveCluster(options.getCommandLine(), config); - logAndSysout("Cluster retrieved"); + logAndSysout("Cluster retrieved: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e) { try { String applicationName = "Flink Application: " + 
programName; client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config); - logAndSysout("Cluster started"); + logAndSysout("Cluster started: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e2) { throw new IllegalConfigurationException( "The JobManager address is neither provided at the command-line, " + @@ -859,7 +859,9 @@ public class CliFrontend { } } - logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager."); + // Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program. + final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig(); + logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager."); logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); return client; } @@ -1054,7 +1056,7 @@ public class CliFrontend { * @param config The config to write to */ public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { - config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName()); + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort()); } http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java index 57ccc47..7a3d4d4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java +++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java @@ -50,7 +50,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone } @Override - public StandaloneClusterClient deploy() { + public StandaloneClusterClient deploy() throws UnsupportedOperationException { throw new UnsupportedOperationException("Can't deploy a standalone cluster."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index d5057b8..6cb5abb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -81,8 +81,8 @@ public abstract class ClusterClient { /** The optimizer used in the optimization of batch programs */ final Optimizer compiler; - /** The actor system used to communicate with the JobManager */ - protected final ActorSystem actorSystem; + /** The actor system used to communicate with the JobManager. 
Lazily initialized upon first use */ + protected final LazyActorSystemLoader actorSystemLoader; /** Configuration of the client */ protected final Configuration flinkConfig; @@ -127,39 +127,74 @@ public abstract class ClusterClient { this.timeout = AkkaUtils.getClientTimeout(flinkConfig); this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig); - this.actorSystem = createActorSystem(); + this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG); } // ------------------------------------------------------------------------ // Startup & Shutdown // ------------------------------------------------------------------------ - /** - * Method to create the ActorSystem of the Client. May be overriden in subclasses. - * @return ActorSystem - * @throws IOException - */ - protected ActorSystem createActorSystem() throws IOException { + protected static class LazyActorSystemLoader { + + private final Logger LOG; + + private final Configuration flinkConfig; + + private ActorSystem actorSystem; - if (actorSystem != null) { - throw new RuntimeException("This method may only be called once."); + private LazyActorSystemLoader(Configuration flinkConfig, Logger LOG) { + this.flinkConfig = flinkConfig; + this.LOG = LOG; } - // start actor system - LOG.info("Starting client actor system."); + /** + * Indicates whether the ActorSystem has already been instantiated. 
+ * @return boolean True if it exists, False otherwise + */ + public boolean isLoaded() { + return actorSystem != null; + } - String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - if (hostName == null || port == -1) { - throw new IOException("The initial JobManager address has not been set correctly."); + public void shutdown() { + if (isLoaded()) { + actorSystem.shutdown(); + actorSystem.awaitTermination(); + actorSystem = null; + } + } + + /** + * Creates a new ActorSystem or returns an existing one. + * @return ActorSystem + */ + public ActorSystem get() { + + if (!isLoaded()) { + // start actor system + LOG.info("Starting client actor system."); + + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + if (hostName == null || port == -1) { + throw new RuntimeException("The initial JobManager address has not been set correctly."); + } + InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port); + + // find name of own public interface, able to connect to the JM + // try to find address for 2 seconds. log after 400 ms. + InetAddress ownHostname; + try { + ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400); + } catch (IOException e) { + throw new RuntimeException("Failed to resolve JobManager address at " + initialJobManagerAddress, e); + } + actorSystem = AkkaUtils.createActorSystem(flinkConfig, + new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0))); + } + + return actorSystem; } - InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port); - // find name of own public interface, able to connect to the JM - // try to find address for 2 seconds. log after 400 ms. 
- InetAddress ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400); - return AkkaUtils.createActorSystem(flinkConfig, - new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0))); } /** @@ -170,10 +205,7 @@ public abstract class ClusterClient { try { finalizeCluster(); } finally { - if (!this.actorSystem.isTerminated()) { - this.actorSystem.shutdown(); - this.actorSystem.awaitTermination(); - } + this.actorSystemLoader.shutdown(); } } } @@ -201,7 +233,7 @@ public abstract class ClusterClient { /** * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). - * @return The address (host and port) of the leading JobManager + * @return The address (host and port) of the leading JobManager when it was last retrieved (may be outdated) */ public InetSocketAddress getJobManagerAddressFromConfig() { try { @@ -375,7 +407,7 @@ public abstract class ClusterClient { try { logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion."); this.lastJobID = jobGraph.getJobID(); - return JobClient.submitJobAndWait(actorSystem, + return JobClient.submitJobAndWait(actorSystemLoader.get(), leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader); } catch (JobExecutionException e) { throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); @@ -614,7 +646,7 @@ public abstract class ClusterClient { return LeaderRetrievalUtils.retrieveLeaderGateway( LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), - actorSystem, + actorSystemLoader.get(), lookupTimeout); } @@ -652,7 +684,7 @@ public abstract class ClusterClient { /** * Returns a string representation of the cluster. */ - protected abstract String getClusterIdentifier(); + public abstract String getClusterIdentifier(); /** * Request the cluster to shut down or disconnect. 
http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 82f350a..2c6e101 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getWebInterfaceURL() { - String host = this.getJobManagerAddress().getHostName(); + String host = this.getJobManagerAddressFromConfig().getHostString(); int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); return "http://" + host + ":" + port; @@ -74,7 +74,8 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getClusterIdentifier() { - return "Standalone cluster with JobManager running at " + this.getJobManagerAddress(); + // Avoid blocking here by getting the address from the config without resolving the address + return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java index fa554c6..f710d8e 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java +++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java @@ -118,13 +118,13 @@ public class CliFrontendRunTest { } // -------------------------------------------------------------------------------------------- - + public static final class RunTestingCliFrontend extends CliFrontend { - + private final int expectedParallelism; private final boolean sysoutLogging; private final boolean isDetached; - + public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception { super(CliFrontendTestUtils.getConfigDir()); this.expectedParallelism = expectedParallelism; @@ -139,10 +139,5 @@ public class CliFrontendRunTest { assertEquals(expectedParallelism, parallelism); return 0; } - - @Override - protected ClusterClient getClient(CommandLineOptions options, String programName) throws Exception { - return TestingClusterClientWithoutActorSystem.create(); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java deleted file mode 100644 index ab608cb..0000000 --- a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.client; - -import akka.actor.ActorSystem; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.StandaloneClusterClient; -import org.apache.flink.configuration.Configuration; -import org.mockito.Mockito; - -import java.io.IOException; - -/** - * A client to use in tests which does not instantiate an ActorSystem. - */ -public class TestingClusterClientWithoutActorSystem extends StandaloneClusterClient { - - private TestingClusterClientWithoutActorSystem() throws IOException { - super(new Configuration()); - } - - /** - * Do not instantiate the Actor System to save resources. 
- * @return Mocked ActorSystem - * @throws IOException - */ - @Override - protected ActorSystem createActorSystem() throws IOException { - return Mockito.mock(ActorSystem.class); - } - - public static ClusterClient create() { - try { - return new TestingClusterClientWithoutActorSystem(); - } catch (IOException e) { - throw new RuntimeException("Could not create TestingClientWithoutActorSystem.", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 5c10de8..323c10b 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -303,8 +303,8 @@ public class CliFrontendYarnAddressConfigurationTest { @Override // make method public - public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception { - return super.getClient(options, programName); + public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { + return super.createClient(options, programName); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 2d08bee..ba249c2 100644 --- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -134,6 +134,7 @@ public class FlinkYarnSessionCliTest { Assert.assertEquals(6, client.getMaxSlots()); } + private static class TestCLI extends FlinkYarnSessionCli { public TestCLI(String shortPrefix, String longPrefix) { @@ -143,7 +144,7 @@ public class FlinkYarnSessionCliTest { private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor { @Override public void setLocalJarPath(Path localJarPath) { -// setLocalJarPath("/tmp"); + // add nothing } } @@ -160,12 +161,7 @@ public class FlinkYarnSessionCliTest { Mockito.mock(YarnClient.class), Mockito.mock(ApplicationReport.class), config, - new Path("/tmp"), true); - } - - @Override - protected ActorSystem createActorSystem() throws IOException { - return Mockito.mock(ActorSystem.class); + new Path("/tmp"), false); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index 9518f75..dfc71e0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -81,14 +81,14 @@ public class YarnClusterClient extends ClusterClient { //---------- Class internal fields ------------------- private final AbstractYarnClusterDescriptor clusterDescriptor; - private final ActorRef applicationClient; + private final LazApplicationClientLoader applicationClient; private final FiniteDuration akkaDuration; private final Timeout akkaTimeout; - private final ApplicationReport applicationId; + private final ApplicationReport appReport; private final 
ApplicationId appId; private final String trackingURL; - private boolean isConnected = false; + private boolean isConnected = true; private final boolean perJobCluster; @@ -120,63 +120,18 @@ public class YarnClusterClient extends ClusterClient { this.yarnClient = yarnClient; this.hadoopConfig = yarnClient.getConfig(); this.sessionFilesDir = sessionFilesDir; - this.applicationId = appReport; + this.appReport = appReport; this.appId = appReport.getApplicationId(); this.trackingURL = appReport.getTrackingUrl(); this.perJobCluster = perJobCluster; - /* The leader retrieval service for connecting to the cluster and finding the active leader. */ - LeaderRetrievalService leaderRetrievalService; - try { - leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); - } catch (Exception e) { - throw new IOException("Could not create the leader retrieval service.", e); - } - - // start application client - LOG.info("Start application client."); - - applicationClient = actorSystem.actorOf( - Props.create( - ApplicationClient.class, - flinkConfig, - leaderRetrievalService), - "applicationClient"); + this.applicationClient = new LazApplicationClientLoader(); pollingRunner = new PollingThread(yarnClient, appId); pollingRunner.setDaemon(true); pollingRunner.start(); Runtime.getRuntime().addShutdownHook(clientShutdownHook); - - isConnected = true; - - if (perJobCluster) { - - logAndSysout("Waiting until all TaskManagers have connected"); - - for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) { - currentStatus = getClusterStatus(); - if (currentStatus != null && !currentStatus.equals(lastStatus)) { - logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/" - + clusterDescriptor.getTaskManagerCount() + ")"); - if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) { - logAndSysout("All TaskManagers are connected"); - break; - } - } else if (lastStatus 
== null) { - logAndSysout("No status updates from the YARN cluster received so far. Waiting ..."); - } - - try { - Thread.sleep(250); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for TaskManagers"); - System.err.println("Thread is interrupted"); - throw new IOException("Interrupted while waiting for TaskManagers", e); - } - } - } } /** @@ -219,7 +174,10 @@ public class YarnClusterClient extends ClusterClient { */ private void stopAfterJob(JobID jobID) { Preconditions.checkNotNull(jobID, "The job id must not be null"); - Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout); + Future<Object> messageReceived = + ask( + applicationClient.get(), + new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout); try { Await.result(messageReceived, akkaDuration); } catch (Exception e) { @@ -263,7 +221,7 @@ public class YarnClusterClient extends ClusterClient { @Override public String getClusterIdentifier() { - return applicationId.getApplicationId().toString(); + return "Yarn cluster with application id " + appReport.getApplicationId(); } /** @@ -278,7 +236,11 @@ public class YarnClusterClient extends ClusterClient { return null; } - Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout); + Future<Object> clusterStatusOption = + ask( + applicationClient.get(), + YarnMessages.getLocalGetyarnClusterStatus(), + akkaTimeout); Object clusterStatus; try { clusterStatus = Await.result(clusterStatusOption, akkaDuration); @@ -338,9 +300,11 @@ public class YarnClusterClient extends ClusterClient { while(true) { Object result; try { - Future<Object> response = Patterns.ask(applicationClient, - YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration)); - + Future<Object> response = + Patterns.ask( + applicationClient.get(), + YarnMessages.getLocalGetYarnMessage(), + new Timeout(akkaDuration)); result = Await.result(response, 
akkaDuration); } catch(Exception ioe) { LOG.warn("Error retrieving the YARN messages locally", ioe); @@ -406,11 +370,12 @@ public class YarnClusterClient extends ClusterClient { // we are already in the shutdown hook } - if(actorSystem != null){ + if(actorSystemLoader.isLoaded()){ LOG.info("Sending shutdown request to the Application Master"); - if(applicationClient != ActorRef.noSender()) { + if(applicationClient.get() != ActorRef.noSender()) { try { - Future<Object> response = Patterns.ask(applicationClient, + Future<Object> response = + Patterns.ask(applicationClient.get(), new YarnMessages.LocalStopYarnSession(getApplicationStatus(), "Flink YARN Client requested shutdown"), new Timeout(akkaDuration)); @@ -467,7 +432,7 @@ public class YarnClusterClient extends ClusterClient { LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics()); LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve " + "the full application log using this command:\n" - + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n" + + "\tyarn logs -appReport " + appReport.getApplicationId() + "\n" + "(It sometimes takes a few seconds until the logs are aggregated)"); } } catch (Exception e) { @@ -553,4 +518,68 @@ public class YarnClusterClient extends ClusterClient { public boolean isDetached() { return super.isDetached() || clusterDescriptor.isDetachedMode(); } + + public ApplicationId getApplicationId() { + return appId; + } + + protected class LazApplicationClientLoader { + + private ActorRef applicationClient; + + /** + * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem. + * @return ActorSystem + */ + public ActorRef get() { + if (applicationClient == null) { + /* The leader retrieval service for connecting to the cluster and finding the active leader. 
*/ + LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new RuntimeException("Could not create the leader retrieval service.", e); + } + + // start application client + LOG.info("Start application client."); + + applicationClient = actorSystemLoader.get().actorOf( + Props.create( + ApplicationClient.class, + flinkConfig, + leaderRetrievalService), + "applicationClient"); + + if (perJobCluster) { + + logAndSysout("Waiting until all TaskManagers have connected"); + + for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) { + currentStatus = getClusterStatus(); + if (currentStatus != null && !currentStatus.equals(lastStatus)) { + logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/" + + clusterDescriptor.getTaskManagerCount() + ")"); + if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) { + logAndSysout("All TaskManagers are connected"); + break; + } + } else if (lastStatus == null) { + logAndSysout("No status updates from the YARN cluster received so far. 
Waiting ..."); + } + + try { + Thread.sleep(250); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for TaskManagers"); + System.err.println("Thread is interrupted"); + throw new RuntimeException("Interrupted while waiting for TaskManagers", e); + } + } + } + } + + return applicationClient; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d589623/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index c0ad27e..a0225a7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -30,7 +30,6 @@ import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; @@ -113,7 +112,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> private final boolean acceptInteractiveInput; //------------------------------------ Internal fields ------------------------- - private YarnClusterClient yarnCluster = null; + private YarnClusterClient yarnCluster; private boolean detachedMode = false; public FlinkYarnSessionCli(String shortPrefix, String longPrefix) { @@ -555,7 +554,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> if (detachedMode) { LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill "+yarnCluster.getClusterIdentifier()); + "yarn application -kill " + APPLICATION_ID.getOpt()); yarnCluster.disconnect(); } else { runInteractiveCli(yarnCluster, true); @@ -608,7 +607,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> // print info and quit: LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" + + "yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() + "Please also note that the temporary files of the YARN session in {} will not be removed.", yarnDescriptor.getSessionFilesDir()); yarnCluster.disconnect();
