http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
new file mode 100644
index 0000000..7220a29
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
+
+/**
+* All classes in this package contain code taken from
+* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+* and
+* https://github.com/hortonworks/simple-yarn-app
+* and
+* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+*
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+*
+*/
+public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
+       private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
+
+       private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+
+       /**
+        * Minimum memory requirements, checked by the Client.
+        */
+       private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
+       private static final int MIN_TM_MEMORY = 768;
+
+       private Configuration conf = new YarnConfiguration();
+
+       /**
+        * Files (usually in a distributed file system) used for the YARN session of Flink.
+        * Contains configuration files and jar files.
+        */
+       private Path sessionFilesDir;
+
+       /**
+        * If the user has specified a different number of slots, we store them here.
+        */
+       private int slots = -1;
+
+       private int jobManagerMemoryMb = 1024;
+
+       private int taskManagerMemoryMb = 1024;
+
+       private int taskManagerCount = 1;
+
+       private String yarnQueue = null;
+
+       private String configurationDirectory;
+
+       private Path flinkConfigurationPath;
+
+       private Path flinkLoggingConfigurationPath; // optional
+
+       private Path flinkJarPath;
+
+       private String dynamicPropertiesEncoded;
+
+       private List<File> shipFiles = new ArrayList<>();
+       private org.apache.flink.configuration.Configuration flinkConfiguration;
+
+       private boolean detached;
+
+       private String customName = null;
+
+       public AbstractYarnClusterDescriptor() {
+               // for unit tests only
+               if(System.getenv("IN_TESTS") != null) {
+                       try {
+                               conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+                       } catch (Throwable t) {
+                               throw new RuntimeException("Error", t);
+                       }
+               }
+
+               // load the config
+               this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
+               GlobalConfiguration.loadConfiguration(configurationDirectory);
+               this.flinkConfiguration = GlobalConfiguration.getConfiguration();
+
+               File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME);
+               if (!confFile.exists()) {
+                       throw new RuntimeException("Unable to locate configuration file in " + confFile);
+               }
+               flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+
+               // check if there is a logback or log4j file
+               if (configurationDirectory.length() > 0) {
+                       File logback = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
+                       if (logback.exists()) {
+                               shipFiles.add(logback);
+                               flinkLoggingConfigurationPath = new Path(logback.toURI());
+                       }
+                       File log4j = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
+                       if (log4j.exists()) {
+                               shipFiles.add(log4j);
+                               if (flinkLoggingConfigurationPath != null) {
+                                       // there is already a logback configuration file --> warn the user
+                                       LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
+                                               "Logback configuration files. Please delete or rename one of them.");
+                               }
+                               flinkLoggingConfigurationPath = new Path(log4j.toURI());
+                       }
+               }
+       }
+
+       /**
+        * The class to bootstrap the application master of the Yarn cluster (runs main method).
+        */
+       protected abstract Class<?> getApplicationMasterClass();
+
+       public void setJobManagerMemory(int memoryMb) {
+               if(memoryMb < MIN_JM_MEMORY) {
+                       throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
+                               + "of " + MIN_JM_MEMORY + " MB");
+               }
+               this.jobManagerMemoryMb = memoryMb;
+       }
+
+       public void setTaskManagerMemory(int memoryMb) {
+               if(memoryMb < MIN_TM_MEMORY) {
+                       throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
+                               + "of " + MIN_TM_MEMORY + " MB");
+               }
+               this.taskManagerMemoryMb = memoryMb;
+       }
+
+       public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
+               this.flinkConfiguration = conf;
+       }
+
+       public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+               return flinkConfiguration;
+       }
+
+       public void setTaskManagerSlots(int slots) {
+               if(slots <= 0) {
+                       throw new IllegalArgumentException("Number of TaskManager slots must be positive");
+               }
+               this.slots = slots;
+       }
+
+       public int getTaskManagerSlots() {
+               return this.slots;
+       }
+
+       public void setQueue(String queue) {
+               this.yarnQueue = queue;
+       }
+
+       public void setLocalJarPath(Path localJarPath) {
+               if(!localJarPath.toString().endsWith("jar")) {
+                       throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
+               }
+               this.flinkJarPath = localJarPath;
+       }
+
+       public void setConfigurationFilePath(Path confPath) {
+               flinkConfigurationPath = confPath;
+       }
+
+       public void setConfigurationDirectory(String configurationDirectory) {
+               this.configurationDirectory = configurationDirectory;
+       }
+
+       public void setFlinkLoggingConfigurationPath(Path logConfPath) {
+               flinkLoggingConfigurationPath = logConfPath;
+       }
+
+       public Path getFlinkLoggingConfigurationPath() {
+               return flinkLoggingConfigurationPath;
+       }
+
+       public void setTaskManagerCount(int tmCount) {
+               if(tmCount < 1) {
+                       throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
+               }
+               this.taskManagerCount = tmCount;
+       }
+
+       public int getTaskManagerCount() {
+               return this.taskManagerCount;
+       }
+
+       public void setShipFiles(List<File> shipFiles) {
+               for(File shipFile: shipFiles) {
+                       // remove uberjar from ship list (by default everything in the lib/ folder is added to
+                       // the list of files to ship, but we handle the uberjar separately)
+                       if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
+                               this.shipFiles.add(shipFile);
+                       }
+               }
+       }
+
+       public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
+               this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
+       }
+
+       public String getDynamicPropertiesEncoded() {
+               return this.dynamicPropertiesEncoded;
+       }
+
+
+       private void isReadyForDeployment() throws YarnDeploymentException {
+               if(taskManagerCount <= 0) {
+                       throw new YarnDeploymentException("TaskManager count must be positive");
+               }
+               if(this.flinkJarPath == null) {
+                       throw new YarnDeploymentException("The Flink jar path is null");
+               }
+               if(this.configurationDirectory == null) {
+                       throw new YarnDeploymentException("Configuration directory not set");
+               }
+               if(this.flinkConfigurationPath == null) {
+                       throw new YarnDeploymentException("Configuration path not set");
+               }
+               if(this.flinkConfiguration == null) {
+                       throw new YarnDeploymentException("Flink configuration object has not been set");
+               }
+
+               // check if required Hadoop environment variables are set. If not, warn the user.
+               if(System.getenv("HADOOP_CONF_DIR") == null &&
+                       System.getenv("YARN_CONF_DIR") == null) {
+                       LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
+                               "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+                               "configuration for accessing YARN.");
+               }
+       }
+
+       private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
+               for(int i = 0; i < nodeManagers.length; i++) {
+                       if(nodeManagers[i] >= toAllocate) {
+                               nodeManagers[i] -= toAllocate;
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public void setDetachedMode(boolean detachedMode) {
+               this.detached = detachedMode;
+       }
+
+       public boolean isDetachedMode() {
+               return detached;
+       }
+
+
+       /**
+        * Gets a Hadoop Yarn client.
+        * @return a YarnClient which has to be shut down manually
+        */
+       public static YarnClient getYarnClient(Configuration conf) {
+               YarnClient yarnClient = YarnClient.createYarnClient();
+               yarnClient.init(conf);
+               yarnClient.start();
+               return yarnClient;
+       }
+
+       @Override
+       public YarnClusterClient deploy() throws Exception {
+
+               UserGroupInformation.setConfiguration(conf);
+               UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+               if (UserGroupInformation.isSecurityEnabled()) {
+                       if (!ugi.hasKerberosCredentials()) {
+                               throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
+                                       "You may use kinit to authenticate and request a TGT from the Kerberos server.");
+                       }
+                       return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
+                               @Override
+                               public YarnClusterClient run() throws Exception {
+                                       return deployInternal();
+                               }
+                       });
+               } else {
+                       return deployInternal();
+               }
+       }
+
+       /**
+        * This method will block until the ApplicationMaster/JobManager have been
+        * deployed on YARN.
+        */
+       protected YarnClusterClient deployInternal() throws Exception {
+               isReadyForDeployment();
+
+               LOG.info("Using values:");
+               LOG.info("\tTaskManager count = {}", taskManagerCount);
+               LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
+               LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
+
+               // Create application via yarnClient
+               final YarnClient yarnClient = getYarnClient(conf);
+               final YarnClientApplication yarnApplication = yarnClient.createApplication();
+               GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+               // ------------------ Add dynamic properties to local flinkConfiguration ------
+
+               Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
+               for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
+                       flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+               }
+
+               // ------------------ Set default file system scheme -------------------------
+
+               try {
+                       org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+               } catch (IOException e) {
+                       throw new IOException("Error while setting the default " +
+                               "filesystem scheme from configuration.", e);
+               }
+               // ------------------ Check if the specified queue exists --------------------
+
+               try {
+                       List<QueueInfo> queues = yarnClient.getAllQueues();
+                       if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in YARN and for this session.
+                               boolean queueFound = false;
+                               for (QueueInfo queue : queues) {
+                                       if (queue.getQueueName().equals(this.yarnQueue)) {
+                                               queueFound = true;
+                                               break;
+                                       }
+                               }
+                               if (!queueFound) {
+                                       String queueNames = "";
+                                       for (QueueInfo queue : queues) {
+                                               queueNames += queue.getQueueName() + ", ";
+                                       }
+                                       LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
+                                               "Available queues: " + queueNames);
+                               }
+                       } else {
+                               LOG.debug("The YARN cluster does not have any queues configured");
+                       }
+               } catch(Throwable e) {
+                       LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("Error details", e);
+                       }
+               }
+
+               // ------------------ Check if the YARN ClusterClient has the requested resources --------------
+
+               // the yarnMinAllocationMB specifies the smallest possible container allocation size.
+               // all allocations below this value are automatically set to this value.
+               final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+               if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+                       LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+                               + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. " +
+                               "YARN will allocate the smaller containers, but the scheduler will account for the minimum-allocation-mb, so maybe not all instances " +
+                               "you requested will start.");
+               }
+
+               // set the memory to minAllocationMB to do the next checks correctly
+               if(jobManagerMemoryMb < yarnMinAllocationMB) {
+                       jobManagerMemoryMb = yarnMinAllocationMB;
+               }
+               if(taskManagerMemoryMb < yarnMinAllocationMB) {
+                       taskManagerMemoryMb = yarnMinAllocationMB;
+               }
+
+               Resource maxRes = appResponse.getMaximumResourceCapability();
+               final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+               if(jobManagerMemoryMb > maxRes.getMemory()) {
+                       failSessionDuringDeployment(yarnClient, yarnApplication);
+                       throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+                               + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
+               }
+
+               if(taskManagerMemoryMb > maxRes.getMemory()) {
+                       failSessionDuringDeployment(yarnClient, yarnApplication);
+                       throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+                               + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
+               }
+
+               final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
+                       "connecting from the beginning because the resources are currently not available in the cluster. " +
+                       "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
+                       "the resources become available.";
+               int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
+               ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+               if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+                       LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+                               + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
+
+               }
+               if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
+                       LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+                               + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+               }
+               if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
+                       LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+                               + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+               }
+
+               // ----------------- check if the requested containers fit into the cluster.
+
+               int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+               // first, allocate the jobManager somewhere.
+               if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+                       LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
+                               "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
+                               Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
+               }
+               // allocate TaskManagers
+               for(int i = 0; i < taskManagerCount; i++) {
+                       if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+                               LOG.warn("There is not enough memory available in the YARN cluster. " +
+                                       "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
+                                       "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+                                       "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
+                                       "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC);
+                       }
+               }
+
+               // ------------------ Prepare Application Master Container  ------------------------------
+
+               // respect custom JVM options in the YAML file
+               final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+               String logbackFile = configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME;
+               boolean hasLogback = new File(logbackFile).exists();
+               String log4jFile = configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME;
+
+               boolean hasLog4j = new File(log4jFile).exists();
+               if(hasLogback) {
+                       shipFiles.add(new File(logbackFile));
+               }
+               if(hasLog4j) {
+                       shipFiles.add(new File(log4jFile));
+               }
+
+               // Set up the container launch context for the application master
+               ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+               String amCommand = "$JAVA_HOME/bin/java"
+                       + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration)
+                       + "M " + javaOpts;
+
+               if(hasLogback || hasLog4j) {
+                       amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
+
+                       if(hasLogback) {
+                               amCommand += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
+                       }
+
+                       if(hasLog4j) {
+                               amCommand += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
+                       }
+               }
+
+               amCommand += " " + getApplicationMasterClass().getName() + " "
+                       + " 1>"
+                       + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
+                       + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
+               amContainer.setCommands(Collections.singletonList(amCommand));
+
+               LOG.debug("Application Master start command: " + amCommand);
+
+               // initialize HDFS
+               // Copy the application master jar to the filesystem
+               // Create a local resource to point to the destination jar path
+               final FileSystem fs = FileSystem.get(conf);
+
+               // hard coded check for the GoogleHDFS client because it's not overriding the getScheme() method.
+               if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+                       fs.getScheme().startsWith("file")) {
+                       LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+                               + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values. "
+                               + "The Flink YARN client needs to store its files in a distributed file system");
+               }
+
+               // Set-up ApplicationSubmissionContext for the application
+               ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+
+               if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
+                       // activate re-execution of failed applications
+                       appContext.setMaxAppAttempts(
+                               flinkConfiguration.getInteger(
+                                       ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+                                       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+
+                       activateHighAvailabilitySupport(appContext);
+               } else {
+                       // set number of application retries to 1 in the default case
+                       appContext.setMaxAppAttempts(
+                               flinkConfiguration.getInteger(
+                                       ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+                                       1));
+               }
+
+               final ApplicationId appId = appContext.getApplicationId();
+
+               // Setup jar for ApplicationMaster
+               LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+               LocalResource flinkConf = Records.newRecord(LocalResource.class);
+               Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
+               Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+               Map<String, LocalResource> localResources = new HashMap<>(2);
+               localResources.put("flink.jar", appMasterJar);
+               localResources.put("flink-conf.yaml", flinkConf);
+
+
+               // setup security tokens (code from apache storm)
+               final Path[] paths = new Path[2 + shipFiles.size()];
+               StringBuilder envShipFileList = new StringBuilder();
+               // upload ship files
+               for (int i = 0; i < shipFiles.size(); i++) {
+                       File shipFile = shipFiles.get(i);
+                       LocalResource shipResources = Records.newRecord(LocalResource.class);
+                       Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+                       paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(),
+                               shipLocalPath, shipResources, fs.getHomeDirectory());
+                       localResources.put(shipFile.getName(), shipResources);
+
+                       envShipFileList.append(paths[2 + i]);
+                       if(i+1 < shipFiles.size()) {
+                               envShipFileList.append(',');
+                       }
+               }
+
+               paths[0] = remotePathJar;
+               paths[1] = remotePathConf;
+               sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+
+               FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+               fs.setPermission(sessionFilesDir, permission); // set permission for path.
+
+               Utils.setTokensFor(amContainer, paths, conf);
+
+               amContainer.setLocalResources(localResources);
+               fs.close();
+
+               // Setup CLASSPATH for ApplicationMaster
+               Map<String, String> appMasterEnv = new HashMap<>();
+               // set user specified app master environment variables
+               appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
+               // set classpath from YARN configuration
+               Utils.setupEnv(conf, appMasterEnv);
+               // set Flink on YARN internal configuration values
+               appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+               appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+               appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
+               appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
+               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
+               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+               appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
+               appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
+
+               if(dynamicPropertiesEncoded != null) {
+                       appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+               }
+
+               amContainer.setEnvironment(appMasterEnv);
+
+               // Set up resource type requirements for ApplicationMaster
+               Resource capability = Records.newRecord(Resource.class);
+               capability.setMemory(jobManagerMemoryMb);
+               capability.setVirtualCores(1);
+
+               String name;
+               if(customName == null) {
+                       name = "Flink session with " + taskManagerCount + " 
TaskManagers";
+                       if(detached) {
+                               name += " (detached)";
+                       }
+               } else {
+                       name = customName;
+               }
+
+               appContext.setApplicationName(name); // application name
+               appContext.setApplicationType("Apache Flink");
+               appContext.setAMContainerSpec(amContainer);
+               appContext.setResource(capability);
+               if(yarnQueue != null) {
+                       appContext.setQueue(yarnQueue);
+               }
+
+               // add a hook to clean up in case deployment fails
+               Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
+               Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
+               LOG.info("Submitting application master " + appId);
+               yarnClient.submitApplication(appContext);
+
+               LOG.info("Waiting for the cluster to be allocated");
+               int waittime = 0;
+               ApplicationReport report;
+               loop: while( true ) {
+                       try {
+                               report = yarnClient.getApplicationReport(appId);
+                       } catch (IOException e) {
+                               throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
+                       }
+                       YarnApplicationState appState = report.getYarnApplicationState();
+                       switch(appState) {
+                               case FAILED:
+                               case FINISHED:
+                               case KILLED:
+                                       throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+                                               + appState + " during deployment. \n" +
+                                               "Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
+                                               "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
+                                               "yarn logs -applicationId " + appId);
+                                       //break ..
+                               case RUNNING:
+                                       LOG.info("YARN application has been deployed successfully.");
+                                       break loop;
+                               default:
+                                       LOG.info("Deploying cluster, current state " + appState);
+                                       if(waittime > 60000) {
+                                               LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
+                                       }
+
+                       }
+                       waittime += 1000;
+                       Thread.sleep(1000);
+               }
+               // print the application id so users can cancel the session themselves.
+               if (isDetachedMode()) {
+                       LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
+                                       "Flink on YARN, use the following command or a YARN web interface to stop " +
+                                       "it:\nyarn application -kill " + appId + "\nPlease also note that the " +
+                                       "temporary files of the YARN session in the home directory will not be removed.");
+               }
+               // since deployment was successful, remove the hook
+               try {
+                       Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
+               } catch (IllegalStateException e) {
+                       // we're already in the shut down hook.
+               }
+
+               String host = report.getHost();
+               int port = report.getRpcPort();
+               String trackingURL = report.getTrackingUrl();
+
+               // Correctly initialize the Flink config
+               flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
+               flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+               // the Flink cluster is deployed in YARN. Represent the cluster with a client object.
+               return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir);
+       }
+
+       /**
+        * Kills YARN application and stops YARN client.
+        *
+        * Use this method to kill the App before it has been properly deployed.
+        */
+       private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
+               LOG.info("Killing YARN application");
+
+               try {
+                       yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
+               } catch (Exception e) {
+                       // we only log a debug message here because the "killApplication" call is a best-effort
+                       // call (we don't know if the application has been deployed when the error occurred).
+                       LOG.debug("Error while killing YARN application", e);
+               }
+               yarnClient.stop();
+       }
+
+
+       private static class ClusterResourceDescription {
+               public final int totalFreeMemory;
+               public final int containerLimit;
+               public final int[] nodeManagersFree;
+
+               public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
+                       this.totalFreeMemory = totalFreeMemory;
+                       this.containerLimit = containerLimit;
+                       this.nodeManagersFree = nodeManagersFree;
+               }
+       }
+
+       private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+               List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+
+               int totalFreeMemory = 0;
+               int containerLimit = 0;
+               int[] nodeManagersFree = new int[nodes.size()];
+
+               for(int i = 0; i < nodes.size(); i++) {
+                       NodeReport rep = nodes.get(i);
+                       int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
+                       nodeManagersFree[i] = free;
+                       totalFreeMemory += free;
+                       if(free > containerLimit) {
+                               containerLimit = free;
+                       }
+               }
+               return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
+       }
+
+       @Override
+       public String getClusterDescription() throws Exception {
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               PrintStream ps = new PrintStream(baos);
+
+               YarnClient yarnClient = getYarnClient(conf);
+               YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+
+               ps.append("NodeManagers in the ClusterClient " + 
metrics.getNumNodeManagers());
+               List<NodeReport> nodes = 
yarnClient.getNodeReports(NodeState.RUNNING);
+               final String format = "|%-16s |%-16s %n";
+               ps.printf("|Property         |Value          %n");
+               ps.println("+---------------------------------------+");
+               int totalMemory = 0;
+               int totalCores = 0;
+               for(NodeReport rep : nodes) {
+                       final Resource res = rep.getCapability();
+                       totalMemory += res.getMemory();
+                       totalCores += res.getVirtualCores();
+                       ps.format(format, "NodeID", rep.getNodeId());
+                       ps.format(format, "Memory", res.getMemory() + " MB");
+                       ps.format(format, "vCores", res.getVirtualCores());
+                       ps.format(format, "HealthReport", 
rep.getHealthReport());
+                       ps.format(format, "Containers", rep.getNumContainers());
+                       ps.println("+---------------------------------------+");
+               }
+               ps.println("Summary: totalMemory " + totalMemory + " totalCores 
" + totalCores);
+               List<QueueInfo> qInfo = yarnClient.getAllQueues();
+               for(QueueInfo q : qInfo) {
+                       ps.println("Queue: " + q.getQueueName() + ", Current 
Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
+                               q.getMaximumCapacity() + " Applications: " + 
q.getApplications().size());
+               }
+               yarnClient.stop();
+               return baos.toString();
+       }
+
+       public String getSessionFilesDir() {
+               return sessionFilesDir.toString();
+       }
+
+       public void setName(String name) {
+               if(name == null) {
+                       throw new IllegalArgumentException("The passed name is 
null");
+               }
+               customName = name;
+       }
+
+       private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
+               ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
+
+               reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
+               reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
+       }
+
+       /**
+        * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
+        * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
+        * methods. Depending on the Hadoop version these methods are supported or not. If the methods
+        * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
+        * setAttemptFailuresValidityInterval are called.
+        */
+       private static class ApplicationSubmissionContextReflector {
+               private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
+
+               private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+
+               public static ApplicationSubmissionContextReflector getInstance() {
+                       return instance;
+               }
+
+               private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
+               private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
+
+               private final Method keepContainersMethod;
+               private final Method attemptFailuresValidityIntervalMethod;
+
+               private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
+                       Method keepContainersMethod;
+                       Method attemptFailuresValidityIntervalMethod;
+
+                       try {
+                               // this method is only supported by Hadoop 2.4.0 onwards
+                               keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
+                               LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+                       } catch (NoSuchMethodException e) {
+                               LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+                               // assign null because the Hadoop version apparently does not support this call.
+                               keepContainersMethod = null;
+                       }
+
+                       this.keepContainersMethod = keepContainersMethod;
+
+                       try {
+                               // this method is only supported by Hadoop 2.6.0 onwards
+                               attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
+                               LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+                       } catch (NoSuchMethodException e) {
+                               LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+                               // assign null because the Hadoop version apparently does not support this call.
+                               attemptFailuresValidityIntervalMethod = null;
+                       }
+
+                       this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
+               }
+
+               public void setKeepContainersAcrossApplicationAttempts(
+                               ApplicationSubmissionContext appContext,
+                               boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+
+                       if (keepContainersMethod != null) {
+                               LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+                                       appContext.getClass().getCanonicalName());
+                               keepContainersMethod.invoke(appContext, keepContainers);
+                       } else {
+                               LOG.debug("{} does not support method {}. Doing nothing.",
+                                       appContext.getClass().getCanonicalName(), keepContainersMethodName);
+                       }
+               }
+
+               public void setAttemptFailuresValidityInterval(
+                               ApplicationSubmissionContext appContext,
+                               long validityInterval) throws InvocationTargetException, IllegalAccessException {
+                       if (attemptFailuresValidityIntervalMethod != null) {
+                               LOG.debug("Calling method {} of {}.",
+                                       attemptFailuresValidityIntervalMethod.getName(),
+                                       appContext.getClass().getCanonicalName());
+                               attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
+                       } else {
+                               LOG.debug("{} does not support method {}. Doing nothing.",
+                                       appContext.getClass().getCanonicalName(),
+                                       attemptsFailuresValidityIntervalMethodName);
+                       }
+               }
+       }
+
+       private static class YarnDeploymentException extends RuntimeException {
+               private static final long serialVersionUID = -812040641215388943L;
+
+               public YarnDeploymentException() {
+               }
+
+               public YarnDeploymentException(String message) {
+                       super(message);
+               }
+
+               public YarnDeploymentException(String message, Throwable cause) {
+                       super(message, cause);
+               }
+       }
+
+       private class DeploymentFailureHook extends Thread {
+
+               private YarnClient yarnClient;
+               private YarnClientApplication yarnApplication;
+
+               DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) {
+                       this.yarnClient = yarnClient;
+                       this.yarnApplication = yarnApplication;
+               }
+
+               @Override
+               public void run() {
+                       LOG.info("Cancelling deployment from Deployment Failure Hook");
+                       failSessionDuringDeployment(yarnClient, yarnApplication);
+                       LOG.info("Deleting files in " + sessionFilesDir);
+                       try {
+                               FileSystem fs = FileSystem.get(conf);
+                               fs.delete(sessionFilesDir, true);
+                               fs.close();
+                       } catch (IOException e) {
+                               LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
+                       }
+               }
+       }
+}
+
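
For orientation, here is a minimal usage sketch of the descriptor API added
above. It is not part of this commit: it assumes the concrete
YarnClusterDescriptor subclass referenced by the logger, and the jar path and
sizing values are illustrative only.

    import org.apache.hadoop.fs.Path;

    public class YarnSessionExample {
        public static void main(String[] args) throws Exception {
            // hypothetical driver; only uses methods visible in the diff above
            AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
            descriptor.setLocalJarPath(new Path("/opt/flink/lib/flink-dist.jar")); // illustrative path
            descriptor.setTaskManagerCount(2);
            descriptor.setJobManagerMemory(768);   // must be >= MIN_JM_MEMORY (768 MB)
            descriptor.setTaskManagerMemory(1024); // must be >= MIN_TM_MEMORY (768 MB)
            descriptor.setTaskManagerSlots(4);

            // blocks until the ApplicationMaster/JobManager is RUNNING on YARN;
            // deploy() first runs the queue, minimum-allocation-mb and free-memory checks
            YarnClusterClient client = descriptor.deploy();
            System.out.println(descriptor.getClusterDescription());
        }
    }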

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
deleted file mode 100644
index 467e06d..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-/**
- * Default implementation of {@link FlinkYarnClientBase} which starts an {@link YarnApplicationMasterRunner}.
- */
-public class FlinkYarnClient extends FlinkYarnClientBase {
-       @Override
-       protected Class<?> getApplicationMasterClass() {
-               return YarnApplicationMasterRunner.class;
-       }
-}
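
The deleted FlinkYarnClient above shows the subclassing pattern that the new
AbstractYarnClusterDescriptor keeps: a concrete descriptor only supplies the
ApplicationMaster bootstrap class. A sketch of the equivalent subclass under
the new API (the committed YarnClusterDescriptor itself is not shown in this
excerpt, so treat this as an illustration of the pattern, not the committed
code):

    package org.apache.flink.yarn;

    // sketch only: mirrors the deleted FlinkYarnClient against the new base
    // class; YarnApplicationMasterRunner is the bootstrap class named above
    public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
        @Override
        protected Class<?> getApplicationMasterClass() {
            return YarnApplicationMasterRunner.class;
        }
    }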

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
deleted file mode 100644
index 6f81d09..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ /dev/null
@@ -1,907 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
-* All classes in this package contain code taken from
-* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
-* and
-* https://github.com/hortonworks/simple-yarn-app
-* and
-* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
-*
-* The Flink jar is uploaded to HDFS by this client.
-* The application master and all the TaskManager containers get the jar file downloaded
-* by YARN into their local fs.
-*
-*/
-public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
-       private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
-
-       /**
-        * Minimum memory requirements, checked by the Client.
-        */
-       private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
-       private static final int MIN_TM_MEMORY = 768;
-
-       private Configuration conf;
-       private YarnClient yarnClient;
-       private YarnClientApplication yarnApplication;
-       private Thread deploymentFailureHook = new DeploymentFailureHook();
-
-       /**
-        * Files (usually in a distributed file system) used for the YARN session of Flink.
-        * Contains configuration files and jar files.
-        */
-       private Path sessionFilesDir;
-
-       /**
-        * If the user has specified a different number of slots, we store it here.
-        */
-       private int slots = -1;
-
-       private int jobManagerMemoryMb = 1024;
-
-       private int taskManagerMemoryMb = 1024;
-
-       private int taskManagerCount = 1;
-
-       private String yarnQueue = null;
-
-       private String configurationDirectory;
-
-       private Path flinkConfigurationPath;
-
-       private Path flinkLoggingConfigurationPath; // optional
-
-       private Path flinkJarPath;
-
-       private String dynamicPropertiesEncoded;
-
-       private List<File> shipFiles = new ArrayList<>();
-       private org.apache.flink.configuration.Configuration flinkConfiguration;
-
-       private boolean detached;
-
-       private String customName = null;
-
-       public FlinkYarnClientBase() {
-               conf = new YarnConfiguration();
-               if(this.yarnClient == null) {
-                       // Create yarnClient
-                       yarnClient = YarnClient.createYarnClient();
-                       yarnClient.init(conf);
-                       yarnClient.start();
-               }
-
-               // for unit tests only
-               if(System.getenv("IN_TESTS") != null) {
-                       try {
-                               conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
-                       } catch (Throwable t) {
-                               throw new RuntimeException("Error", t);
-                       }
-               }
-       }
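
The constructor above follows the standard YARN client lifecycle: build a
YarnConfiguration (which picks up the *-site.xml files from the classpath),
then create, init, and start a YarnClient. A minimal stand-alone sketch of
that lifecycle, using only Hadoop APIs this file already imports; the class
name is ours:

    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class YarnClientLifecycleSketch {
        public static void main(String[] args) {
            YarnConfiguration conf = new YarnConfiguration(); // loads *-site.xml from the classpath
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(conf);  // must precede start()
            yarnClient.start();
            try {
                // ... talk to the ResourceManager here ...
            } finally {
                yarnClient.stop();  // releases the RPC resources
            }
        }
    }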
-
-       /**
-        * The class to bootstrap the application master of the YARN cluster (runs main method).
-        */
-       protected abstract Class<?> getApplicationMasterClass();
-
-       @Override
-       public void setJobManagerMemory(int memoryMb) {
-               if(memoryMb < MIN_JM_MEMORY) {
-                       throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
-                               + "of " + MIN_JM_MEMORY + " MB");
-               }
-               this.jobManagerMemoryMb = memoryMb;
-       }
-
-       @Override
-       public void setTaskManagerMemory(int memoryMb) {
-               if(memoryMb < MIN_TM_MEMORY) {
-                       throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
-                               + "of " + MIN_TM_MEMORY + " MB");
-               }
-               this.taskManagerMemoryMb = memoryMb;
-       }
-
-       @Override
-       public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
-               this.flinkConfiguration = conf;
-       }
-
-       @Override
-       public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
-               return flinkConfiguration;
-       }
-
-       @Override
-       public void setTaskManagerSlots(int slots) {
-               if(slots <= 0) {
-                       throw new IllegalArgumentException("Number of TaskManager slots must be positive");
-               }
-               this.slots = slots;
-       }
-
-       @Override
-       public int getTaskManagerSlots() {
-               return this.slots;
-       }
-
-       @Override
-       public void setQueue(String queue) {
-               this.yarnQueue = queue;
-       }
-
-       @Override
-       public void setLocalJarPath(Path localJarPath) {
-               if(!localJarPath.toString().endsWith("jar")) {
-                       throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
-               }
-               this.flinkJarPath = localJarPath;
-       }
-
-       @Override
-       public void setConfigurationFilePath(Path confPath) {
-               flinkConfigurationPath = confPath;
-       }
-
-       @Override
-       public void setConfigurationDirectory(String configurationDirectory) {
-               this.configurationDirectory = configurationDirectory;
-       }
-
-       @Override
-       public void setFlinkLoggingConfigurationPath(Path logConfPath) {
-               flinkLoggingConfigurationPath = logConfPath;
-       }
-
-       @Override
-       public Path getFlinkLoggingConfigurationPath() {
-               return flinkLoggingConfigurationPath;
-       }
-
-       @Override
-       public void setTaskManagerCount(int tmCount) {
-               if(tmCount < 1) {
-                       throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
-               }
-               this.taskManagerCount = tmCount;
-       }
-
-       @Override
-       public int getTaskManagerCount() {
-               return this.taskManagerCount;
-       }
-
-       @Override
-       public void setShipFiles(List<File> shipFiles) {
-               for(File shipFile: shipFiles) {
-                       // Remove the uberjar from the ship list (by default, everything in the lib/ folder is added to
-                       // the list of files to ship, but we handle the uberjar separately).
-                       if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
-                               this.shipFiles.add(shipFile);
-                       }
-               }
-       }
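
The filter above drops the fat flink-dist jar from the ship list because the
uberjar is uploaded to HDFS separately. A small self-contained sketch of the
same name-based filter, with hypothetical file names:

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    public class ShipFilterSketch {
        public static void main(String[] args) {
            // Hypothetical contents of the lib/ folder.
            List<File> libs = Arrays.asList(
                new File("lib/flink-dist-0.10.jar"),  // skipped: handled as the uberjar
                new File("lib/log4j-1.2.17.jar"),     // shipped
                new File("lib/my-connector.jar"));    // shipped

            for (File f : libs) {
                boolean isDist = f.getName().startsWith("flink-dist") && f.getName().endsWith("jar");
                System.out.println(f.getName() + (isDist ? " -> skipped" : " -> shipped"));
            }
        }
    }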
-
-       @Override
-       public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
-               this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
-       }
-
-       @Override
-       public String getDynamicPropertiesEncoded() {
-               return this.dynamicPropertiesEncoded;
-       }
-
-
-       public void isReadyForDeployment() throws YarnDeploymentException {
-               if(taskManagerCount <= 0) {
-                       throw new YarnDeploymentException("TaskManager count must be positive");
-               }
-               if(this.flinkJarPath == null) {
-                       throw new YarnDeploymentException("The Flink jar path is null");
-               }
-               if(this.configurationDirectory == null) {
-                       throw new YarnDeploymentException("Configuration directory not set");
-               }
-               if(this.flinkConfigurationPath == null) {
-                       throw new YarnDeploymentException("Configuration path not set");
-               }
-               if(this.flinkConfiguration == null) {
-                       throw new YarnDeploymentException("Flink configuration object has not been set");
-               }
-
-               // Check if the required Hadoop environment variables are set. If not, warn the user.
-               if(System.getenv("HADOOP_CONF_DIR") == null &&
-                       System.getenv("YARN_CONF_DIR") == null) {
-                       LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
-                               "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
-                               "configuration for accessing YARN.");
-               }
-       }
-
-       public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
-               for(int i = 0; i < nodeManagers.length; i++) {
-                       if(nodeManagers[i] >= toAllocate) {
-                               nodeManagers[i] -= toAllocate;
-                               return true;
-                       }
-               }
-               return false;
-       }
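
allocateResource implements a first-fit check: it walks the per-NodeManager
free-memory array and subtracts the request from the first node that can hold
it, mutating the array in place. A short usage sketch with hypothetical
free-memory values:

    import java.util.Arrays;

    public class AllocationSketch {
        public static void main(String[] args) {
            // Hypothetical free memory per NodeManager, in MB.
            int[] nmFree = {2048, 4096, 1024};

            // First fit: the 4096 MB node can host a 3072 MB JobManager.
            System.out.println(FlinkYarnClientBase.allocateResource(nmFree, 3072)); // true
            System.out.println(Arrays.toString(nmFree)); // [2048, 1024, 1024]

            // A second 3072 MB request no longer fits on any single node.
            System.out.println(FlinkYarnClientBase.allocateResource(nmFree, 3072)); // false
        }
    }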
-
-       @Override
-       public void setDetachedMode(boolean detachedMode) {
-               this.detached = detachedMode;
-       }
-
-       @Override
-       public boolean isDetached() {
-               return detached;
-       }
-
-       @Override
-       public AbstractFlinkYarnCluster deploy() throws Exception {
-
-               UserGroupInformation.setConfiguration(conf);
-               UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-               if (UserGroupInformation.isSecurityEnabled()) {
-                       if (!ugi.hasKerberosCredentials()) {
-                               throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
-                                       "You may use kinit to authenticate and request a TGT from the Kerberos server.");
-                       }
-                       return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
-                               @Override
-                               public AbstractFlinkYarnCluster run() throws Exception {
-                                       return deployInternal();
-                               }
-                       });
-               } else {
-                       return deployInternal();
-               }
-       }
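
When Kerberos is active, the whole deployment runs inside
UserGroupInformation.doAs so that every Hadoop call is made with the
authenticated user's credentials. The same pattern in isolation; the action
body is a hypothetical stand-in:

    import org.apache.hadoop.security.UserGroupInformation;
    import java.security.PrivilegedExceptionAction;

    public class SecureCallSketch {
        static String deploySomething() { return "deployed"; } // hypothetical action

        public static void main(String[] args) throws Exception {
            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
            // Run the action with the caller's (possibly Kerberos-backed) credentials.
            String result = ugi.doAs(new PrivilegedExceptionAction<String>() {
                @Override
                public String run() throws Exception {
                    return deploySomething();
                }
            });
            System.out.println(result);
        }
    }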
-
-
-
-       /**
-        * This method will block until the ApplicationMaster/JobManager has been
-        * deployed on YARN.
-        */
-       protected AbstractFlinkYarnCluster deployInternal() throws Exception {
-               isReadyForDeployment();
-
-               LOG.info("Using values:");
-               LOG.info("\tTaskManager count = {}", taskManagerCount);
-               LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
-               LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
-
-               // Create application via yarnClient
-               yarnApplication = yarnClient.createApplication();
-               GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
-               // ------------------ Add dynamic properties to local flinkConfiguration ------
-
-               Map<String, String> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
-               for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
-                       flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
-               }
-
-               try {
-                       org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
-               } catch (IOException e) {
-                       throw new IOException("Error while setting the default " +
-                               "filesystem scheme from configuration.", e);
-               }
-               // ------------------ Check if the specified queue exists --------------
-
-               try {
-                       List<QueueInfo> queues = yarnClient.getAllQueues();
-                       if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in YARN and for this session
-                               boolean queueFound = false;
-                               for (QueueInfo queue : queues) {
-                                       if (queue.getQueueName().equals(this.yarnQueue)) {
-                                               queueFound = true;
-                                               break;
-                                       }
-                               }
-                               if (!queueFound) {
-                                       String queueNames = "";
-                                       for (QueueInfo queue : queues) {
-                                               queueNames += queue.getQueueName() + ", ";
-                                       }
-                                       LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
-                                               "Available queues: " + queueNames);
-                               }
-                       } else {
-                               LOG.debug("The YARN cluster does not have any queues configured");
-                       }
-               } catch(Throwable e) {
-                       LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("Error details", e);
-                       }
-               }
-
-               // ------------------ Check if the YARN cluster has the requested resources --------------
-
-               // yarnMinAllocationMB specifies the smallest possible container allocation size.
-               // All allocations below this value are automatically set to this value.
-               final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
-               if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
-                       LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN container size. "
-                               + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. " +
-                               "YARN will allocate the smaller containers, but the scheduler will account for the minimum-allocation-mb, so maybe not all instances " +
-                               "you requested will start.");
-               }
-
-               // set the memory to minAllocationMB to do the next checks correctly
-               if(jobManagerMemoryMb < yarnMinAllocationMB) {
-                       jobManagerMemoryMb = yarnMinAllocationMB;
-               }
-               if(taskManagerMemoryMb < yarnMinAllocationMB) {
-                       taskManagerMemoryMb = yarnMinAllocationMB;
-               }
-
-               Resource maxRes = appResponse.getMaximumResourceCapability();
-               final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
-               if(jobManagerMemoryMb > maxRes.getMemory()) {
-                       failSessionDuringDeployment();
-                       throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
-                               + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
-               }
-
-               if(taskManagerMemoryMb > maxRes.getMemory()) {
-                       failSessionDuringDeployment();
-                       throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
-                               + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
-               }
-
-               final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but not all TaskManagers may " +
-                       "connect from the beginning because the resources are currently not available in the cluster. " +
-                       "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
-                       "the resources become available.";
-               int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-               ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-               if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-                       LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
-                               + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
-               }
-               if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
-                       LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-                               + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
-               }
-               if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
-                       LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
-                               + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
-               }
-
-               // ----------------- Check if the requested containers fit into the cluster.
-
-               int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
-               // first, allocate the JobManager somewhere
-               if(!allocateResource(nmFree, jobManagerMemoryMb)) {
-                       LOG.warn("Unable to find a NodeManager that can fit the JobManager/ApplicationMaster. " +
-                               "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
-                               Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
-               }
-               // allocate the TaskManagers
-               for(int i = 0; i < taskManagerCount; i++) {
-                       if(!allocateResource(nmFree, taskManagerMemoryMb)) {
-                               LOG.warn("There is not enough memory available in the YARN cluster. " +
-                                       "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
-                                       "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
-                                       "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
-                                       "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC);
-                       }
-               }
-
-               // ------------------ Prepare the ApplicationMaster container ------------------------------
-
-               // respect custom JVM options in the YAML file
-               final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
-               String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
-               boolean hasLogback = new File(logbackFile).exists();
-               String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
-               boolean hasLog4j = new File(log4jFile).exists();
-               if(hasLogback) {
-                       shipFiles.add(new File(logbackFile));
-               }
-               if(hasLog4j) {
-                       shipFiles.add(new File(log4jFile));
-               }
-
-               // Set up the container launch context for the ApplicationMaster
-               ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-
-               String amCommand = "$JAVA_HOME/bin/java"
-                       + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration)
-                       + "M " + javaOpts;
-
-               if(hasLogback || hasLog4j) {
-                       amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
-
-                       if(hasLogback) {
-                               amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
-                       }
-
-                       if(hasLog4j) {
-                               amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
-                       }
-               }
-
-               amCommand += " " + getApplicationMasterClass().getName() + " "
-                       + " 1>"
-                       + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
-                       + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
-               amContainer.setCommands(Collections.singletonList(amCommand));
-
-               LOG.debug("Application Master start command: " + amCommand);
-
-               // Initialize HDFS:
-               // copy the ApplicationMaster jar to the filesystem and
-               // create a local resource to point to the destination jar path.
-               final FileSystem fs = FileSystem.get(conf);
-
-               // Hard-coded check for the GoogleHDFS client, because it does not override the getScheme() method.
-               if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
-                       fs.getScheme().startsWith("file")) {
-                       LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
-                               + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values. "
-                               + "The Flink YARN client needs to store its files in a distributed file system.");
-               }
-
-               // Set up the ApplicationSubmissionContext for the application
-               ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
-
-               if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
-                       // activate re-execution of failed applications
-                       appContext.setMaxAppAttempts(
-                               flinkConfiguration.getInteger(
-                                       ConfigConstants.YARN_APPLICATION_ATTEMPTS,
-                                       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
-
-                       activateHighAvailabilitySupport(appContext);
-               } else {
-                       // set the number of application retries to 1 in the default case
-                       appContext.setMaxAppAttempts(
-                               flinkConfiguration.getInteger(
-                                       ConfigConstants.YARN_APPLICATION_ATTEMPTS,
-                                       1));
-               }
-
-               final ApplicationId appId = appContext.getApplicationId();
-
-               // Set up the jar for the ApplicationMaster
-               LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-               LocalResource flinkConf = Records.newRecord(LocalResource.class);
-               Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
-               Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
-               Map<String, LocalResource> localResources = new HashMap<>(2);
-               localResources.put("flink.jar", appMasterJar);
-               localResources.put("flink-conf.yaml", flinkConf);
-
-
-               // set up security tokens (code from Apache Storm)
-               final Path[] paths = new Path[2 + shipFiles.size()];
-               StringBuilder envShipFileList = new StringBuilder();
-               // upload ship files
-               for (int i = 0; i < shipFiles.size(); i++) {
-                       File shipFile = shipFiles.get(i);
-                       LocalResource shipResources = Records.newRecord(LocalResource.class);
-                       Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-                       paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(),
-                               shipLocalPath, shipResources, fs.getHomeDirectory());
-                       localResources.put(shipFile.getName(), shipResources);
-
-                       envShipFileList.append(paths[2 + i]);
-                       if(i + 1 < shipFiles.size()) {
-                               envShipFileList.append(',');
-                       }
-               }
-
-               paths[0] = remotePathJar;
-               paths[1] = remotePathConf;
-               sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-
-               FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-               fs.setPermission(sessionFilesDir, permission); // set permission for path
-
-               Utils.setTokensFor(amContainer, paths, conf);
-
-               amContainer.setLocalResources(localResources);
-               fs.close();
-
-               // Set up the CLASSPATH for the ApplicationMaster
-               Map<String, String> appMasterEnv = new HashMap<>();
-               // set user-specified ApplicationMaster environment variables
-               appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
-               // set the classpath from the YARN configuration
-               Utils.setupEnv(conf, appMasterEnv);
-               // set Flink on YARN internal configuration values
-               appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-               appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
-               appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
-               appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
-               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-               appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
-               appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
-
-               if(dynamicPropertiesEncoded != null) {
-                       appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
-               }
-
-               amContainer.setEnvironment(appMasterEnv);
-
-               // Set up the resource type requirements for the ApplicationMaster
-               Resource capability = Records.newRecord(Resource.class);
-               capability.setMemory(jobManagerMemoryMb);
-               capability.setVirtualCores(1);
-
-               String name;
-               if(customName == null) {
-                       name = "Flink session with " + taskManagerCount + " TaskManagers";
-                       if(detached) {
-                               name += " (detached)";
-                       }
-               } else {
-                       name = customName;
-               }
-
-               appContext.setApplicationName(name); // application name
-               appContext.setApplicationType("Apache Flink");
-               appContext.setAMContainerSpec(amContainer);
-               appContext.setResource(capability);
-               if(yarnQueue != null) {
-                       appContext.setQueue(yarnQueue);
-               }
-
-               // add a hook to clean up in case deployment fails
-               Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
-               LOG.info("Submitting application master " + appId);
-               yarnClient.submitApplication(appContext);
-
-               LOG.info("Waiting for the cluster to be allocated");
-               int waitTime = 0;
-               loop: while (true) {
-                       ApplicationReport report;
-                       try {
-                               report = yarnClient.getApplicationReport(appId);
-                       } catch (IOException e) {
-                               throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
-                       }
-                       YarnApplicationState appState = report.getYarnApplicationState();
-                       switch(appState) {
-                               case FAILED:
-                               case FINISHED:
-                               case KILLED:
-                                       throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
-                                               + appState + " during deployment. \n" +
-                                               "Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
-                                               "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
-                                               "yarn logs -applicationId " + appId);
-                                       // no break needed; the throw leaves the switch
-                               case RUNNING:
-                                       LOG.info("YARN application has been deployed successfully.");
-                                       break loop;
-                               default:
-                                       LOG.info("Deploying cluster, current state " + appState);
-                                       if(waitTime > 60000) {
-                                               LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
-                                       }
-                       }
-                       waitTime += 1000;
-                       Thread.sleep(1000);
-               }
-               // print the application id so users can cancel the session themselves
-               if (isDetached()) {
-                       LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
-                                       "Flink on YARN, use the following command or a YARN web interface to stop " +
-                                       "it:\nyarn application -kill " + appId + "\nPlease also note that the " +
-                                       "temporary files of the YARN session in the home directory will not be removed.");
-               }
-               // since deployment was successful, remove the hook
-               try {
-                       Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
-               } catch (IllegalStateException e) {
-                       // we're already in the shutdown hook
-               }
-               // the Flink cluster is now deployed in YARN; return an object representing it
-               return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
-       }
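
The wait loop above is a simple poll-until-terminal state machine: FAILED,
FINISHED and KILLED abort the deployment, RUNNING ends the wait, and anything
else sleeps one second and retries, warning after a minute. A condensed sketch
of that shape, with a hypothetical state supplier in place of
yarnClient.getApplicationReport:

    public class PollUntilRunningSketch {
        enum AppState { NEW, ACCEPTED, RUNNING, FAILED }

        // Hypothetical stand-in for yarnClient.getApplicationReport(appId).
        static AppState fetchState(int tick) { return tick < 3 ? AppState.ACCEPTED : AppState.RUNNING; }

        public static void main(String[] args) throws InterruptedException {
            int waitTimeMs = 0;
            for (int tick = 0; ; tick++) {
                AppState state = fetchState(tick);
                if (state == AppState.FAILED) {
                    throw new RuntimeException("deployment failed"); // terminal state
                }
                if (state == AppState.RUNNING) {
                    break; // the cluster is up
                }
                if (waitTimeMs > 60000) {
                    System.out.println("still waiting; check the cluster resources");
                }
                Thread.sleep(1000);
                waitTimeMs += 1000;
            }
        }
    }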
-
-       /**
-        * Kills the YARN application and stops the YARN client.
-        *
-        * Use this method to kill the application before it has been properly deployed.
-        */
-       private void failSessionDuringDeployment() {
-               LOG.info("Killing YARN application");
-
-               try {
-                       yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
-               } catch (Exception e) {
-                       // we only log a debug message here because the "killApplication" call is a best-effort
-                       // call (we don't know if the application has been deployed when the error occurred)
-                       LOG.debug("Error while killing YARN application", e);
-               }
-               yarnClient.stop();
-       }
-
-
-       private static class ClusterResourceDescription {
-               public final int totalFreeMemory;
-               public final int containerLimit;
-               public final int[] nodeManagersFree;
-
-               public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
-                       this.totalFreeMemory = totalFreeMemory;
-                       this.containerLimit = containerLimit;
-                       this.nodeManagersFree = nodeManagersFree;
-               }
-       }
-
-       private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
-               List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-
-               int totalFreeMemory = 0;
-               int containerLimit = 0;
-               int[] nodeManagersFree = new int[nodes.size()];
-
-               for(int i = 0; i < nodes.size(); i++) {
-                       NodeReport rep = nodes.get(i);
-                       int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
-                       nodeManagersFree[i] = free;
-                       totalFreeMemory += free;
-                       if(free > containerLimit) {
-                               containerLimit = free;
-                       }
-               }
-               return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
-       }
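
getCurrentFreeClusterResources condenses the node reports into three figures:
the summed free memory, the largest single-node free amount (the biggest
container that could still be placed), and the per-node free array reused by
the fit check above. The same arithmetic on hypothetical (capability, used)
pairs:

    public class FreeResourcesSketch {
        public static void main(String[] args) {
            // Hypothetical (capability, used) memory pairs per node, in MB.
            int[][] nodes = {{8192, 6144}, {8192, 1024}, {4096, 4096}};

            int totalFree = 0;
            int containerLimit = 0;
            int[] nodeManagersFree = new int[nodes.length];

            for (int i = 0; i < nodes.length; i++) {
                int free = nodes[i][0] - nodes[i][1];
                nodeManagersFree[i] = free;
                totalFree += free;
                if (free > containerLimit) {
                    containerLimit = free; // largest placeable single allocation
                }
            }
            // totalFree == 9216, containerLimit == 7168, nodeManagersFree == [2048, 7168, 0]
            System.out.println(totalFree + " " + containerLimit);
        }
    }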
-
-       @Override
-       public String getClusterDescription() throws Exception {
-
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               PrintStream ps = new PrintStream(baos);
-
-               YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-
-               ps.append("NodeManagers in the cluster: " + metrics.getNumNodeManagers());
-               List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-               final String format = "|%-16s |%-16s %n";
-               ps.printf("|Property         |Value          %n");
-               ps.println("+---------------------------------------+");
-               int totalMemory = 0;
-               int totalCores = 0;
-               for(NodeReport rep : nodes) {
-                       final Resource res = rep.getCapability();
-                       totalMemory += res.getMemory();
-                       totalCores += res.getVirtualCores();
-                       ps.format(format, "NodeID", rep.getNodeId());
-                       ps.format(format, "Memory", res.getMemory() + " MB");
-                       ps.format(format, "vCores", res.getVirtualCores());
-                       ps.format(format, "HealthReport", rep.getHealthReport());
-                       ps.format(format, "Containers", rep.getNumContainers());
-                       ps.println("+---------------------------------------+");
-               }
-               ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
-               List<QueueInfo> qInfo = yarnClient.getAllQueues();
-               for(QueueInfo q : qInfo) {
-                       ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
-                               q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
-               }
-               yarnClient.stop();
-               return baos.toString();
-       }
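
getClusterDescription renders the node table into an in-memory PrintStream
rather than stdout, so the caller receives the whole report as a String. A
minimal sketch of that capture technique with one hypothetical row:

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class CapturedTableSketch {
        public static void main(String[] args) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);

            final String format = "|%-16s |%-16s %n";
            ps.printf("|Property         |Value          %n");
            ps.println("+---------------------------------------+");
            ps.format(format, "NodeID", "host1:8041"); // hypothetical node
            ps.format(format, "Memory", 8192 + " MB");
            ps.println("+---------------------------------------+");

            String report = baos.toString(); // nothing was written to stdout yet
            System.out.println(report);
        }
    }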
-
-       @Override
-       public String getSessionFilesDir() {
-               return sessionFilesDir.toString();
-       }
-
-       @Override
-       public void setName(String name) {
-               if(name == null) {
-                       throw new IllegalArgumentException("The passed name is null");
-               }
-               customName = name;
-       }
-
-       private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
-               ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
-
-               reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
-               reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
-       }
-
-       /**
-        * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
-        * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
-        * methods. Depending on the Hadoop version, these methods are supported or not. If the methods
-        * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
-        * setAttemptFailuresValidityInterval are called.
-        */
-       private static class ApplicationSubmissionContextReflector {
-               private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
-
-               private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
-
-               public static ApplicationSubmissionContextReflector getInstance() {
-                       return instance;
-               }
-
-               private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
-               private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
-
-               private final Method keepContainersMethod;
-               private final Method attemptFailuresValidityIntervalMethod;
-
-               private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
-                       Method keepContainersMethod;
-                       Method attemptFailuresValidityIntervalMethod;
-
-                       try {
-                               // this method is only supported by Hadoop 2.4.0 onwards
-                               keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
-                               LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
-                       } catch (NoSuchMethodException e) {
-                               LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
-                               // assign null because the Hadoop version apparently does not support this call
-                               keepContainersMethod = null;
-                       }
-
-                       this.keepContainersMethod = keepContainersMethod;
-
-                       try {
-                               // this method is only supported by Hadoop 2.6.0 onwards
-                               attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
-                               LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
-                       } catch (NoSuchMethodException e) {
-                               LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
-                               // assign null because the Hadoop version apparently does not support this call
-                               attemptFailuresValidityIntervalMethod = null;
-                       }
-
-                       this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
-               }
-
-               public void setKeepContainersAcrossApplicationAttempts(
-                               ApplicationSubmissionContext appContext,
-                               boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
-
-                       if (keepContainersMethod != null) {
-                               LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
-                                       appContext.getClass().getCanonicalName());
-                               keepContainersMethod.invoke(appContext, keepContainers);
-                       } else {
-                               LOG.debug("{} does not support method {}. Doing nothing.",
-                                       appContext.getClass().getCanonicalName(), keepContainersMethodName);
-                       }
-               }
-
-               public void setAttemptFailuresValidityInterval(
-                               ApplicationSubmissionContext appContext,
-                               long validityInterval) throws InvocationTargetException, IllegalAccessException {
-                       if (attemptFailuresValidityIntervalMethod != null) {
-                               LOG.debug("Calling method {} of {}.",
-                                       attemptFailuresValidityIntervalMethod.getName(),
-                                       appContext.getClass().getCanonicalName());
-                               attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
-                       } else {
-                               LOG.debug("{} does not support method {}. Doing nothing.",
-                                       appContext.getClass().getCanonicalName(),
-                                       attemptsFailuresValidityIntervalMethodName);
-                       }
-               }
-       }
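
The reflector above is a general pattern for calling version-dependent APIs:
probe once with Class.getMethod, cache the Method handle (null meaning
"unsupported"), and turn unsupported calls into no-ops. The pattern in
isolation; the probed class and method here are hypothetical:

    import java.lang.reflect.Method;

    public class OptionalMethodSketch {
        static class SomeContext { /* hypothetical stand-in for the YARN context */ }

        // Probe once; null means this version does not offer the method.
        private static final Method SET_TIMEOUT = probe();

        private static Method probe() {
            try {
                return SomeContext.class.getMethod("setTimeout", long.class); // hypothetical method
            } catch (NoSuchMethodException e) {
                return null;
            }
        }

        static void setTimeoutIfSupported(SomeContext ctx, long millis) throws Exception {
            if (SET_TIMEOUT != null) {
                SET_TIMEOUT.invoke(ctx, millis);
            }
            // else: silently do nothing, exactly like the reflector above
        }
    }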
-
-       public static class YarnDeploymentException extends RuntimeException {
-               private static final long serialVersionUID = -812040641215388943L;
-
-               public YarnDeploymentException() {
-               }
-
-               public YarnDeploymentException(String message) {
-                       super(message);
-               }
-
-               public YarnDeploymentException(String message, Throwable cause) {
-                       super(message, cause);
-               }
-       }
-
-       private class DeploymentFailureHook extends Thread {
-               @Override
-               public void run() {
-                       LOG.info("Cancelling deployment from Deployment Failure Hook");
-                       failSessionDuringDeployment();
-                       LOG.info("Deleting files in " + sessionFilesDir);
-                       try {
-                               FileSystem fs = FileSystem.get(conf);
-                               fs.delete(sessionFilesDir, true);
-                               fs.close();
-                       } catch (IOException e) {
-                               LOG.error("Failed to delete the Flink jar and configuration files in HDFS", e);
-                       }
-               }
-       }
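
DeploymentFailureHook is registered just before submitApplication and removed
once the application reaches RUNNING; the IllegalStateException catch in
deployInternal covers removal attempts made while the JVM is already shutting
down. The register/remove pattern on its own:

    public class CleanupHookSketch {
        public static void main(String[] args) {
            Thread cleanup = new Thread() {
                @Override
                public void run() {
                    System.err.println("cleaning up failed deployment");
                }
            };
            Runtime.getRuntime().addShutdownHook(cleanup);

            // ... risky work: submit the application and wait for it to come up ...

            try {
                // Success: the cleanup hook is no longer needed.
                Runtime.getRuntime().removeShutdownHook(cleanup);
            } catch (IllegalStateException e) {
                // The JVM is already shutting down; the hook will run regardless.
            }
        }
    }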
-}
-
