http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java deleted file mode 100644 index 3b48228..0000000 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ /dev/null @@ -1,559 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.yarn; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; - -import static akka.pattern.Patterns.ask; - -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.util.Timeout; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; -import org.apache.flink.runtime.clusterframework.messages.InfoMessage; -import org.apache.flink.runtime.net.ConnectionUtils; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; -import org.apache.flink.util.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.None$; -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Java representation of a running Flink cluster within YARN. 
- */ -public class FlinkYarnCluster extends AbstractFlinkYarnCluster { - private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCluster.class); - - private static final int POLLING_THREAD_INTERVAL_MS = 1000; - - private YarnClient yarnClient; - private Thread actorRunner; - private Thread clientShutdownHook = new ClientShutdownHook(); - private PollingThread pollingRunner; - private final Configuration hadoopConfig; - // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown. - private final Path sessionFilesDir; - private final InetSocketAddress jobManagerAddress; - - //---------- Class internal fields ------------------- - - private ActorSystem actorSystem; - private ActorRef applicationClient; - private ApplicationReport intialAppReport; - private final FiniteDuration akkaDuration; - private final Timeout akkaTimeout; - private final ApplicationId applicationId; - private final boolean detached; - private final org.apache.flink.configuration.Configuration flinkConfig; - private final ApplicationId appId; - - private boolean isConnected = false; - - - /** - * Create a new Flink on YARN cluster. - * - * @param yarnClient Client to talk to YARN - * @param appId the YARN application ID - * @param hadoopConfig Hadoop configuration - * @param flinkConfig Flink configuration - * @param sessionFilesDir Location of files required for YARN session - * @param detached Set to true if no actor system or RPC communication with the cluster should be established - * @throws IOException - * @throws YarnException - */ - public FlinkYarnCluster( - final YarnClient yarnClient, - final ApplicationId appId, - Configuration hadoopConfig, - org.apache.flink.configuration.Configuration flinkConfig, - Path sessionFilesDir, - boolean detached) throws IOException, YarnException { - this.akkaDuration = AkkaUtils.getTimeout(flinkConfig); - this.akkaTimeout = Timeout.durationToTimeout(akkaDuration); - this.yarnClient = yarnClient; - this.hadoopConfig = hadoopConfig; - this.sessionFilesDir = sessionFilesDir; - this.applicationId = appId; - this.detached = detached; - this.flinkConfig = flinkConfig; - this.appId = appId; - - // get one application report manually - intialAppReport = yarnClient.getApplicationReport(appId); - String jobManagerHost = intialAppReport.getHost(); - int jobManagerPort = intialAppReport.getRpcPort(); - this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort); - } - - /** - * Connect the FlinkYarnCluster to the ApplicationMaster. - * - * Detached YARN sessions don't need to connect to the ApplicationMaster. - * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started. - * - * @throws IOException - */ - public void connectToCluster() throws IOException { - if(isConnected) { - throw new IllegalStateException("Can not connect to the cluster again"); - } - - // start actor system - LOG.info("Start actor system."); - // find name of own public interface, able to connect to the JM - // try to find address for 2 seconds. log after 400 ms. 
- InetAddress ownHostname = ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400); - actorSystem = AkkaUtils.createActorSystem(flinkConfig, - new Some<Tuple2<String, Object>>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0))); - - // Create the leader election service - flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerAddress.getHostName()); - flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerAddress.getPort()); - - LeaderRetrievalService leaderRetrievalService; - - try { - leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); - } catch (Exception e) { - throw new IOException("Could not create the leader retrieval service.", e); - } - - // start application client - LOG.info("Start application client."); - - applicationClient = actorSystem.actorOf( - Props.create( - ApplicationClient.class, - flinkConfig, - leaderRetrievalService), - "applicationClient"); - - actorRunner = new Thread(new Runnable() { - @Override - public void run() { - // blocks until ApplicationClient has been stopped - actorSystem.awaitTermination(); - - // get final application report - try { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - - LOG.info("Application " + appId + " finished with state " + appReport - .getYarnApplicationState() + " and final state " + appReport - .getFinalApplicationStatus() + " at " + appReport.getFinishTime()); - - if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState() - == YarnApplicationState.KILLED) { - LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics()); - LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve " - + "the full application log using this command:\n" - + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n" - + "(It sometimes takes a few seconds until the logs are aggregated)"); - } - } catch (Exception e) { - LOG.warn("Error while getting final application report", e); - } - } - }); - actorRunner.setDaemon(true); - actorRunner.start(); - - pollingRunner = new PollingThread(yarnClient, appId); - pollingRunner.setDaemon(true); - pollingRunner.start(); - - Runtime.getRuntime().addShutdownHook(clientShutdownHook); - - isConnected = true; - } - - @Override - public void disconnect() { - if(!isConnected) { - throw new IllegalStateException("Can not disconnect from an unconnected cluster."); - } - LOG.info("Disconnecting FlinkYarnCluster from ApplicationMaster"); - - if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) { - LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally"); - } - // tell the actor to shut down. - applicationClient.tell(PoisonPill.getInstance(), applicationClient); - - try { - actorRunner.join(1000); // wait for 1 second - } catch (InterruptedException e) { - LOG.warn("Shutdown of the actor runner was interrupted", e); - Thread.currentThread().interrupt(); - } - try { - pollingRunner.stopRunner(); - pollingRunner.join(1000); - } catch(InterruptedException e) { - LOG.warn("Shutdown of the polling runner was interrupted", e); - Thread.currentThread().interrupt(); - } - isConnected = false; - } - - - // -------------------------- Interaction with the cluster ------------------------ - - /* - * This call blocks until the message has been recevied. 
- */ - @Override - public void stopAfterJob(JobID jobID) { - Preconditions.checkNotNull(jobID, "The job id must not be null"); - Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout); - try { - Await.result(messageReceived, akkaDuration); - } catch (Exception e) { - throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e); - } - } - - @Override - public org.apache.flink.configuration.Configuration getFlinkConfiguration() { - return flinkConfig; - } - - @Override - public InetSocketAddress getJobManagerAddress() { - return jobManagerAddress; - } - - @Override - public String getWebInterfaceURL() { - String url = this.intialAppReport.getTrackingUrl(); - // there seems to be a difference between HD 2.2.0 and 2.6.0 - if(!url.startsWith("http://")) { - url = "http://" + url; - } - return url; - } - - @Override - public String getApplicationId() { - return applicationId.toString(); - } - - @Override - public boolean isDetached() { - return this.detached; - } - - /** - * This method is only available if the cluster hasn't been started in detached mode. - */ - @Override - public GetClusterStatusResponse getClusterStatus() { - if(!isConnected) { - throw new IllegalStateException("The cluster is not connected to the ApplicationMaster."); - } - if(hasBeenStopped()) { - throw new RuntimeException("The FlinkYarnCluster has already been stopped"); - } - Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout); - Object clusterStatus; - try { - clusterStatus = Await.result(clusterStatusOption, akkaDuration); - } catch (Exception e) { - throw new RuntimeException("Unable to get Cluster status from Application Client", e); - } - if(clusterStatus instanceof None$) { - return null; - } else if(clusterStatus instanceof Some) { - return (GetClusterStatusResponse) (((Some) clusterStatus).get()); - } else { - throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName()); - } - } - - @Override - public boolean hasFailed() { - if(!isConnected) { - throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); - } - if(pollingRunner == null) { - LOG.warn("FlinkYarnCluster.hasFailed() has been called on an uninitialized cluster." + - "The system might be in an erroneous state"); - } - ApplicationReport lastReport = pollingRunner.getLastReport(); - if(lastReport == null) { - LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far." 
+ - "The system might be in an erroneous state"); - return false; - } else { - YarnApplicationState appState = lastReport.getYarnApplicationState(); - boolean status = (appState == YarnApplicationState.FAILED || - appState == YarnApplicationState.KILLED); - if(status) { - LOG.warn("YARN reported application state {}", appState); - LOG.warn("Diagnostics: {}", lastReport.getDiagnostics()); - } - return status; - } - } - - - @Override - public String getDiagnostics() { - if(!isConnected) { - throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); - } - - if (!hasFailed()) { - LOG.warn("getDiagnostics() called for cluster which is not in failed state"); - } - ApplicationReport lastReport = pollingRunner.getLastReport(); - if (lastReport == null) { - LOG.warn("Last report is null"); - return null; - } else { - return lastReport.getDiagnostics(); - } - } - - @Override - public List<String> getNewMessages() { - if(!isConnected) { - throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); - } - - if(hasBeenStopped()) { - throw new RuntimeException("The FlinkYarnCluster has already been stopped"); - } - List<String> ret = new ArrayList<String>(); - - // get messages from ApplicationClient (locally) - while(true) { - Object result; - try { - Future<Object> response = Patterns.ask(applicationClient, - YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration)); - - result = Await.result(response, akkaDuration); - } catch(Exception ioe) { - LOG.warn("Error retrieving the YARN messages locally", ioe); - break; - } - - if(!(result instanceof Option)) { - throw new RuntimeException("LocalGetYarnMessage requires a response of type " + - "Option. Instead the response is of type " + result.getClass() + "."); - } else { - Option messageOption = (Option) result; - LOG.debug("Received message option {}", messageOption); - if(messageOption.isEmpty()) { - break; - } else { - Object obj = messageOption.get(); - - if(obj instanceof InfoMessage) { - InfoMessage msg = (InfoMessage) obj; - ret.add("[" + msg.date() + "] " + msg.message()); - } else { - LOG.warn("LocalGetYarnMessage returned unexpected type: " + messageOption); - } - } - } - } - return ret; - } - - // -------------------------- Shutdown handling ------------------------ - - private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); - - /** - * Shutdown the YARN cluster. 
- * @param failApplication whether we should fail the YARN application (in case of errors in Flink) - */ - @Override - public void shutdown(boolean failApplication) { - if(!isConnected) { - throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); - } - - if(hasBeenShutDown.getAndSet(true)) { - return; - } - - try { - Runtime.getRuntime().removeShutdownHook(clientShutdownHook); - } catch (IllegalStateException e) { - // we are already in the shutdown hook - } - - if(actorSystem != null){ - LOG.info("Sending shutdown request to the Application Master"); - if(applicationClient != ActorRef.noSender()) { - try { - FinalApplicationStatus finalStatus; - if (failApplication) { - finalStatus = FinalApplicationStatus.FAILED; - } else { - finalStatus = FinalApplicationStatus.SUCCEEDED; - } - Future<Object> response = Patterns.ask(applicationClient, - new YarnMessages.LocalStopYarnSession(finalStatus, - "Flink YARN Client requested shutdown"), - new Timeout(akkaDuration)); - Await.ready(response, akkaDuration); - } catch(Exception e) { - LOG.warn("Error while stopping YARN Application Client", e); - } - } - - actorSystem.shutdown(); - actorSystem.awaitTermination(); - - actorSystem = null; - } - - LOG.info("Deleting files in " + sessionFilesDir ); - try { - FileSystem shutFS = FileSystem.get(hadoopConfig); - shutFS.delete(sessionFilesDir, true); // delete conf and jar file. - shutFS.close(); - }catch(IOException e){ - LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); - } - - try { - actorRunner.join(1000); // wait for 1 second - } catch (InterruptedException e) { - LOG.warn("Shutdown of the actor runner was interrupted", e); - Thread.currentThread().interrupt(); - } - try { - pollingRunner.stopRunner(); - pollingRunner.join(1000); - } catch(InterruptedException e) { - LOG.warn("Shutdown of the polling runner was interrupted", e); - Thread.currentThread().interrupt(); - } - - LOG.info("YARN Client is shutting down"); - yarnClient.stop(); // actorRunner is using the yarnClient. - yarnClient = null; // set null to clearly see if somebody wants to access it afterwards. 
- } - - @Override - public boolean hasBeenStopped() { - return hasBeenShutDown.get(); - } - - - public class ClientShutdownHook extends Thread { - @Override - public void run() { - LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook"); - shutdown(false); - } - } - - // -------------------------- Polling ------------------------ - - public static class PollingThread extends Thread { - - AtomicBoolean running = new AtomicBoolean(true); - private YarnClient yarnClient; - private ApplicationId appId; - - // ------- status information stored in the polling thread - private final Object lock = new Object(); - private ApplicationReport lastReport; - - - public PollingThread(YarnClient yarnClient, ApplicationId appId) { - this.yarnClient = yarnClient; - this.appId = appId; - } - - public void stopRunner() { - if(!running.get()) { - LOG.warn("Polling thread was already stopped"); - } - running.set(false); - } - - public ApplicationReport getLastReport() { - synchronized (lock) { - return lastReport; - } - } - - @Override - public void run() { - while (running.get() && yarnClient.isInState(Service.STATE.STARTED)) { - try { - ApplicationReport report = yarnClient.getApplicationReport(appId); - synchronized (lock) { - lastReport = report; - } - } catch (Exception e) { - LOG.warn("Error while getting application report", e); - } - try { - Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS); - } catch (InterruptedException e) { - LOG.error("Polling thread got interrupted", e); - Thread.currentThread().interrupt(); // pass interrupt. - } - } - if(running.get() && !yarnClient.isInState(Service.STATE.STARTED)) { - // == if the polling thread is still running but the yarn client is stopped. - LOG.warn("YARN client is unexpected in state " + yarnClient.getServiceState()); - } - } - } - -}
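
Both the deleted FlinkYarnCluster above and the YarnClusterClient introduced further down keep cluster state fresh the same way: a daemon PollingThread periodically fetches the ApplicationReport from the YarnClient and publishes the latest copy under a lock, and callers such as hasFailed() only ever read that cached report. The following is a minimal, dependency-free sketch of that pattern, assuming a generic Supplier in place of the YarnClient; StatusPoller, reportSource and intervalMs are illustrative names and are not part of the Flink code.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Supplier;

    /** Minimal sketch of the report-polling pattern used above (hypothetical names). */
    class StatusPoller<T> extends Thread {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final Supplier<T> reportSource; // stands in for yarnClient.getApplicationReport(appId)
        private final long intervalMs;

        private final Object lock = new Object();
        private T lastReport;                   // latest report, guarded by 'lock'

        StatusPoller(Supplier<T> reportSource, long intervalMs) {
            this.reportSource = reportSource;
            this.intervalMs = intervalMs;
            setDaemon(true);                    // polling must not keep the JVM alive
        }

        T getLastReport() {
            synchronized (lock) {
                return lastReport;
            }
        }

        void stopRunner() {
            running.set(false);
        }

        @Override
        public void run() {
            while (running.get()) {
                T report = reportSource.get(); // may be slow; done outside the lock
                synchronized (lock) {
                    lastReport = report;
                }
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // propagate interrupt and stop polling
                    stopRunner();
                }
            }
        }
    }

Keeping the report behind a single lock lets any caller read getLastReport() cheaply, without ever blocking on the potentially slow YARN RPC itself.
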
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 1d0afc4..24b5a35 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -22,7 +22,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import org.apache.flink.client.CliFrontend; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -39,6 +38,7 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; @@ -192,7 +192,7 @@ public class YarnApplicationMasterRunner { // Flink configuration final Map<String, String> dynamicProperties = - CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); + FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); LOG.debug("YARN dynamic properties: {}", dynamicProperties); final Configuration config = createConfiguration(currDir, dynamicProperties); @@ -292,8 +292,7 @@ public class YarnApplicationMasterRunner { // 3: Flink's Yarn ResourceManager LOG.debug("Starting YARN Flink Resource Manager"); - // we need the leader retrieval service here to be informed of new - // leader session IDs, even though there can be only one leader ever + // we need the leader retrieval service here to be informed of new leaders and session IDs LeaderRetrievalService leaderRetriever = LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager); http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java new file mode 100644 index 0000000..a5b8af7 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import akka.actor.ActorRef; + +import static akka.pattern.Patterns.ask; + +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.None$; +import scala.Option; +import scala.Some; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Java representation of a running Flink cluster within YARN. + */ +public class YarnClusterClient extends ClusterClient { + + private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class); + + private static final int POLLING_THREAD_INTERVAL_MS = 1000; + + private YarnClient yarnClient; + private Thread actorRunner; + private Thread clientShutdownHook = new ClientShutdownHook(); + private PollingThread pollingRunner; + private final Configuration hadoopConfig; + // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown. + private final Path sessionFilesDir; + + /** The leader retrieval service for connecting to the cluster and finding the active leader. */ + private final LeaderRetrievalService leaderRetrievalService; + + //---------- Class internal fields ------------------- + + private final AbstractYarnClusterDescriptor clusterDescriptor; + private final ActorRef applicationClient; + private final FiniteDuration akkaDuration; + private final Timeout akkaTimeout; + private final ApplicationReport applicationId; + private final ApplicationId appId; + private final String trackingURL; + + private boolean isConnected = false; + + + /** + * Create a new Flink on YARN cluster. 
+ * + * @param clusterDescriptor The descriptor used at cluster creation + * @param yarnClient Client to talk to YARN + * @param appReport the YARN application ID + * @param flinkConfig Flink configuration + * @param sessionFilesDir Location of files required for YARN session + * @throws IOException + * @throws YarnException + */ + public YarnClusterClient( + final AbstractYarnClusterDescriptor clusterDescriptor, + final YarnClient yarnClient, + final ApplicationReport appReport, + org.apache.flink.configuration.Configuration flinkConfig, + Path sessionFilesDir) throws IOException, YarnException { + + super(flinkConfig); + + this.akkaDuration = AkkaUtils.getTimeout(flinkConfig); + this.akkaTimeout = Timeout.durationToTimeout(akkaDuration); + this.clusterDescriptor = clusterDescriptor; + this.yarnClient = yarnClient; + this.hadoopConfig = yarnClient.getConfig(); + this.sessionFilesDir = sessionFilesDir; + this.applicationId = appReport; + this.appId = appReport.getApplicationId(); + this.trackingURL = appReport.getTrackingUrl(); + + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new IOException("Could not create the leader retrieval service.", e); + } + + + if (isConnected) { + throw new IllegalStateException("Already connected to the cluster."); + } + + // start application client + LOG.info("Start application client."); + + applicationClient = actorSystem.actorOf( + Props.create( + ApplicationClient.class, + flinkConfig, + leaderRetrievalService), + "applicationClient"); + + actorRunner = new Thread(new Runnable() { + @Override + public void run() { + // blocks until ApplicationClient has been stopped + actorSystem.awaitTermination(); + + // get final application report + try { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + + LOG.info("Application " + appId + " finished with state " + appReport + .getYarnApplicationState() + " and final state " + appReport + .getFinalApplicationStatus() + " at " + appReport.getFinishTime()); + + if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState() + == YarnApplicationState.KILLED) { + LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics()); + LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve " + + "the full application log using this command:\n" + + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n" + + "(It sometimes takes a few seconds until the logs are aggregated)"); + } + } catch (Exception e) { + LOG.warn("Error while getting final application report", e); + } + } + }); + actorRunner.setDaemon(true); + actorRunner.start(); + + pollingRunner = new PollingThread(yarnClient, appId); + pollingRunner.setDaemon(true); + pollingRunner.start(); + + Runtime.getRuntime().addShutdownHook(clientShutdownHook); + + isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); + + while(true) { + GetClusterStatusResponse status = getClusterStatus(); + if (status != null) { + if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) { + logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" + + clusterDescriptor.getTaskManagerCount() + ")"); + } else { + logAndSysout("All TaskManagers are connected"); + break; + } + } else { + logAndSysout("No status updates from the YARN cluster received so far. 
Waiting ..."); + } + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for TaskManagers"); + System.err.println("Thread is interrupted"); + throw new IOException("Interrupted while waiting for TaskManagers", e); + } + } + } + + public void disconnect() { + if(!isConnected) { + throw new IllegalStateException("Can not disconnect from an unconnected cluster."); + } + LOG.info("Disconnecting YarnClusterClient from ApplicationMaster"); + + if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) { + LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally"); + } + // tell the actor to shut down. + applicationClient.tell(PoisonPill.getInstance(), applicationClient); + + try { + actorRunner.join(1000); // wait for 1 second + } catch (InterruptedException e) { + LOG.warn("Shutdown of the actor runner was interrupted", e); + Thread.currentThread().interrupt(); + } + try { + pollingRunner.stopRunner(); + pollingRunner.join(1000); + } catch(InterruptedException e) { + LOG.warn("Shutdown of the polling runner was interrupted", e); + Thread.currentThread().interrupt(); + } + isConnected = false; + } + + + // -------------------------- Interaction with the cluster ------------------------ + + /* + * Tells the Cluster to monitor the status of JobId and stop itself once the specified job has finished. + */ + private void stopAfterJob(JobID jobID) { + Preconditions.checkNotNull(jobID, "The job id must not be null"); + Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout); + try { + Await.result(messageReceived, akkaDuration); + } catch (Exception e) { + throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e); + } + } + + @Override + public org.apache.flink.configuration.Configuration getFlinkConfiguration() { + return flinkConfig; + } + + @Override + public int getMaxSlots() { + int maxSlots = clusterDescriptor.getTaskManagerCount() * clusterDescriptor.getTaskManagerSlots(); + return maxSlots > 0 ? maxSlots : -1; + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + if (isDetached()) { + JobSubmissionResult result = super.runDetached(jobGraph, classLoader); + stopAfterJob(jobGraph.getJobID()); + return result; + } else { + return super.run(jobGraph, classLoader); + } + } + + @Override + public String getWebInterfaceURL() { + // there seems to be a difference between HD 2.2.0 and 2.6.0 + if(!trackingURL.startsWith("http://")) { + return "http://" + trackingURL; + } else { + return trackingURL; + } + } + + @Override + public String getClusterIdentifier() { + return applicationId.getApplicationId().toString(); + } + + /** + * This method is only available if the cluster hasn't been started in detached mode. 
+ */ + @Override + public GetClusterStatusResponse getClusterStatus() { + if(!isConnected) { + throw new IllegalStateException("The cluster is not connected to the ApplicationMaster."); + } + if(hasBeenShutdown()) { + throw new RuntimeException("The YarnClusterClient has already been stopped"); + } + Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout); + Object clusterStatus; + try { + clusterStatus = Await.result(clusterStatusOption, akkaDuration); + } catch (Exception e) { + throw new RuntimeException("Unable to get ClusterClient status from Application Client", e); + } + if(clusterStatus instanceof None$) { + return null; + } else if(clusterStatus instanceof Some) { + return (GetClusterStatusResponse) (((Some) clusterStatus).get()); + } else { + throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName()); + } + } + + public ApplicationStatus getApplicationStatus() { + if(!isConnected) { + throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); + } + if(pollingRunner == null) { + LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster." + + "The system might be in an erroneous state"); + } + ApplicationReport lastReport = pollingRunner.getLastReport(); + if(lastReport == null) { + LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far." + + "The system might be in an erroneous state"); + return ApplicationStatus.UNKNOWN; + } else { + YarnApplicationState appState = lastReport.getYarnApplicationState(); + ApplicationStatus status = + (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) ? + ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; + if(status != ApplicationStatus.SUCCEEDED) { + LOG.warn("YARN reported application state {}", appState); + LOG.warn("Diagnostics: {}", lastReport.getDiagnostics()); + } + return status; + } + } + + + private String getDiagnostics() { + if(!isConnected) { + throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); + } + + if (getApplicationStatus() == ApplicationStatus.SUCCEEDED) { + LOG.warn("getDiagnostics() called for cluster which is not in failed state"); + } + ApplicationReport lastReport = pollingRunner.getLastReport(); + if (lastReport == null) { + LOG.warn("Last report is null"); + return null; + } else { + return lastReport.getDiagnostics(); + } + } + + @Override + public List<String> getNewMessages() { + if(!isConnected) { + throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); + } + + if(hasBeenShutdown()) { + throw new RuntimeException("The YarnClusterClient has already been stopped"); + } + List<String> ret = new ArrayList<String>(); + + // get messages from ApplicationClient (locally) + while(true) { + Object result; + try { + Future<Object> response = Patterns.ask(applicationClient, + YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration)); + + result = Await.result(response, akkaDuration); + } catch(Exception ioe) { + LOG.warn("Error retrieving the YARN messages locally", ioe); + break; + } + + if(!(result instanceof Option)) { + throw new RuntimeException("LocalGetYarnMessage requires a response of type " + + "Option. 
Instead the response is of type " + result.getClass() + "."); + } else { + Option messageOption = (Option) result; + LOG.debug("Received message option {}", messageOption); + if(messageOption.isEmpty()) { + break; + } else { + Object obj = messageOption.get(); + + if(obj instanceof InfoMessage) { + InfoMessage msg = (InfoMessage) obj; + ret.add("[" + msg.date() + "] " + msg.message()); + } else { + LOG.warn("LocalGetYarnMessage returned unexpected type: " + messageOption); + } + } + } + } + return ret; + } + + // -------------------------- Shutdown handling ------------------------ + + private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); + + /** + * Shuts down or disconnects from the YARN cluster. + */ + @Override + public void finalizeCluster() { + + if (!isConnected) { + throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster."); + } + + if (isDetached()) { + // only disconnect if we are running detached + disconnect(); + return; + } + + // show cluster status + + List<String> msgs = getNewMessages(); + if (msgs != null && msgs.size() > 1) { + + logAndSysout("The following messages were created by the YARN cluster while running the Job:"); + for (String msg : msgs) { + logAndSysout(msg); + } + } + if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) { + logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus()); + logAndSysout("YARN Diagnostics: " + getDiagnostics()); + } + + + if(hasBeenShutDown.getAndSet(true)) { + return; + } + + try { + Runtime.getRuntime().removeShutdownHook(clientShutdownHook); + } catch (IllegalStateException e) { + // we are already in the shutdown hook + } + + if(actorSystem != null){ + LOG.info("Sending shutdown request to the Application Master"); + if(applicationClient != ActorRef.noSender()) { + try { + Future<Object> response = Patterns.ask(applicationClient, + new YarnMessages.LocalStopYarnSession(getApplicationStatus(), + "Flink YARN Client requested shutdown"), + new Timeout(akkaDuration)); + Await.ready(response, akkaDuration); + } catch(Exception e) { + LOG.warn("Error while stopping YARN Application Client", e); + } + } + + actorSystem.shutdown(); + actorSystem.awaitTermination(); + } + + LOG.info("Deleting files in " + sessionFilesDir); + try { + FileSystem shutFS = FileSystem.get(hadoopConfig); + shutFS.delete(sessionFilesDir, true); // delete conf and jar file. + shutFS.close(); + }catch(IOException e){ + LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); + } + + try { + actorRunner.join(1000); // wait for 1 second + } catch (InterruptedException e) { + LOG.warn("Shutdown of the actor runner was interrupted", e); + Thread.currentThread().interrupt(); + } + try { + pollingRunner.stopRunner(); + pollingRunner.join(1000); + } catch(InterruptedException e) { + LOG.warn("Shutdown of the polling runner was interrupted", e); + Thread.currentThread().interrupt(); + } + + LOG.info("YARN Client is shutting down"); + yarnClient.stop(); // actorRunner is using the yarnClient. + yarnClient = null; // set null to clearly see if somebody wants to access it afterwards. 
+ } + + public boolean hasBeenShutdown() { + return hasBeenShutDown.get(); + } + + + private class ClientShutdownHook extends Thread { + @Override + public void run() { + LOG.info("Shutting down YarnClusterClient from the client shutdown hook"); + shutdown(); + } + } + + // -------------------------- Polling ------------------------ + + private static class PollingThread extends Thread { + + AtomicBoolean running = new AtomicBoolean(true); + private YarnClient yarnClient; + private ApplicationId appId; + + // ------- status information stored in the polling thread + private final Object lock = new Object(); + private ApplicationReport lastReport; + + + public PollingThread(YarnClient yarnClient, ApplicationId appId) { + this.yarnClient = yarnClient; + this.appId = appId; + } + + public void stopRunner() { + if(!running.get()) { + LOG.warn("Polling thread was already stopped"); + } + running.set(false); + } + + public ApplicationReport getLastReport() { + synchronized (lock) { + return lastReport; + } + } + + @Override + public void run() { + while (running.get() && yarnClient.isInState(Service.STATE.STARTED)) { + try { + ApplicationReport report = yarnClient.getApplicationReport(appId); + synchronized (lock) { + lastReport = report; + } + } catch (Exception e) { + LOG.warn("Error while getting application report", e); + } + try { + Thread.sleep(YarnClusterClient.POLLING_THREAD_INTERVAL_MS); + } catch (InterruptedException e) { + LOG.error("Polling thread got interrupted", e); + Thread.currentThread().interrupt(); // pass interrupt. + stopRunner(); + } + } + if(running.get() && !yarnClient.isInState(Service.STATE.STARTED)) { + // == if the polling thread is still running but the yarn client is stopped. + LOG.warn("YARN client is unexpected in state " + yarnClient.getServiceState()); + } + } + } + + @Override + public boolean isDetached() { + // either we have set detached mode using the general '-d' flag or using the Yarn CLI flag 'yd' + return super.isDetached() || clusterDescriptor.isDetachedMode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java new file mode 100644 index 0000000..43e7c7b --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + +/** + * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. 
+ */ +public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor { + @Override + protected Class<?> getApplicationMasterClass() { + return YarnApplicationMasterRunner.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java new file mode 100644 index 0000000..a2375c5 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -0,0 +1,606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.cli.CliFrontendParser; +import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.flink.yarn.YarnClusterClient; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Class handling the command line interface to the YARN session. 
+ */ +public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class); + + //------------------------------------ Constants ------------------------- + + public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml"; + public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties"; + + private static final int CLIENT_POLLING_INTERVALL = 3; + + /** The id for the CommandLine interface */ + private static final String ID = "yarn-cluster"; + + // YARN-session related constants + private static final String YARN_PROPERTIES_FILE = ".yarn-properties-"; + private static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager"; + private static final String YARN_PROPERTIES_PARALLELISM = "parallelism"; + private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; + + private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split() + + //------------------------------------ Command Line argument options ------------------------- + // the prefix transformation is used by the CliFrontend static constructor. + private final Option QUERY; + // --- or --- + private final Option QUEUE; + private final Option SHIP_PATH; + private final Option FLINK_JAR; + private final Option JM_MEMORY; + private final Option TM_MEMORY; + private final Option CONTAINER; + private final Option SLOTS; + private final Option DETACHED; + private final Option STREAMING; + private final Option NAME; + + /** + * Dynamic properties allow the user to specify additional configuration values with -D, such as + * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368 + */ + private final Option DYNAMIC_PROPERTIES; + + private final boolean acceptInteractiveInput; + + //------------------------------------ Internal fields ------------------------- + private YarnClusterClient yarnCluster = null; + private boolean detachedMode = false; + + public FlinkYarnSessionCli(String shortPrefix, String longPrefix) { + this(shortPrefix, longPrefix, true); + } + + public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) { + this.acceptInteractiveInput = acceptInteractiveInput; + + QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); + QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); + SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); + FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); + JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); + TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]"); + CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)"); + SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"); + DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); + DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); + STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming 
mode"); + NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); + } + + /** + * Resumes from a Flink Yarn properties file + * @param flinkConfiguration The flink configuration + * @return True if the properties were loaded, false otherwise + */ + private boolean resumeFromYarnProperties(Configuration flinkConfiguration) { + // load the YARN properties + File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration)); + if (!propertiesFile.exists()) { + return false; + } + + logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); + + Properties yarnProperties = new Properties(); + try { + try (InputStream is = new FileInputStream(propertiesFile)) { + yarnProperties.load(is); + } + } + catch (IOException e) { + throw new RuntimeException("Cannot read the YARN properties file", e); + } + + // configure the default parallelism from YARN + String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); + if (propParallelism != null) { // maybe the property is not set + try { + int parallelism = Integer.parseInt(propParallelism); + flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism); + + logAndSysout("YARN properties set default parallelism to " + parallelism); + } + catch (NumberFormatException e) { + throw new RuntimeException("Error while parsing the YARN properties: " + + "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer."); + } + } + + // get the JobManager address from the YARN properties + String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); + InetSocketAddress jobManagerAddress; + if (address != null) { + try { + jobManagerAddress = ClientUtils.parseHostPortAddress(address); + // store address in config from where it is retrieved by the retrieval service + CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress); + } + catch (Exception e) { + throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e); + } + + logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); + } + + // handle the YARN client's dynamic properties + String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); + Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { + flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); + } + + return true; + } + + public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { + + + YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(); + + if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! + LOG.error("Missing required argument {}", CONTAINER.getOpt()); + printUsage(); + throw new IllegalArgumentException("Missing required argument " + CONTAINER.getOpt()); + } + yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()))); + + // Jar Path + Path localJarPath; + if (cmd.hasOption(FLINK_JAR.getOpt())) { + String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); + if(!userPath.startsWith("file://")) { + userPath = "file://" + userPath; + } + localJarPath = new Path(userPath); + } else { + LOG.info("No path for the flink jar passed. 
Using the location of " + + yarnClusterDescriptor.getClass() + " to locate the jar"); + localJarPath = new Path("file://" + + yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath()); + } + + yarnClusterDescriptor.setLocalJarPath(localJarPath); + + List<File> shipFiles = new ArrayList<>(); + // path to directory to ship + if (cmd.hasOption(SHIP_PATH.getOpt())) { + String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); + File shipDir = new File(shipPath); + if (shipDir.isDirectory()) { + shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return !(name.equals(".") || name.equals("..")); + } + }))); + } else { + LOG.warn("Ship directory is not a directory. Ignoring it."); + } + } + + yarnClusterDescriptor.setShipFiles(shipFiles); + + // queue + if (cmd.hasOption(QUEUE.getOpt())) { + yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt())); + } + + // JobManager Memory + if (cmd.hasOption(JM_MEMORY.getOpt())) { + int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); + yarnClusterDescriptor.setJobManagerMemory(jmMemory); + } + + // Task Managers memory + if (cmd.hasOption(TM_MEMORY.getOpt())) { + int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt())); + yarnClusterDescriptor.setTaskManagerMemory(tmMemory); + } + + if (cmd.hasOption(SLOTS.getOpt())) { + int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt())); + yarnClusterDescriptor.setTaskManagerSlots(slots); + } + + String[] dynamicProperties = null; + if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { + dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); + } + String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR); + + yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); + + if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { + this.detachedMode = true; + yarnClusterDescriptor.setDetachedMode(true); + } + + if(cmd.hasOption(NAME.getOpt())) { + yarnClusterDescriptor.setName(cmd.getOptionValue(NAME.getOpt())); + } else { + // set the default application name, if none is specified + if(defaultApplicationName != null) { + yarnClusterDescriptor.setName(defaultApplicationName); + } + } + + // ----- Convenience ----- + + // the number of slots available from YARN: + int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots(); + if (yarnTmSlots == -1) { + yarnTmSlots = 1; + } + + int maxSlots = yarnTmSlots * yarnClusterDescriptor.getTaskManagerCount(); + int userParallelism = Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1")); + if (userParallelism != -1) { + int slotsPerTM = userParallelism / yarnClusterDescriptor.getTaskManagerCount(); + String message = "The YARN cluster has " + maxSlots + " slots available, " + + "but the user requested a parallelism of " + userParallelism + " on YARN. 
" + + "Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " + + "will get "+slotsPerTM+" slots."; + logAndSysout(message); + yarnClusterDescriptor.setTaskManagerSlots(slotsPerTM); + } + + return yarnClusterDescriptor; + } + + @Override + public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception { + + YarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); + + try { + return yarnClusterDescriptor.deploy(); + } catch (Exception e) { + throw new RuntimeException("Error deploying the YARN cluster", e); + } + + } + + private void printUsage() { + System.out.println("Usage:"); + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(200); + formatter.setLeftPadding(5); + formatter.setSyntaxPrefix(" Required"); + Options req = new Options(); + req.addOption(CONTAINER); + formatter.printHelp(" ", req); + + formatter.setSyntaxPrefix(" Optional"); + Options opt = new Options(); + opt.addOption(JM_MEMORY); + opt.addOption(TM_MEMORY); + opt.addOption(QUERY); + opt.addOption(QUEUE); + opt.addOption(SLOTS); + opt.addOption(DYNAMIC_PROPERTIES); + opt.addOption(DETACHED); + opt.addOption(STREAMING); + opt.addOption(NAME); + formatter.printHelp(" ", opt); + } + + private static void writeYarnProperties(Properties properties, File propertiesFile) { + try { + OutputStream out = new FileOutputStream(propertiesFile); + properties.store(out, "Generated YARN properties file"); + out.close(); + } catch (IOException e) { + throw new RuntimeException("Error writing the properties file", e); + } + propertiesFile.setReadable(true, false); // readable for all. + } + + public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean readConsoleInput) { + final String HELP = "Available commands:\n" + + "help - show these commands\n" + + "stop - stop the YARN session"; + int numTaskmanagers = 0; + try { + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + label: + while (true) { + // ------------------ check if there are updates by the cluster ----------- + + GetClusterStatusResponse status = yarnCluster.getClusterStatus(); + LOG.debug("Received status message: {}", status); + + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to " + + status.numRegisteredTaskManagers() + ". " + + "Slots available: " + status.totalNumberOfSlots()); + numTaskmanagers = status.numRegisteredTaskManagers(); + } + + List<String> messages = yarnCluster.getNewMessages(); + if (messages != null && messages.size() > 0) { + System.err.println("New messages from the YARN cluster: "); + for (String msg : messages) { + System.err.println(msg); + } + } + + if (yarnCluster.getApplicationStatus() != ApplicationStatus.SUCCEEDED) { + System.err.println("The YARN cluster has failed"); + yarnCluster.shutdown(); + } + + // wait until CLIENT_POLLING_INTERVAL is over or the user entered something. + long startTime = System.currentTimeMillis(); + while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000 + && (!readConsoleInput || !in.ready())) + { + Thread.sleep(200); + } + //------------- handle interactive command by user. 
---------------------- + + if (readConsoleInput && in.ready()) { + String command = in.readLine(); + switch (command) { + case "quit": + case "stop": + break label; + + case "help": + System.err.println(HELP); + break; + default: + System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP); + break; + } + } + + if (yarnCluster.hasBeenShutdown()) { + LOG.info("Stopping interactive command line interface, YARN cluster has been stopped."); + break; + } + } + } catch(Exception e) { + LOG.warn("Exception while running the interactive command line interface", e); + } + } + + public static void main(String[] args) { + FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + System.exit(cli.run(args)); + } + + @Override + public String getIdentifier() { + return ID; + } + + public void addOptions(Options options) { + options.addOption(FLINK_JAR); + options.addOption(JM_MEMORY); + options.addOption(TM_MEMORY); + options.addOption(CONTAINER); + options.addOption(QUEUE); + options.addOption(QUERY); + options.addOption(SHIP_PATH); + options.addOption(SLOTS); + options.addOption(DYNAMIC_PROPERTIES); + options.addOption(DETACHED); + options.addOption(STREAMING); + options.addOption(NAME); + } + + @Override + public ClusterClient retrieveCluster(Configuration config) throws Exception { + + if(resumeFromYarnProperties(config)) { + return new StandaloneClusterClient(config); + } + + return null; + } + + public int run(String[] args) { + // + // Command Line Options + // + Options options = new Options(); + addOptions(options); + + CommandLineParser parser = new PosixParser(); + CommandLine cmd; + try { + cmd = parser.parse(options, args); + } catch(Exception e) { + System.out.println(e.getMessage()); + printUsage(); + return 1; + } + + // Query cluster for metrics + if (cmd.hasOption(QUERY.getOpt())) { + YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(); + String description; + try { + description = flinkYarnClient.getClusterDescription(); + } catch (Exception e) { + System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage()); + e.printStackTrace(System.err); + return 1; + } + System.out.println(description); + return 0; + } else { + + YarnClusterDescriptor flinkYarnClient; + try { + flinkYarnClient = createDescriptor(null, cmd); + } catch (Exception e) { + System.err.println("Error while starting the YARN Client. Please check log output!"); + return 1; + } + + try { + yarnCluster = flinkYarnClient.deploy(); + } catch (Exception e) { + System.err.println("Error while deploying YARN cluster: "+e.getMessage()); + e.printStackTrace(System.err); + return 1; + } + //------------------ ClusterClient deployed, handle connection details + String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort(); + System.out.println("Flink JobManager is now running on " + jobManagerAddress); + System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); + + // file that we write into the conf/ dir containing the jobManager address and the dop. 
+ File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration())); + + Properties yarnProps = new Properties(); + yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); + if (flinkYarnClient.getTaskManagerSlots() != -1) { + String parallelism = + Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()); + yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism); + } + // add dynamic properties + if (flinkYarnClient.getDynamicPropertiesEncoded() != null) { + yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, + flinkYarnClient.getDynamicPropertiesEncoded()); + } + writeYarnProperties(yarnProps, yarnPropertiesFile); + + //------------------ ClusterClient running, let user control it ------------ + + if (detachedMode) { + // print info and quit: + LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + + "yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" + + "Please also note that the temporary files of the YARN session in {} will not be removed.", + flinkYarnClient.getSessionFilesDir()); + yarnCluster.disconnect(); + } else { + runInteractiveCli(yarnCluster, acceptInteractiveInput); + + if (!yarnCluster.hasBeenShutdown()) { + LOG.info("Command Line Interface requested session shutdown"); + yarnCluster.shutdown(); + } + + try { + yarnPropertiesFile.delete(); + } catch (Exception e) { + LOG.warn("Exception while deleting the JobManager address file", e); + } + } + } + return 0; + } + + /** + * Utility method for tests. + */ + public void stop() { + if (yarnCluster != null) { + LOG.info("Command line interface is shutting down the yarnCluster"); + yarnCluster.shutdown(); + } + } + + private void logAndSysout(String message) { + LOG.info(message); + System.out.println(message); + } + + public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) { + if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { + Map<String, String> properties = new HashMap<>(); + + String[] propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR); + for (String propLine : propertyLines) { + if (propLine == null) { + continue; + } + + String[] kv = propLine.split("="); + if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { + properties.put(kv[0], kv[1]); + } + } + return properties; + } + else { + return Collections.emptyMap(); + } + } + + private static String getYarnPropertiesLocation(Configuration conf) { + String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); + String currentUser = System.getProperty("user.name"); + String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + + return propertiesFileLocation + File.separator + YARN_PROPERTIES_FILE + currentUser; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index 2876309..aea1aac 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -23,12 +23,10 @@ import java.util.UUID import akka.actor._ import grizzled.slf4j.Logger import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.clusterframework.ApplicationStatus import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.{LeaderSessionMessageFilter, FlinkActor, LogMessages} import org.apache.flink.yarn.YarnMessages._ -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import scala.collection.mutable import scala.concurrent.duration._ @@ -36,7 +34,7 @@ import scala.language.postfixOps /** Actor which is responsible to repeatedly poll the Yarn cluster status from the ResourceManager. * - * This class represents the bridge between the [[FlinkYarnCluster]] and the + * This class represents the bridge between the [[YarnClusterClient]] and the * [[YarnApplicationMasterRunner]]. * * @param flinkConfig Configuration object @@ -135,9 +133,9 @@ class ApplicationClient( } case msg: RegisterInfoMessageListenerSuccessful => + // The job manager acts as a proxy between the client and the resource manager val jm = sender() - - log.info(s"Successfully registered at the ResourceManager $jm") + log.info(s"Successfully registered at the ResourceManager using JobManager $jm") yarnJobManager = Some(jm) http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala index 8645581..da1917b 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala @@ -18,12 +18,13 @@ package org.apache.flink.yarn -import java.util.{List => JavaList, UUID, Date} +import java.util.{Date, UUID, List => JavaList} import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.clusterframework.ApplicationStatus import org.apache.flink.runtime.messages.RequiresLeaderSessionID import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records.{ContainerStatus, Container, FinalApplicationStatus} +import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, FinalApplicationStatus} import scala.concurrent.duration.{Deadline, FiniteDuration} @@ -31,7 +32,7 @@ object YarnMessages { case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int) - case class LocalStopYarnSession(status: FinalApplicationStatus, diagnostics: String) + case class LocalStopYarnSession(status: ApplicationStatus, diagnostics: String) /** * Entry point to start a new YarnSession.
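Editor's note on the CLI changes above: after a successful deployment the client writes a small properties file (JobManager address, default parallelism, encoded dynamic properties) into a per-user location under java.io.tmpdir, so that later job submissions can reattach to the session. The following is a minimal, self-contained sketch of that write/read round trip. It mirrors writeYarnProperties() and getYarnPropertiesLocation() from the diff, but the concrete constant values (the file name prefix and the property keys) are stand-ins, not the actual Flink constants.

import java.io.*;
import java.util.Properties;

public class YarnPropertiesRoundTrip {

    // Stand-in values; the real constants live in FlinkYarnSessionCli and are not shown in this excerpt.
    private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
    private static final String JOBMANAGER_KEY = "jobManager";
    private static final String PARALLELISM_KEY = "parallelism";

    /** Roughly getYarnPropertiesLocation(): temp dir + file name + current user. */
    static File propertiesFile() {
        String dir = System.getProperty("java.io.tmpdir");
        String user = System.getProperty("user.name");
        return new File(dir + File.separator + YARN_PROPERTIES_FILE + user);
    }

    /** Roughly writeYarnProperties(): store the properties and make the file world-readable. */
    static void write(String jobManagerAddress, int parallelism) throws IOException {
        Properties props = new Properties();
        props.setProperty(JOBMANAGER_KEY, jobManagerAddress);
        props.setProperty(PARALLELISM_KEY, Integer.toString(parallelism));

        File file = propertiesFile();
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, "Generated YARN properties file");
        }
        file.setReadable(true, false); // readable for all, as in the diff
    }

    /** Re-reads the file, roughly what resuming from the YARN properties file would do. */
    static Properties read() throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(propertiesFile())) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        write("localhost:6123", 8);
        System.out.println(read());
    }
}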

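The dynamic properties handling in the diff is symmetric: createDescriptor() joins the key=value pairs with YARN_DYNAMIC_PROPERTIES_SEPARATOR before handing them to the cluster descriptor, and getDynamicProperties() splits the encoded string back into a map, skipping malformed entries. Below is a small sketch of that round trip; the separator value "@@" is an assumption, since the constant's definition is outside this excerpt, and the join uses plain java.util instead of commons-lang StringUtils.

import java.util.*;

public class DynamicPropertiesRoundTrip {

    // Assumed separator value; the real YARN_DYNAMIC_PROPERTIES_SEPARATOR constant
    // is defined elsewhere in FlinkYarnSessionCli and is not shown in this excerpt.
    private static final String SEPARATOR = "@@";

    /** Encode: join "key=value" pairs, as the CLI does before shipping them to the cluster. */
    static String encode(List<String> keyValuePairs) {
        return keyValuePairs.isEmpty() ? null : String.join(SEPARATOR, keyValuePairs);
    }

    /** Decode: mirrors getDynamicProperties() from the diff. */
    static Map<String, String> decode(String encoded) {
        if (encoded == null || encoded.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, String> properties = new HashMap<>();
        for (String propLine : encoded.split(SEPARATOR)) {
            if (propLine == null) {
                continue;
            }
            // Note: split("=") also cuts values that themselves contain '=',
            // which matches the behavior of the code shown in the diff.
            String[] kv = propLine.split("=");
            if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
                properties.put(kv[0], kv[1]);
            }
        }
        return properties;
    }

    public static void main(String[] args) {
        String encoded = encode(Arrays.asList("taskmanager.heap.mb=2048", "akka.ask.timeout=60s"));
        System.out.println(decode(encoded)); // prints both entries as a map
    }
}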