[
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690837#comment-15690837
]
ASF GitHub Bot commented on FLINK-4928:
---------------------------------------
GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2744#discussion_r88944228
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application
master.
+ * It starts actor system and the actors for {@link
org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasters handles Flink job execution, while the
YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender,
OnCompletionActions, FatalErrorHandler {
+
+ /** Logger */
+ protected static final Logger LOG =
LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+ /** The process environment variables */
+ private static final Map<String, String> ENV = System.getenv();
+
+ /** The exit code returned if the initialization of the application
master failed */
+ private static final int INIT_ERROR_EXIT_CODE = 31;
+
+ /** The job graph file path */
+ private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+ /** The lock to guard startup / shutdown / manipulation methods */
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private MetricRegistry metricRegistry;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices haServices;
+
+ @GuardedBy("lock")
+ private LeaderElectionService jmLeaderElectionService;
+
+ @GuardedBy("lock")
+ private RpcService jobMasterRpcService;
+
+ @GuardedBy("lock")
+ private RpcService resourceManagerRpcService;
+
+ @GuardedBy("lock")
+ private ResourceManager resourceManager;
+
+ @GuardedBy("lock")
+ private JobMaster jobMaster;
+
+ @GuardedBy("lock")
+ JobManagerServices jobManagerServices;
+
+ @GuardedBy("lock")
+ JobManagerMetricGroup jobManagerMetrics;
+
+ @GuardedBy("lock")
+ private JobGraph jobGraph;
+
+ /** Flag marking the app master runner as started/running */
+ private volatile boolean running;
+ //
------------------------------------------------------------------------
+ // Program entry point
+ //
------------------------------------------------------------------------
+
+ /**
+ * The entry point for the YARN application master.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN
ApplicationMaster runner", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ // run and exit with the proper return code
+ int returnCode = new
YarnFlinkApplicationMasterRunner().run(args);
+ System.exit(returnCode);
+ }
+
+ /**
+ * The instance entry point for the YARN application master. Obtains
user group
+ * information and calls the main work method {@link
#runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+ * privileged action.
+ *
+ * @param args The command line arguments.
+ * @return The process exit code.
+ */
+ protected int run(String[] args) {
+ try {
+ LOG.debug("All environment variables: {}", ENV);
+
+ final String yarnClientUsername =
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ require(yarnClientUsername != null, "YARN client user
name environment variable {} not set",
+ YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+ final String currDir = ENV.get(Environment.PWD.key());
+ require(currDir != null, "Current working directory
variable (%s) not set", Environment.PWD.key());
+ LOG.debug("Current working Directory: {}", currDir);
+
+ final String remoteKeytabPath =
ENV.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.debug("remoteKeytabPath obtained {}",
remoteKeytabPath);
--- End diff --
I think it would be better not to use camel case here and simply spell it
out (e.g. "Remote keytab path obtained: {}"), since this is a user-facing logging statement.
> Implement FLIP-6 YARN Application Master Runner
> -----------------------------------------------
>
> Key: FLINK-4928
> URL: https://issues.apache.org/jira/browse/FLINK-4928
> Project: Flink
> Issue Type: Sub-task
> Components: YARN
> Environment: {{flip-6}} feature branch
> Reporter: Stephan Ewen
> Assignee: shuai.xu
>
> The Application Master Runner is the master process started in a YARN
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
> - Flink jars
> - Job jars
> - JobGraph
> - Environment variables
> - Contextual information like security tokens and certificates
> Its responsibilities are the following:
> - Read all configuration and environment variables, and compute the effective
> configuration
> - Start all shared components (RPC service, high-availability services)
> - Start the ResourceManager
> - Start the JobManager Runner
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)