[
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690839#comment-15690839
]
ASF GitHub Bot commented on FLINK-4928:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2744#discussion_r88945488
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application
master.
+ * It starts actor system and the actors for {@link
org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasters handles Flink job execution, while the
YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender,
OnCompletionActions, FatalErrorHandler {
+
+ /** Logger */
+ protected static final Logger LOG =
LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+ /** The process environment variables */
+ private static final Map<String, String> ENV = System.getenv();
+
+ /** The exit code returned if the initialization of the application
master failed */
+ private static final int INIT_ERROR_EXIT_CODE = 31;
+
+ /** The job graph file path */
+ private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+ /** The lock to guard startup / shutdown / manipulation methods */
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private MetricRegistry metricRegistry;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices haServices;
+
+ @GuardedBy("lock")
+ private LeaderElectionService jmLeaderElectionService;
+
+ @GuardedBy("lock")
+ private RpcService jobMasterRpcService;
+
+ @GuardedBy("lock")
+ private RpcService resourceManagerRpcService;
+
+ @GuardedBy("lock")
+ private ResourceManager resourceManager;
+
+ @GuardedBy("lock")
+ private JobMaster jobMaster;
+
+ @GuardedBy("lock")
+ JobManagerServices jobManagerServices;
+
+ @GuardedBy("lock")
+ JobManagerMetricGroup jobManagerMetrics;
+
+ @GuardedBy("lock")
+ private JobGraph jobGraph;
+
+ /** Flag marking the app master runner as started/running */
+ private volatile boolean running;
+ //
------------------------------------------------------------------------
+ // Program entry point
+ //
------------------------------------------------------------------------
+
+ /**
+ * The entry point for the YARN application master.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN
ApplicationMaster runner", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ // run and exit with the proper return code
+ int returnCode = new
YarnFlinkApplicationMasterRunner().run(args);
+ System.exit(returnCode);
+ }
+
+ /**
+ * The instance entry point for the YARN application master. Obtains
user group
+ * information and calls the main work method {@link
#runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+ * privileged action.
+ *
+ * @param args The command line arguments.
+ * @return The process exit code.
+ */
+ protected int run(String[] args) {
+ try {
+ LOG.debug("All environment variables: {}", ENV);
+
+ final String yarnClientUsername =
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ require(yarnClientUsername != null, "YARN client user
name environment variable {} not set",
+ YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+ final String currDir = ENV.get(Environment.PWD.key());
+ require(currDir != null, "Current working directory
variable (%s) not set", Environment.PWD.key());
+ LOG.debug("Current working Directory: {}", currDir);
+
+ final String remoteKeytabPath =
ENV.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.debug("remoteKeytabPath obtained {}",
remoteKeytabPath);
+
+ final String remoteKeytabPrincipal =
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("remoteKeytabPrincipal obtained {}",
remoteKeytabPrincipal);
+
+ String keytabPath = null;
+ if(remoteKeytabPath != null) {
+ File f = new File(currDir,
Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.debug("keytabPath: {}", keytabPath);
+ }
+
+ UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
+
+ LOG.info("YARN daemon is running as: {} Yarn client
user obtainer: {}",
+ currentUser.getShortUserName(),
yarnClientUsername );
+
+ SecurityContext.SecurityConfiguration sc = new
SecurityContext.SecurityConfiguration();
+
+ //To support Yarn Secure Integration Test Scenario
+ File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+ if(krb5Conf.exists() && krb5Conf.canRead()) {
+ String krb5Path = krb5Conf.getAbsolutePath();
+ LOG.info("KRB5 Conf: {}", krb5Path);
+ org.apache.hadoop.conf.Configuration conf = new
org.apache.hadoop.conf.Configuration();
+
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
+
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ sc.setHadoopConfiguration(conf);
+ }
+
+ // Flink configuration
+ final Map<String, String> dynamicProperties =
+
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("YARN dynamic properties: {}",
dynamicProperties);
+
+ final Configuration flinkConfig =
createConfiguration(currDir, dynamicProperties);
+ if(keytabPath != null && remoteKeytabPrincipal != null)
{
+
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
remoteKeytabPrincipal);
+ }
+
+
SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+ return SecurityContext.getInstalled().runSecured(new
SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ return
runApplicationMaster(flinkConfig);
+ }
+ });
+
+ }
+ catch (Throwable t) {
+ // make sure that everything whatever ends up in the log
+ LOG.error("YARN Application Master initialization
failed", t);
+ return INIT_ERROR_EXIT_CODE;
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // Core work method
+ //
------------------------------------------------------------------------
+
+ /**
+ * The main work method, must run as a privileged action.
+ *
+ * @return The return code for the Java process.
+ */
+ protected int runApplicationMaster(Configuration config) {
+
+ try {
+ // ---- (1) create common services
+ // Note that we use the "appMasterHostname" given by
YARN here, to make sure
+ // we use the hostnames given by YARN consistently
throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are
different actors.
+ final String appMasterHostname =
ENV.get(Environment.NM_HOST.key());
+ require(appMasterHostname != null,
+ "ApplicationMaster hostname variable %s
not set", Environment.NM_HOST.key());
+ LOG.info("YARN assigned hostname for application
master: {}", appMasterHostname);
+
+ // try to start the rpc service
+ // using the port range definition from the config.
+ final String amPortRange = config.getString(
+
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+ synchronized (lock) {
+ haServices =
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+ metricRegistry = new
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+ // ---- (2) init resource manager -------
+ resourceManagerRpcService =
createRpcService(config, appMasterHostname, amPortRange);
+ resourceManager = createResourceManager(config);
+
+ // ---- (3) init job master parameters
+ jobMasterRpcService = createRpcService(config,
appMasterHostname, amPortRange);
+ jobManagerServices =
JobManagerServices.fromConfiguration(config, haServices);
+ jobManagerMetrics = new
JobManagerMetricGroup(metricRegistry, jobMasterRpcService.getAddress());
+ jobMaster = createJobMaster(config);
+
+ // ---- (4) start the resource manager and job
master:
+ resourceManager.start();
+ LOG.debug("YARN Flink Resource Manager
started");
+
+ // mark the job as running in the HA services
+ try {
+
haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
+ }
+ catch (Throwable t) {
+ throw new
JobExecutionException(jobGraph.getJobID(),
+ "Could not register the
job at the high-availability services", t);
+ }
+ jmLeaderElectionService.start(this);
+
+ // ---- (5) start the web monitor
+ // TODO: add web monitor
+ }
+ running = true;
+ while (running) {
+ Thread.sleep(100);
+ }
+ // everything started, we can wait until all is done or
the process is killed
+ LOG.info("YARN Application Master finished");
+ }
+ catch (Throwable t) {
+ // make sure that everything whatever ends up in the log
+ LOG.error("YARN Application Master initialization
failed", t);
+ shutdown(ApplicationStatus.FAILED, t.getMessage());
+ return INIT_ERROR_EXIT_CODE;
+ }
+
+ return 0;
+ }
+
+ //
------------------------------------------------------------------------
+ // Utilities
+ //
------------------------------------------------------------------------
+
+ /**
+ * Validates a condition, throwing a RuntimeException if the condition
is violated.
+ *
+ * @param condition The condition.
+ * @param message The message for the runtime exception, with format
variables as defined by
+ * {@link String#format(String,
Object...)}.
+ * @param values The format arguments.
+ */
+ private static void require(boolean condition, String message,
Object... values) {
+ if (!condition) {
+ throw new RuntimeException(String.format(message,
values));
+ }
+ }
--- End diff --
Better to use Flink's `Preconditions#checkArgument/checkState` class.
> Implement FLIP-6 YARN Application Master Runner
> -----------------------------------------------
>
> Key: FLINK-4928
> URL: https://issues.apache.org/jira/browse/FLINK-4928
> Project: Flink
> Issue Type: Sub-task
> Components: YARN
> Environment: {{flip-6}} feature branch
> Reporter: Stephan Ewen
> Assignee: shuai.xu
>
> The Application Master Runner is the master process started in a YARN
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
> - Flink jars
> - Job jars
> - JobGraph
> - Environment variables
> - Contextual information like security tokens and certificates
> Its responsibility is the following:
> - Read all configuration and environment variables, computing the effective
> configuration
> - Start all shared components (Rpc, HighAvailability Services)
> - Start the ResourceManager
> - Start the JobManager Runner
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)