[FLINK-4929] [yarn] Implement FLIP-6 YARN TaskExecutor Runner Summary: Implement FLIP-6 YARN TaskExecutor Runner
Test Plan: NA Reviewers: biao.liub Differential Revision: http://phabricator.taobao.net/D6564 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55e94c3c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55e94c3c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55e94c3c Branch: refs/heads/flip-6 Commit: 55e94c3c655dc73beaebfd13b83531194f0ae539 Parents: e8293bc Author: shuai.xus <[email protected]> Authored: Wed Nov 23 18:00:07 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:44 2016 +0100 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskManagerRunner.java | 6 + .../flink/yarn/YarnTaskExecutorRunner.java | 257 +++++++++++++++++++ 2 files changed, 263 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/55e94c3c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 1145a46..3500f6d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -151,6 +152,11 @@ public class TaskManagerRunner implements FatalErrorHandler { } } + /** Returns the termination future of the wrapped {@code taskManager}, so that callers can find out when it has terminated. */ + public Future<Void> getTerminationFuture() { + return taskManager.getTerminationFuture(); + } + // -------------------------------------------------------------------------------------------- // FatalErrorHandler methods // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/55e94c3c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java new file mode 100644 index 0000000..d9912eb --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for running a TaskExecutor in a YARN container.
+ *
+ * <p>It reads the container's environment variables, loads the Flink configuration from the
+ * container's working directory, installs the security context, and then runs a
+ * {@link TaskManagerRunner} until the task executor terminates.
+ */
+public class YarnTaskExecutorRunner {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class);
+
+	/** The process environment variables */
+	private static final Map<String, String> ENV = System.getenv();
+
+	/**
+	 * The exit code returned if the YARN task executor runner fails, either while initializing
+	 * or while the task executor is running.
+	 */
+	private static final int INIT_ERROR_EXIT_CODE = 31;
+
+	/** Metric registry handed to the task manager runner; shut down in {@link #shutdown()}. */
+	private MetricRegistry metricRegistry;
+
+	/** High availability services handed to the task manager runner; shut down in {@link #shutdown()}. */
+	private HighAvailabilityServices haServices;
+
+	/** RPC service the task executor is started on; stopped in {@link #shutdown()}. */
+	private RpcService taskExecutorRpcService;
+
+	/** The runner of the task executor itself. */
+	private TaskManagerRunner taskManagerRunner;
+
+	// ------------------------------------------------------------------------
+	//  Program entry point
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the YARN task executor runner.
+	 *
+	 * @param args The command line arguments.
+	 */
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// run and exit with the proper return code
+		int returnCode = new YarnTaskExecutorRunner().run(args);
+		System.exit(returnCode);
+	}
+
+	/**
+	 * The instance entry point for the YARN task executor. Reads the container environment,
+	 * prepares the Flink configuration and security settings, and then calls the main work
+	 * method {@link #runTaskExecutor(org.apache.flink.configuration.Configuration)} as a
+	 * privileged action.
+	 *
+	 * @param args The command line arguments.
+	 * @return The process exit code.
+	 */
+	protected int run(String[] args) {
+		try {
+			LOG.debug("All environment variables: {}", ENV);
+
+			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
+			LOG.info("Current working/local Directory: {}", localDirs);
+
+			final String currDir = ENV.get(Environment.PWD.key());
+			LOG.info("Current working Directory: {}", currDir);
+
+			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
+			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+
+			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+			// load the Flink configuration from the container's working directory (PWD)
+			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+			FileSystem.setDefaultScheme(configuration);
+
+			// use YARN's local directories for temporary files, unless the Flink config names some
+			String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+			if (flinkTempDirs == null) {
+				LOG.info("Setting directories for temporary file {}", localDirs);
+				configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+			} else {
+				LOG.info("Overriding YARN's temporary file directories with those " +
+					"specified in the Flink config: {}", flinkTempDirs);
+			}
+
+			// tell akka to die in case of an error
+			configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
+			// if a remote keytab was announced, it is looked up in the working directory
+			String keytabPath = null;
+			if (remoteKeytabPath != null) {
+				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+				keytabPath = f.getAbsolutePath();
+				LOG.info("keytab path: {}", keytabPath);
+			}
+
+			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+				currentUser.getShortUserName(), yarnClientUsername);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+			// To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if (krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+				conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+				sc.setHadoopConfiguration(conf);
+			}
+
+			if (keytabPath != null && remoteKeytabPrincipal != null) {
+				configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+				configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+			}
+
+			SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+				@Override
+				public Integer run() {
+					return runTaskExecutor(configuration);
+				}
+			});
+		}
+		catch (Throwable t) {
+			// make sure that whatever happened ends up in the log
+			LOG.error("YARN TaskExecutor runner initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Core work method
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The main work method, must run as a privileged action. Creates the common services,
+	 * starts the task executor, and blocks until the task executor's termination future
+	 * completes. The services are always released via {@link #shutdown()} before returning.
+	 *
+	 * @param config The Flink configuration prepared by {@link #run(String[])}.
+	 * @return The return code for the Java process: 0 on a regular termination,
+	 *         {@code INIT_ERROR_EXIT_CODE} if anything failed.
+	 */
+	protected int runTaskExecutor(Configuration config) {
+		try {
+			// ---- (1) create common services
+			// first get the ResourceID; for YARN, the resource id is the container id
+			final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+			Preconditions.checkArgument(containerId != null,
+				"ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
+
+			// use the hostname passed in through the environment, if present
+			final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
+			if (taskExecutorHostname != null) {
+				config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
+			}
+
+			ResourceID resourceID = new ResourceID(containerId);
+			LOG.info("YARN assigned resource id {} for the task executor.", resourceID);
+
+			haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+			metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+			// ---- (2) init task manager runner -------
+			taskExecutorRpcService = TaskManagerRunner.createRpcService(config, haServices);
+			taskManagerRunner = new TaskManagerRunner(config, resourceID, taskExecutorRpcService, haServices, metricRegistry);
+
+			// ---- (3) start the task manager runner
+			taskManagerRunner.start();
+			LOG.debug("YARN task executor started");
+
+			// everything started, we can wait until all is done or the process is killed
+			taskManagerRunner.getTerminationFuture().get();
+			LOG.info("YARN task manager runner finished");
+			return 0;
+		}
+		catch (Throwable t) {
+			// make sure that whatever happened ends up in the log
+			LOG.error("YARN task executor initialization failed", t);
+			return INIT_ERROR_EXIT_CODE;
+		}
+		finally {
+			// release the services on both the regular and the failure path
+			shutdown();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Releases the services created in {@link #runTaskExecutor(Configuration)}. The steps are
+	 * stopping the task executor RPC service, then shutting down the HA services, then shutting
+	 * down the metric registry. Services that were never created are skipped. A failure in one
+	 * step is logged and does not prevent the remaining steps.
+	 */
+	protected void shutdown() {
+		if (taskExecutorRpcService != null) {
+			try {
+				taskExecutorRpcService.stopService();
+			} catch (Throwable tt) {
+				LOG.error("Error shutting down task executor rpc service", tt);
+			}
+		}
+		if (haServices != null) {
+			try {
+				haServices.shutdown();
+			} catch (Throwable tt) {
+				LOG.warn("Failed to stop the HA service", tt);
+			}
+		}
+		if (metricRegistry != null) {
+			try {
+				metricRegistry.shutdown();
+			} catch (Throwable tt) {
+				LOG.warn("Failed to stop the metrics registry", tt);
+			}
+		}
+	}
+}
