[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8293bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8293bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8293bcb

Branch: refs/heads/flip-6
Commit: e8293bcba588296656ae8425506bd2edf94a70e4
Parents: 8e57fba
Author: shuai.xus <[email protected]>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:44 2016 +0100

----------------------------------------------------------------------
 ...bstractYarnFlinkApplicationMasterRunner.java | 213 +++++++++++++
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 316 +++++++++++++++++++
 2 files changed, 529 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..923694e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for the {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and the {@link YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public abstract class AbstractYarnFlinkApplicationMasterRunner {

+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
+
+	/** The process environment variables */
+	protected static final Map<String, String> ENV = System.getenv();
+
+	/** The exit code returned if the initialization of the application master failed */
+	protected static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** The host name passed in via the environment */
+	protected String appMasterHostname;
+
+	/**
+	 * The instance entry point for the YARN application master. Obtains user group
+	 * information and calls the main work method {@link #runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+			Preconditions.checkArgument(yarnClientUsername != null, "YARN client user name environment variable %s not set",
+				YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			Preconditions.checkArgument(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+			LOG.debug("Current working directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.debug("Remote keytab path obtained: {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("Remote keytab principal obtained: {}", remoteKeytabPrincipal);
+
+			String keytabPath = null;
+			if (remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.debug("Keytab path: {}", keytabPath);
+			}
+
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {}, YARN client user: {}",
+					currentUser.getShortUserName(), yarnClientUsername);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			// to support the YARN secure integration test scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if (krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			// Flink configuration
+			final Map<String, String> dynamicProperties =
+					FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+
+			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
+			if (keytabPath != null && remoteKeytabPrincipal != null) {
+				flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+			// Note that we use the "appMasterHostname" given by YARN here, to make sure
+			// we use the hostnames given by YARN consistently throughout akka.
+			// For akka, "localhost" and "localhost.localdomain" are different actors.
+			this.appMasterHostname = ENV.get(Environment.NM_HOST.key());
+			Preconditions.checkArgument(appMasterHostname != null,
+					"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return runApplicationMaster(flinkConfig);
+				}
+			});
+
+		}
+		catch (Throwable t) {
+			// make sure that everything ends up in the log, whatever happens
+			LOG.error("YARN Application Master initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action.
+	 *
+	 * @return The return code for the Java process.
+	 */
+	protected abstract int runApplicationMaster(Configuration config);
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * @param baseDirectory The working directory
+	 * @param additional Additional parameters
+	 *
+	 * @return The Flink configuration to be used by the application master.
+	 */
+	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
+		LOG.info("Loading config from directory {}.", baseDirectory);
+
+		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
+
+		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+		// add dynamic properties to the JobManager configuration
+		for (Map.Entry<String, String> property : additional.entrySet()) {
+			configuration.setString(property.getKey(), property.getValue());
+		}
+
+		// override the ZooKeeper namespace with the user CLI argument (if provided)
+		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
+			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
+		}
+
+		// if a web monitor shall be started, set the port to a random binding
+		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		}
+
+		// if the user has set the deprecated YARN-specific config keys, we add the
+		// corresponding generic config keys instead. That way, later code does not need to
+		// deal with deprecated config keys.
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+
+		BootstrapTools.substituteDeprecatedConfigKey(configuration,
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+
+		return configuration;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..e58c77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for the {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
+ * and the {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobManagerRunner starts a {@link org.apache.flink.runtime.jobmaster.JobMaster}.
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
+		implements OnCompletionActions, FatalErrorHandler {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+	/** The config key for the path of the job graph file */
+	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+	/** The lock to guard startup / shutdown / manipulation methods */
+	private final Object lock = new Object();
+
+	@GuardedBy("lock")
+	private MetricRegistry metricRegistry;
+
+	@GuardedBy("lock")
+	private HighAvailabilityServices haServices;
+
+	@GuardedBy("lock")
+	private RpcService commonRpcService;
+
+	@GuardedBy("lock")
+	private ResourceManager resourceManager;
+
+	@GuardedBy("lock")
+	private JobManagerRunner jobManagerRunner;
+
+	@GuardedBy("lock")
+	private JobGraph jobGraph;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN application master.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// run and exit with the proper return code
+		int returnCode = new YarnFlinkApplicationMasterRunner().run(args);
+		System.exit(returnCode);
+	}
+
+	@Override
+	protected int runApplicationMaster(Configuration config) {
+
+		try {
+			// ---- (1) create common services
+
+			// try to start the rpc service,
+			// using the port range definition from the config
+			final String amPortRange = config.getString(
+					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+			synchronized (lock) {
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
+
+				// ---- (2) init the resource manager -------
+				resourceManager = createResourceManager(config);
+
+				// ---- (3) init the job master parameters
+				jobManagerRunner = createJobManagerRunner(config);
+
+				// ---- (4) start the resource manager and the job manager runner:
+				resourceManager.start();
+				LOG.debug("YARN Flink Resource Manager started");
+
+				jobManagerRunner.start();
+				LOG.debug("Job Manager Runner started");
+
+				// ---- (5) start the web monitor
+				// TODO: add web monitor
+			}
+
+			// everything is started; wait until the resource manager terminates or the process is killed
+			resourceManager.getTerminationFuture().get();
+			LOG.info("YARN Application Master finished");
+		}
+		catch (Throwable t) {
+			// make sure that everything ends up in the log, whatever happens
+			LOG.error("YARN Application Master initialization failed", t);
+			shutdown(ApplicationStatus.FAILED, t.getMessage());
+			return INIT_ERROR_EXIT_CODE;
+		}
+
+		return 0;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	protected RpcService createRpcService(
+			Configuration configuration,
+			String bindAddress,
+			String portRange) throws Exception {
+		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange,
+				LOG);
+		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
+	}
+
+	private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices);
+
+		return new YarnResourceManager(config,
+				ENV,
+				commonRpcService,
+				resourceManagerConfiguration,
+				haServices,
+				slotManagerFactory,
+				metricRegistry,
+				jobLeaderIdService,
+				this);
+	}
+
+	private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception {
+		// first get the JobGraph from the local resources
+		// TODO: generate the job graph from the user's jar
+		jobGraph = loadJobGraph(config);
+
+		// we first need to mark the job as running in the HA services, so that the
+		// JobManager leader will recognize that it has work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Could not register the job at the high-availability services", t);
+		}
+
+		// now create the JobManagerRunner
+		return new JobManagerRunner(
+				jobGraph, config,
+				commonRpcService,
+				haServices,
+				this,
+				this);
+	}
+
+	protected void shutdown(ApplicationStatus status, String msg) {
+		synchronized (lock) {
+			if (jobManagerRunner != null) {
+				try {
+					jobManagerRunner.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the JobManagerRunner", tt);
+				}
+			}
+			if (resourceManager != null) {
+				try {
+					resourceManager.shutDownCluster(status, msg);
+					resourceManager.shutDown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the ResourceManager", tt);
+				}
+			}
+			if (commonRpcService != null) {
+				try {
+					commonRpcService.stopService();
+				} catch (Throwable tt) {
+					LOG.error("Error shutting down the resource manager rpc service", tt);
+				}
+			}
+			if (haServices != null) {
+				try {
+					haServices.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the HA services", tt);
+				}
+			}
+			if (metricRegistry != null) {
+				try {
+					metricRegistry.shutdown();
+				} catch (Throwable tt) {
+					LOG.warn("Failed to stop the metrics registry", tt);
+				}
+			}
+		}
+	}
+
+	private static JobGraph loadJobGraph(Configuration config) throws Exception {
+		JobGraph jg = null;
+		String jobGraphFile = config.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+		if (jobGraphFile != null) {
+			File fp = new File(jobGraphFile);
+			if (fp.isFile()) {
+				// use try-with-resources so that the streams are closed even if deserialization fails
+				try (FileInputStream input = new FileInputStream(fp);
+						ObjectInputStream obInput = new ObjectInputStream(input)) {
+					jg = (JobGraph) obInput.readObject();
+				}
+			}
+		}
+		if (jg == null) {
+			throw new Exception("Failed to load the job graph from " + jobGraphFile);
+		}
+		return jg;
+	}
+
+	//-------------------------------------------------------------------------------------
+	//  Fatal error handler
+	//-------------------------------------------------------------------------------------
+
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Encountered fatal error.", exception);
+
+		shutdown(ApplicationStatus.FAILED, exception.getMessage());
+	}
+
+	//----------------------------------------------------------------------------------------------
+	//  Result and error handling methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Job completion notification triggered by the JobManager.
+	 */
+	@Override
+	public void jobFinished(JobExecutionResult result) {
+		shutdown(ApplicationStatus.SUCCEEDED, null);
+	}
+
+	/**
+	 * Job failure notification triggered by the JobManager.
+	 */
+	@Override
+	public void jobFailed(Throwable cause) {
+		shutdown(ApplicationStatus.FAILED, cause.getMessage());
+	}
+
+	/**
+	 * Job completion notification triggered by self.
+	 */
+	@Override
+	public void jobFinishedByOther() {
+		shutdown(ApplicationStatus.UNKNOWN, null);
+	}
+}
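
For reference, loadJobGraph(...) above reads the JobGraph with plain Java serialization from the local file named by the "flink.jobgraph.path" setting (default "job.graph"); the client side is therefore expected to ship a serialized JobGraph into the application master's working directory, matching the "first get the JobGraph from the local resources" comment in createJobManagerRunner(...). A minimal sketch of such a writer, assuming standard Java serialization and an already-built JobGraph; the JobGraphFiles class and its write(...) method are illustrative only and not part of this commit:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.ObjectOutputStream;

    import org.apache.flink.runtime.jobgraph.JobGraph;

    /** Illustrative helper: writes the file that loadJobGraph(...) reads. */
    public final class JobGraphFiles {

        /** Serializes the given JobGraph to the target file using plain Java serialization. */
        public static void write(JobGraph jobGraph, File target) throws Exception {
            // mirror of the read side: ObjectInputStream there, ObjectOutputStream here
            try (FileOutputStream out = new FileOutputStream(target);
                    ObjectOutputStream objectOut = new ObjectOutputStream(out)) {
                objectOut.writeObject(jobGraph);
            }
        }
    }

The resulting file would then be made available next to the application master, for example as a YARN local resource.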
