[ https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14946651#comment-14946651 ]
ASF GitHub Bot commented on FLINK-2790:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/1213#discussion_r41374754
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+ /**
+ * Constants,
+	 * all starting with ENV_ are used as environment variables to pass values from the Client
+ * to the Application Master.
+ */
+ public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+ public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+ public final static String ENV_APP_ID = "_APP_ID";
+	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+ public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+ public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+ public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+ public static final String ENV_SLOTS = "_SLOTS";
+ public static final String ENV_DETACHED = "_DETACHED";
+ public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+
+
+ /**
+ * Minimum memory requirements, checked by the Client.
+ */
+	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
+ private static final int MIN_TM_MEMORY = 768;
+
+ private Configuration conf;
+ private YarnClient yarnClient;
+ private YarnClientApplication yarnApplication;
+
+
+ /**
+	 * Files (usually in a distributed file system) used for the YARN session of Flink.
+ * Contains configuration files and jar files.
+ */
+ private Path sessionFilesDir;
+
+ /**
+	 * If the user has specified a different number of slots, we store them here.
+ */
+ private int slots = -1;
+
+ private int jobManagerMemoryMb = 1024;
+
+ private int taskManagerMemoryMb = 1024;
+
+ private int taskManagerCount = 1;
+
+ private String yarnQueue = null;
+
+ private String configurationDirectory;
+
+ private Path flinkConfigurationPath;
+
+ private Path flinkLoggingConfigurationPath; // optional
+
+ private Path flinkJarPath;
+
+ private String dynamicPropertiesEncoded;
+
+ private List<File> shipFiles = new ArrayList<File>();
+ private org.apache.flink.configuration.Configuration flinkConfiguration;
+
+ private boolean detached;
+ private boolean streamingMode;
+
+ private String customName = null;
+
+ public FlinkYarnClientBase() {
+ conf = new YarnConfiguration();
+ if(this.yarnClient == null) {
+ // Create yarnClient
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ }
+
+ // for unit tests only
+ if(System.getenv("IN_TESTS") != null) {
+ try {
+				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+ } catch (Throwable t) {
+ throw new RuntimeException("Error",t);
+ }
+ }
+ }
+
+ protected abstract Class<?> getApplicationMasterClass();
+
+ @Override
+ public void setJobManagerMemory(int memoryMb) {
+ if(memoryMb < MIN_JM_MEMORY) {
+			throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
+				+ "of " + MIN_JM_MEMORY + " MB");
+ }
+ this.jobManagerMemoryMb = memoryMb;
+ }
+
+ @Override
+ public void setTaskManagerMemory(int memoryMb) {
+ if(memoryMb < MIN_TM_MEMORY) {
+			throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
+				+ "of " + MIN_TM_MEMORY + " MB");
+ }
+ this.taskManagerMemoryMb = memoryMb;
+ }
+
+ @Override
+	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
+ this.flinkConfiguration = conf;
+ }
+
+ @Override
+ public void setTaskManagerSlots(int slots) {
+ if(slots <= 0) {
+			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
+ }
+ this.slots = slots;
+ }
+
+ @Override
+ public int getTaskManagerSlots() {
+ return this.slots;
+ }
+
+ @Override
+ public void setQueue(String queue) {
+ this.yarnQueue = queue;
+ }
+
+ @Override
+ public void setLocalJarPath(Path localJarPath) {
+ if(!localJarPath.toString().endsWith("jar")) {
+			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
+ }
+ this.flinkJarPath = localJarPath;
+ }
+
+ @Override
+ public void setConfigurationFilePath(Path confPath) {
+ flinkConfigurationPath = confPath;
+ }
+
+ public void setConfigurationDirectory(String configurationDirectory) {
+ this.configurationDirectory = configurationDirectory;
+ }
+
+ @Override
+ public void setFlinkLoggingConfigurationPath(Path logConfPath) {
+ flinkLoggingConfigurationPath = logConfPath;
+ }
+
+ @Override
+ public Path getFlinkLoggingConfigurationPath() {
+ return flinkLoggingConfigurationPath;
+ }
+
+ @Override
+ public void setTaskManagerCount(int tmCount) {
+ if(tmCount < 1) {
+ throw new IllegalArgumentException("The TaskManager
count has to be at least 1.");
+ }
+ this.taskManagerCount = tmCount;
+ }
+
+ @Override
+ public int getTaskManagerCount() {
+ return this.taskManagerCount;
+ }
+
+ @Override
+ public void setShipFiles(List<File> shipFiles) {
+ for(File shipFile: shipFiles) {
+			// remove uberjar from ship list (by default everything in the lib/ folder is added to
+			// the list of files to ship, but we handle the uberjar separately)
+			if(!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))) {
+ this.shipFiles.add(shipFile);
+ }
+ }
+ }
+
+	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
+ this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
+ }
+
+ @Override
+ public String getDynamicPropertiesEncoded() {
+ return this.dynamicPropertiesEncoded;
+ }
+
+
+ public void isReadyForDeployment() throws YarnDeploymentException {
+ if(taskManagerCount <= 0) {
+			throw new YarnDeploymentException("TaskManager count must be positive");
+ }
+ if(this.flinkJarPath == null) {
+			throw new YarnDeploymentException("The Flink jar path is null");
+ }
+ if(this.configurationDirectory == null) {
+			throw new YarnDeploymentException("Configuration directory not set");
+ }
+ if(this.flinkConfigurationPath == null) {
+			throw new YarnDeploymentException("Configuration path not set");
+ }
+ if(this.flinkConfiguration == null) {
+			throw new YarnDeploymentException("Flink configuration object has not been set");
+ }
+
+		// check if required Hadoop environment variables are set. If not, warn user
+		if(System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
+			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
+				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+				"configuration for accessing YARN.");
+ }
+ }
+
+	public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
+ for(int i = 0; i < nodeManagers.length; i++) {
+ if(nodeManagers[i] >= toAllocate) {
+ nodeManagers[i] -= toAllocate;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void setDetachedMode(boolean detachedMode) {
+ this.detached = detachedMode;
+ }
+
+ @Override
+ public boolean isDetached() {
+ return detached;
+ }
+
+ public AbstractFlinkYarnCluster deploy() throws Exception {
+
+ UserGroupInformation.setConfiguration(conf);
+		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (!ugi.hasKerberosCredentials()) {
+				throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
+					"You may use kinit to authenticate and request a TGT from the Kerberos server.");
+ }
+			return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
+ @Override
+				public AbstractFlinkYarnCluster run() throws Exception {
+ return deployInternal();
+ }
+ });
+ } else {
+ return deployInternal();
+ }
+ }
+
+
+
+ /**
+	 * This method will block until the ApplicationMaster/JobManager have been
+ * deployed on YARN.
+ */
+ protected AbstractFlinkYarnCluster deployInternal() throws Exception {
+ isReadyForDeployment();
+
+ LOG.info("Using values:");
+ LOG.info("\tTaskManager count = {}", taskManagerCount);
+ LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
+ LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
+
+ // Create application via yarnClient
+ yarnApplication = yarnClient.createApplication();
+		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+		// ------------------ Add dynamic properties to local flinkConfiguration ------
+
+		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
+ for (Tuple2<String, String> dynProperty : dynProperties) {
+			flinkConfiguration.setString(dynProperty.f0, dynProperty.f1);
+ }
+
+		// ------------------ Check if the specified queue exists --------------
+
+ try {
+ List<QueueInfo> queues = yarnClient.getAllQueues();
+			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
+ boolean queueFound = false;
+ for (QueueInfo queue : queues) {
+					if (queue.getQueueName().equals(this.yarnQueue)) {
+ queueFound = true;
+ break;
+ }
+ }
+ if (!queueFound) {
+ String queueNames = "";
+ for (QueueInfo queue : queues) {
+						queueNames += queue.getQueueName() + ", ";
+ }
+					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
+						"Available queues: " + queueNames);
+ }
+ } else {
+				LOG.debug("The YARN cluster does not have any queues configured");
+ }
+ } catch(Throwable e) {
+			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Error details", e);
+ }
+ }
+
+		// ------------------ Check if the YARN Cluster has the requested resources --------------
+
+		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
+		// all allocations below this value are automatically set to this value.
+		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. " +
+				"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
+				"you requested will start.");
+ }
+
+		// set the memory to minAllocationMB to do the next checks correctly
+ if(jobManagerMemoryMb < yarnMinAllocationMB) {
+ jobManagerMemoryMb = yarnMinAllocationMB;
+ }
+ if(taskManagerMemoryMb < yarnMinAllocationMB) {
+ taskManagerMemoryMb = yarnMinAllocationMB;
+ }
+
+ Resource maxRes = appResponse.getMaximumResourceCapability();
+		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+ if(jobManagerMemoryMb > maxRes.getMemory() ) {
+ failSessionDuringDeployment();
+			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
+ }
+
+ if(taskManagerMemoryMb > maxRes.getMemory() ) {
+ failSessionDuringDeployment();
+			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
+ }
+
+		final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
+			"connecting from the beginning because the resources are currently not available in the cluster. " +
+			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
+			"the resources become available.";
+		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
+		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
+
+ }
+ if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
+			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+ }
+ if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
+			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+ }
+
+		// ----------------- check if the requested containers fit into the cluster.
+
+		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+ // first, allocate the jobManager somewhere.
+ if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
+				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
+				Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
+ }
+ // allocate TaskManagers
+ for(int i = 0; i < taskManagerCount; i++) {
+ if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+				LOG.warn("There is not enough memory available in the YARN cluster. " +
+					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
+					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
+					"the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC);
+ }
+ }
+
+		// ------------------ Prepare Application Master Container ------------------------------
+
+ // respect custom JVM options in the YAML file
+		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+ boolean hasLogback = new File(logbackFile).exists();
+		String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+
+ boolean hasLog4j = new File(log4jFile).exists();
+ if(hasLogback) {
+ shipFiles.add(new File(logbackFile));
+ }
+ if(hasLog4j) {
+ shipFiles.add(new File(log4jFile));
+ }
+
+		// Set up the container launch context for the application master
+		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+		String amCommand = "$JAVA_HOME/bin/java"
+			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " + javaOpts;
+
+ if(hasLogback || hasLog4j) {
+			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-main.log\"";
+
+ if(hasLogback) {
+				amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+ }
+
+ if(hasLog4j) {
+				amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+ }
+ }
+
+		amCommand += " " + getApplicationMasterClass().getName() + " "
--- End diff ---
indentation
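
For readers following along: the class comment in the diff notes that the client uploads the Flink jar to HDFS and that YARN then downloads it into each container's local filesystem. That localization is driven by registering the file as a YARN LocalResource on the ContainerLaunchContext. The sketch below is illustrative only and is not part of this PR; the helper names (registerLocalResource, attachFlinkJar, "flink.jar") are made up, but the Hadoop 2.x YARN API calls are the standard ones.

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public final class LocalResourceSketch {

	// Hypothetical helper: describe a remote file so that YARN downloads it into
	// the container's working directory before the container is launched.
	public static LocalResource registerLocalResource(FileSystem fs, Path remotePath) throws IOException {
		FileStatus status = fs.getFileStatus(remotePath);
		LocalResource resource = Records.newRecord(LocalResource.class);
		resource.setResource(ConverterUtils.getYarnUrlFromPath(remotePath)); // where YARN fetches the file
		resource.setSize(status.getLen());                   // size and timestamp must match the
		resource.setTimestamp(status.getModificationTime()); // remote file, or localization fails
		resource.setType(LocalResourceType.FILE);
		resource.setVisibility(LocalResourceVisibility.APPLICATION); // visible only to this application
		return resource;
	}

	// Hypothetical helper: the map key ("flink.jar") becomes the file name in the
	// container's working directory.
	public static void attachFlinkJar(ContainerLaunchContext amContainer, FileSystem fs, Path jarOnHdfs) throws IOException {
		amContainer.setLocalResources(Collections.singletonMap("flink.jar", registerLocalResource(fs, jarOnHdfs)));
	}
}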
> Add high availability support for Yarn
> --------------------------------------
>
> Key: FLINK-2790
> URL: https://issues.apache.org/jira/browse/FLINK-2790
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager, TaskManager
> Reporter: Till Rohrmann
> Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn
> restart a failed application master in a new container. For that, we set the
> number of application retries to something greater than 1.
> From version 2.4.0 onwards, it is possible to reuse already started
> containers for the TaskManagers, thus avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval within which
> the number of application attempts has to be exceeded in order to fail the
> job. This will prevent long-running jobs from eventually depleting all
> available application attempts.
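
For context, the three mechanisms the issue describes map onto standard YARN client APIs on ApplicationSubmissionContext. A minimal sketch, assuming Hadoop 2.6.0+ (setKeepContainersAcrossApplicationAttempts requires 2.4.0+, setAttemptFailuresValidityInterval requires 2.6.0+); the class name and the concrete values are illustrative, not the PR's actual code:

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;

public final class YarnHaSettingsSketch {

	public static void configure(ApplicationSubmissionContext appContext) {
		// Let YARN restart a failed ApplicationMaster in a new container: any value
		// greater than 1 enables retries (capped cluster-side by
		// yarn.resourcemanager.am.max-attempts on the ResourceManager).
		appContext.setMaxAppAttempts(10);

		// Hadoop 2.4.0+: keep already running TaskManager containers across
		// ApplicationMaster restarts, avoiding unnecessary restart delays.
		appContext.setKeepContainersAcrossApplicationAttempts(true);

		// Hadoop 2.6.0+: only attempts failing within this sliding window
		// (milliseconds) count against maxAppAttempts, so a long-running job
		// cannot slowly deplete all of its attempts.
		appContext.setAttemptFailuresValidityInterval(60000L);
	}
}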