[
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206625#comment-15206625
]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r57011989
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application
master.
+ * It starts actor system and the actors for {@link
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the
YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+ /** Logger used by the application master runner. */
+ protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+ /**
+  * The maximum time that TaskManagers may be waiting to register at the JobManager,
+  * before they quit. Passed into the generated TaskManager configuration.
+  */
+ private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+ /** The process environment variables, captured once from {@code System.getenv()}. */
+ private static final Map<String, String> ENV = System.getenv();
+
+ /**
+  * The exit code returned if the initialization of the application master failed
+  * (returned by {@code run(String[])} when any setup step throws).
+  */
+ private static final int INIT_ERROR_EXIT_CODE = 31;
+
+ /**
+  * The exit code returned if the process exits because a critical actor died.
+  * Handed to the ProcessReapers that watch the JobManager and the resource manager actors.
+  */
+ private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+ // ------------------------------------------------------------------------
+ //  Program entry point
+ // ------------------------------------------------------------------------
+
+ /**
+  * Executable entry point of the YARN application master process.
+  *
+  * <p>Logs the environment information, installs the signal handler, runs a fresh
+  * runner instance, and terminates the JVM with the exit code that the run produced.
+  *
+  * @param args The command line arguments, forwarded to the runner.
+  */
+ public static void main(String[] args) {
+     EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+     SignalHandler.register(LOG);
+
+     final YarnApplicationMasterRunner runner = new YarnApplicationMasterRunner();
+     final int exitCode = runner.run(args);
+
+     // propagate the runner's result as the process exit status
+     System.exit(exitCode);
+ }
+
+ /**
+  * The instance entry point for the YARN application master. Obtains the user group
+  * information, creates a remote user for the YARN client user (copying over the
+  * current user's security tokens), and calls the main work method
+  * {@link #runApplicationMaster()} as a privileged action on behalf of that user.
+  *
+  * @param args The command line arguments (not used by this method).
+  * @return The process exit code: the result of {@link #runApplicationMaster()}, or
+  *         {@code INIT_ERROR_EXIT_CODE} if any step of this method throws.
+  */
+ protected int run(String[] args) {
+     try {
+         LOG.debug("All environment variables: {}", ENV);
+
+         final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+         // The other require(...) calls in this class use '%s' placeholders (presumably
+         // String.format-style). An SLF4J-style '{}' would be printed literally and the
+         // name of the missing variable would be lost from the error message.
+         require(yarnClientUsername != null, "YARN client user name environment variable %s not set",
+             YarnConfigKeys.ENV_CLIENT_USERNAME);
+
+         final UserGroupInformation currentUser;
+         try {
+             currentUser = UserGroupInformation.getCurrentUser();
+         } catch (Throwable t) {
+             throw new Exception("Cannot access UserGroupInformation information for current user", t);
+         }
+
+         LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}",
+             currentUser.getShortUserName(), yarnClientUsername);
+
+         final UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+
+         // transfer all security tokens, for example for authenticated HDFS and HBase access
+         for (Token<?> token : currentUser.getTokens()) {
+             ugi.addToken(token);
+         }
+
+         // run the actual work in a secured privileged action
+         return ugi.doAs(new PrivilegedAction<Integer>() {
+             @Override
+             public Integer run() {
+                 return runApplicationMaster();
+             }
+         });
+     }
+     catch (Throwable t) {
+         // make sure that whatever went wrong ends up in the log
+         LOG.error("YARN Application Master initialization failed", t);
+         return INIT_ERROR_EXIT_CODE;
+     }
+ }
+
+ //
------------------------------------------------------------------------
+ // Core work method
+ //
------------------------------------------------------------------------
+
+ /**
+ * The main work method, must run as a privileged action.
+ *
+ * @return The return code for the Java process.
+ */
+ protected int runApplicationMaster() {
+ ActorSystem actorSystem = null;
+ WebMonitor webMonitor = null;
+
+ try {
+ // ------- (1) load and parse / validate all
configurations -------
+
+ // loading all config values here has the advantage
that the program fails fast, if any
+ // configuration problem occurs
+
+ final String currDir = ENV.get(Environment.PWD.key());
+ require(currDir != null, "Current working directory
variable (%s) not set", Environment.PWD.key());
+
+ // Note that we use the "appMasterHostname" given by
YARN here, to make sure
+ // we use the hostnames given by YARN consistently
throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are
different actors.
+ final String appMasterHostname =
ENV.get(Environment.NM_HOST.key());
+ require(appMasterHostname != null,
+ "ApplicationMaster hostname variable %s not
set", Environment.NM_HOST.key());
+
+ LOG.info("YARN assigned hostname for application
master: {}", appMasterHostname);
+
+ // Flink configuration
+ final Map<String, String> dynamicProperties =
+
CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("YARN dynamic properties: {}",
dynamicProperties);
+
+ final Configuration config =
createConfiguration(currDir, dynamicProperties);
+
+ // Hadoop/Yarn configuration (loads config data
automatically from classpath files)
+ final YarnConfiguration yarnConfig = new
YarnConfiguration();
+
+ final int taskManagerContainerMemory;
+ final int numInitialTaskManagers;
+ final int slotsPerTaskManager;
+
+ try {
+ taskManagerContainerMemory =
Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY));
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid value for "
+ YarnConfigKeys.ENV_TM_MEMORY + " : "
+ + e.getMessage());
+ }
+ try {
+ numInitialTaskManagers =
Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT));
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid value for "
+ YarnConfigKeys.ENV_TM_COUNT + " : "
+ + e.getMessage());
+ }
+ try {
+ slotsPerTaskManager =
Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS));
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid value for "
+ YarnConfigKeys.ENV_SLOTS + " : "
+ + e.getMessage());
+ }
+
+ final ContaineredTaskManagerParameters
taskManagerParameters =
+ ContaineredTaskManagerParameters.create(config,
taskManagerContainerMemory, slotsPerTaskManager);
+
+ LOG.info("TaskManagers will be created with {} task
slots", taskManagerParameters.numSlots());
+ LOG.info("TaskManagers will be started with container
size {} MB, JVM heap size {} MB, " +
+ "JVM direct memory limit {} MB",
+
taskManagerParameters.taskManagerTotalMemoryMB(),
+ taskManagerParameters.taskManagerHeapSizeMB(),
+
taskManagerParameters.taskManagerDirectMemoryLimitMB());
+
+
+ // ----------------- (2) start the actor system
-------------------
+
+ // try to start the actor system, JobManager and
JobManager actor system
+ // using the port range definition from the config.
+ final String amPortRange = config.getString(
+
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+ actorSystem = BootstrapTools.startActorSystem(config,
appMasterHostname, amPortRange, LOG);
+
+ final String akkaHostname =
AkkaUtils.getAddress(actorSystem).host().get();
+ final int akkaPort = (Integer)
AkkaUtils.getAddress(actorSystem).port().get();
+
+ LOG.info("Actor system bound to hostname {}.",
akkaHostname);
+
+
+ // ---- (3) Generate the configuration for the
TaskManagers
+
+ final Configuration taskManagerConfig =
BootstrapTools.generateTaskManagerConfiguration(
+ config, akkaHostname, akkaPort,
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
+ LOG.debug("TaskManager configuration: {}",
taskManagerConfig);
+
+ final ContainerLaunchContext taskManagerContext =
createTaskManagerContext(
+ config, yarnConfig, ENV,
+ taskManagerParameters, taskManagerConfig,
+ currDir, getTaskManagerClass(), LOG);
+
+
+ // ---- (4) start the actors and components in this
order:
+
+ // 1) JobManager & Archive (in non-HA case, the leader
service takes this)
+ // 2) Web Monitor (we need its port to register)
+ // 3) Resource Master for YARN
+ // 4) Process reapers for the JobManager and Resource
Master
+
+
+ // 1: the JobManager
+ LOG.debug("Starting JobManager actor");
+
+ // we start the JobManager with its standard name
+ ActorRef jobManager = JobManager.startJobManagerActors(
+ config, actorSystem,
+ new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
+ scala.Option.<String>empty(),
+ getJobManagerClass(),
+ getArchivistClass())._1();
+
+
+ // 2: the web monitor
+ LOG.debug("Starting Web Frontend");
+
+ webMonitor =
BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager,
LOG);
+ final String webMonitorURL = webMonitor == null ? null :
+ "http://" + appMasterHostname + ":" +
webMonitor.getServerPort();
+
+ // 3: Flink's Yarn resource manager
+ LOG.debug("Starting YARN Flink Resource Manager");
+
+ // we need the leader retrieval service here to be
informed of new
+ // leader session IDs, even though there can be only
one leader ever
+ LeaderRetrievalService leaderRetriever =
+
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+
+ Props resourceMasterProps =
YarnFlinkResourceManager.createActorProps(
+ getResourceManagerClass(),
+ config,
+ yarnConfig,
+ leaderRetriever,
+ appMasterHostname,
+ webMonitorURL,
+ taskManagerParameters,
+ taskManagerContext,
+ numInitialTaskManagers,
+ LOG);
+
+ ActorRef resourceMaster =
actorSystem.actorOf(resourceMasterProps);
+
+
+ // 4: Process reapers
+ // The process reapers ensure that upon unexpected
actor death, the process exits
+ // and does not stay lingering around unresponsive
+
+ LOG.debug("Starting process reapers for JobManager and
YARN Application Master");
+
+ actorSystem.actorOf(
+ Props.create(ProcessReaper.class,
resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
+ "YARN_Resource_Master_Process_Reaper");
+
+ actorSystem.actorOf(
+ Props.create(ProcessReaper.class, jobManager,
LOG, ACTOR_DIED_EXIT_CODE),
+ "JobManager_Process_Reaper");
+ }
+ catch (Throwable t) {
+ // make sure that everything whatever ends up in the log
+ LOG.error("YARN Application Master initialization
failed", t);
+
+ if (actorSystem != null) {
+ try {
+ actorSystem.shutdown();
+ } catch (Throwable tt) {
+ LOG.error("Error shutting down actor
system", tt);
+ }
+ }
+
+ if (webMonitor != null) {
+ try {
+ webMonitor.stop();
+ } catch (Throwable ignored) {}
--- End diff --
Doesn't that tend to hide the original error we caught? The regular
shutdown should of course log errors of `stop()`.
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)