http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java new file mode 100644 index 0000000..7220a29 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -0,0 +1,943 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.RecoveryMode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; 
+import java.util.Map; + +import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; +import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; +import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties; + +/** +* All classes in this package contain code taken from +* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc +* and +* https://github.com/hortonworks/simple-yarn-app +* and +* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java +* +* The Flink jar is uploaded to HDFS by this client. +* The application master and all the TaskManager containers get the jar file downloaded +* by YARN into their local fs. +* +*/ +public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> { + private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class); + + private static final String CONFIG_FILE_NAME = "flink-conf.yaml"; + + /** + * Minimum memory requirements, checked by the Client. + */ + private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff + private static final int MIN_TM_MEMORY = 768; + + private Configuration conf = new YarnConfiguration(); + + /** + * Files (usually in a distributed file system) used for the YARN session of Flink. + * Contains configuration files and jar files. + */ + private Path sessionFilesDir; + + /** + * If the user has specified a different number of slots, we store them here. + */ + private int slots = -1; + + private int jobManagerMemoryMb = 1024; + + private int taskManagerMemoryMb = 1024; + + private int taskManagerCount = 1; + + private String yarnQueue = null; + + private String configurationDirectory; + + private Path flinkConfigurationPath; + + private Path flinkLoggingConfigurationPath; // optional + + private Path flinkJarPath; + + private String dynamicPropertiesEncoded; + + private List<File> shipFiles = new ArrayList<>(); + private org.apache.flink.configuration.Configuration flinkConfiguration; + + private boolean detached; + + private String customName = null; + + public AbstractYarnClusterDescriptor() { + // for unit tests only + if(System.getenv("IN_TESTS") != null) { + try { + conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL()); + } catch (Throwable t) { + throw new RuntimeException("Error", t); + } + } + + // load the config + this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + GlobalConfiguration.loadConfiguration(configurationDirectory); + this.flinkConfiguration = GlobalConfiguration.getConfiguration(); + + File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME); + if (!confFile.exists()) { + throw new RuntimeException("Unable to locate configuration file in " + confFile); + } + flinkConfigurationPath = new Path(confFile.getAbsolutePath()); + + // check if there is a logback or log4j file + if (configurationDirectory.length() > 0) { + File logback = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME); + if (logback.exists()) { + shipFiles.add(logback); + flinkLoggingConfigurationPath = new Path(logback.toURI()); + } + File log4j = new File(configurationDirectory + File.separator +
CONFIG_FILE_LOG4J_NAME); + if (log4j.exists()) { + shipFiles.add(log4j); + if (flinkLoggingConfigurationPath != null) { + // a logback configuration file is already present --> warn the user + LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " + + "Logback configuration files. Please delete or rename one of them."); + } + flinkLoggingConfigurationPath = new Path(log4j.toURI()); + } + } + } + + /** + * The class to bootstrap the application master of the Yarn cluster (runs main method). + */ + protected abstract Class<?> getApplicationMasterClass(); + + public void setJobManagerMemory(int memoryMb) { + if(memoryMb < MIN_JM_MEMORY) { + throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount " + + "of " + MIN_JM_MEMORY + " MB"); + } + this.jobManagerMemoryMb = memoryMb; + } + + public void setTaskManagerMemory(int memoryMb) { + if(memoryMb < MIN_TM_MEMORY) { + throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount " + + "of " + MIN_TM_MEMORY + " MB"); + } + this.taskManagerMemoryMb = memoryMb; + } + + public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { + this.flinkConfiguration = conf; + } + + public org.apache.flink.configuration.Configuration getFlinkConfiguration() { + return flinkConfiguration; + } + + public void setTaskManagerSlots(int slots) { + if(slots <= 0) { + throw new IllegalArgumentException("Number of TaskManager slots must be positive"); + } + this.slots = slots; + } + + public int getTaskManagerSlots() { + return this.slots; + } + + public void setQueue(String queue) { + this.yarnQueue = queue; + } + + public void setLocalJarPath(Path localJarPath) { + if(!localJarPath.toString().endsWith("jar")) { + throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension"); + } + this.flinkJarPath = localJarPath; + } + + public void setConfigurationFilePath(Path confPath) { + flinkConfigurationPath = confPath; + } + + public void setConfigurationDirectory(String configurationDirectory) { + this.configurationDirectory = configurationDirectory; + } + + public void setFlinkLoggingConfigurationPath(Path logConfPath) { + flinkLoggingConfigurationPath = logConfPath; + } + + public Path getFlinkLoggingConfigurationPath() { + return flinkLoggingConfigurationPath; + } + + public void setTaskManagerCount(int tmCount) { + if(tmCount < 1) { + throw new IllegalArgumentException("The TaskManager count has to be at least 1."); + } + this.taskManagerCount = tmCount; + } + + public int getTaskManagerCount() { + return this.taskManagerCount; + } + + public void setShipFiles(List<File> shipFiles) { + for(File shipFile: shipFiles) { + // remove uberjar from ship list (by default everything in the lib/ folder is added to + // the list of files to ship, but we handle the uberjar separately.)
+ if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) { + this.shipFiles.add(shipFile); + } + } + } + + public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { + this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; + } + + public String getDynamicPropertiesEncoded() { + return this.dynamicPropertiesEncoded; + } + + + private void isReadyForDeployment() throws YarnDeploymentException { + if(taskManagerCount <= 0) { + throw new YarnDeploymentException("TaskManager count must be positive"); + } + if(this.flinkJarPath == null) { + throw new YarnDeploymentException("The Flink jar path is null"); + } + if(this.configurationDirectory == null) { + throw new YarnDeploymentException("Configuration directory not set"); + } + if(this.flinkConfigurationPath == null) { + throw new YarnDeploymentException("Configuration path not set"); + } + if(this.flinkConfiguration == null) { + throw new YarnDeploymentException("Flink configuration object has not been set"); + } + + // check if required Hadoop environment variables are set. If not, warn user + if(System.getenv("HADOOP_CONF_DIR") == null && + System.getenv("YARN_CONF_DIR") == null) { + LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " + + "The Flink YARN Client needs one of these to be set to properly load the Hadoop " + + "configuration for accessing YARN."); + } + } + + private static boolean allocateResource(int[] nodeManagers, int toAllocate) { + for(int i = 0; i < nodeManagers.length; i++) { + if(nodeManagers[i] >= toAllocate) { + nodeManagers[i] -= toAllocate; + return true; + } + } + return false; + } + + public void setDetachedMode(boolean detachedMode) { + this.detached = detachedMode; + } + + public boolean isDetachedMode() { + return detached; + } + + + /** + * Gets a Hadoop YARN client. + * @return Returns a YarnClient which has to be shut down manually + */ + public static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + @Override + public YarnClusterClient deploy() throws Exception { + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + if (UserGroupInformation.isSecurityEnabled()) { + if (!ugi.hasKerberosCredentials()) { + throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " + + "You may use kinit to authenticate and request a TGT from the Kerberos server."); + } + return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() { + @Override + public YarnClusterClient run() throws Exception { + return deployInternal(); + } + }); + } else { + return deployInternal(); + } + } + + /** + * This method will block until the ApplicationMaster/JobManager has been + * deployed on YARN.
+ */ + protected YarnClusterClient deployInternal() throws Exception { + isReadyForDeployment(); + + LOG.info("Using values:"); + LOG.info("\tTaskManager count = {}", taskManagerCount); + LOG.info("\tJobManager memory = {}", jobManagerMemoryMb); + LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb); + + // Create application via yarnClient + final YarnClient yarnClient = getYarnClient(conf); + final YarnClientApplication yarnApplication = yarnClient.createApplication(); + GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); + + // ------------------ Add dynamic properties to local flinkConfiguration ------ + + Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) { + flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); + } + + // ------------------ Set default file system scheme ------------------------- + + try { + org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + // ------------------ Check if the specified queue exists -------------------- + + try { + List<QueueInfo> queues = yarnClient.getAllQueues(); + if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in YARN and a queue has been specified for this session. + boolean queueFound = false; + for (QueueInfo queue : queues) { + if (queue.getQueueName().equals(this.yarnQueue)) { + queueFound = true; + break; + } + } + if (!queueFound) { + String queueNames = ""; + for (QueueInfo queue : queues) { + queueNames += queue.getQueueName() + ", "; + } + LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " + + "Available queues: " + queueNames); + } + } else { + LOG.debug("The YARN cluster does not have any queues configured"); + } + } catch(Throwable e) { + LOG.warn("Error while getting queue information from YARN: " + e.getMessage()); + if(LOG.isDebugEnabled()) { + LOG.debug("Error details", e); + } + } + + // ------------------ Check if the YARN cluster has the requested resources -------------- + + // the yarnMinAllocationMB specifies the smallest possible container allocation size. + // all allocations below this value are automatically set to this value. + final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0); + if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) { + LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. " + + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. "
+ + "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " + + "you requested will start."); + } + + // set the memory to minAllocationMB to do the next checks correctly + if(jobManagerMemoryMb < yarnMinAllocationMB) { + jobManagerMemoryMb = yarnMinAllocationMB; + } + if(taskManagerMemoryMb < yarnMinAllocationMB) { + taskManagerMemoryMb = yarnMinAllocationMB; + } + + Resource maxRes = appResponse.getMaximumResourceCapability(); + final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; + if(jobManagerMemoryMb > maxRes.getMemory() ) { + failSessionDuringDeployment(yarnClient, yarnApplication); + throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n" + + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE); + } + + if(taskManagerMemoryMb > maxRes.getMemory() ) { + failSessionDuringDeployment(yarnClient, yarnApplication); + throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n" + + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE); + } + + final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " + + "connecting from the beginning because the resources are currently not available in the cluster. " + + "The allocation might take more time than usual because the Flink YARN client needs to wait until " + + "the resources become available."; + int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount; + ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient); + if(freeClusterMem.totalFreeMemory < totalMemoryRequired) { + LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. " + + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC); + + } + if(taskManagerMemoryMb > freeClusterMem.containerLimit) { + LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than " + + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC); + } + if(jobManagerMemoryMb > freeClusterMem.containerLimit) { + LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than " + + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC); + } + + // ----------------- check if the requested containers fit into the cluster. + + int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length); + // first, allocate the jobManager somewhere. + if(!allocateResource(nmFree, jobManagerMemoryMb)) { + LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " + + "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " + + Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC); + } + // allocate TaskManagers + for(int i = 0; i < taskManagerCount; i++) { + if(!allocateResource(nmFree, taskManagerMemoryMb)) { + LOG.warn("There is not enough memory available in the YARN cluster. " + + "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. 
" + + "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" + + "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " + + "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC ); + } + } + + // ------------------ Prepare Application Master Container ------------------------------ + + // respect custom JVM options in the YAML file + final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); + + String logbackFile = configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME; + boolean hasLogback = new File(logbackFile).exists(); + String log4jFile = configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME; + + boolean hasLog4j = new File(log4jFile).exists(); + if(hasLogback) { + shipFiles.add(new File(logbackFile)); + } + if(hasLog4j) { + shipFiles.add(new File(log4jFile)); + } + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + String amCommand = "$JAVA_HOME/bin/java" + + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + + "M " + javaOpts; + + if(hasLogback || hasLog4j) { + amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\""; + + if(hasLogback) { + amCommand += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME; + } + + if(hasLog4j) { + amCommand += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME; + } + } + + amCommand += " " + getApplicationMasterClass().getName() + " " + + " 1>" + + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err"; + amContainer.setCommands(Collections.singletonList(amCommand)); + + LOG.debug("Application Master start command: " + amCommand); + + // intialize HDFS + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + final FileSystem fs = FileSystem.get(conf); + + // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. + if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && + fs.getScheme().startsWith("file")) { + LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " + + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values." 
+ + "The Flink YARN client needs to store its files in a distributed file system"); + } + + // Set-up ApplicationSubmissionContext for the application + ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); + + if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) { + // activate re-execution of failed applications + appContext.setMaxAppAttempts( + flinkConfiguration.getInteger( + ConfigConstants.YARN_APPLICATION_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + + activateHighAvailabilitySupport(appContext); + } else { + // set number of application retries to 1 in the default case + appContext.setMaxAppAttempts( + flinkConfiguration.getInteger( + ConfigConstants.YARN_APPLICATION_ATTEMPTS, + 1)); + } + + final ApplicationId appId = appContext.getApplicationId(); + + // Setup jar for ApplicationMaster + LocalResource appMasterJar = Records.newRecord(LocalResource.class); + LocalResource flinkConf = Records.newRecord(LocalResource.class); + Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); + Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); + Map<String, LocalResource> localResources = new HashMap<>(2); + localResources.put("flink.jar", appMasterJar); + localResources.put("flink-conf.yaml", flinkConf); + + + // setup security tokens (code from apache storm) + final Path[] paths = new Path[2 + shipFiles.size()]; + StringBuilder envShipFileList = new StringBuilder(); + // upload ship files + for (int i = 0; i < shipFiles.size(); i++) { + File shipFile = shipFiles.get(i); + LocalResource shipResources = Records.newRecord(LocalResource.class); + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); + paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(), + shipLocalPath, shipResources, fs.getHomeDirectory()); + localResources.put(shipFile.getName(), shipResources); + + envShipFileList.append(paths[2 + i]); + if(i+1 < shipFiles.size()) { + envShipFileList.append(','); + } + } + + paths[0] = remotePathJar; + paths[1] = remotePathConf; + sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/"); + + FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); + fs.setPermission(sessionFilesDir, permission); // set permission for path. 
+ + Utils.setTokensFor(amContainer, paths, conf); + + amContainer.setLocalResources(localResources); + fs.close(); + + // Setup CLASSPATH for ApplicationMaster + Map<String, String> appMasterEnv = new HashMap<>(); + // set user specified app master environment variables + appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration)); + // set classpath from YARN configuration + Utils.setupEnv(conf, appMasterEnv); + // set Flink on YARN internal configuration values + appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount)); + appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb)); + appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString() ); + appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); + appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString()); + appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); + appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName()); + appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots)); + appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); + + if(dynamicPropertiesEncoded != null) { + appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded); + } + + amContainer.setEnvironment(appMasterEnv); + + // Set up resource type requirements for ApplicationMaster + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(jobManagerMemoryMb); + capability.setVirtualCores(1); + + String name; + if(customName == null) { + name = "Flink session with " + taskManagerCount + " TaskManagers"; + if(detached) { + name += " (detached)"; + } + } else { + name = customName; + } + + appContext.setApplicationName(name); // application name + appContext.setApplicationType("Apache Flink"); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(capability); + if(yarnQueue != null) { + appContext.setQueue(yarnQueue); + } + + // add a hook to clean up in case deployment fails + Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication); + Runtime.getRuntime().addShutdownHook(deploymentFailureHook); + LOG.info("Submitting application master " + appId); + yarnClient.submitApplication(appContext); + + LOG.info("Waiting for the cluster to be allocated"); + int waittime = 0; + ApplicationReport report; + loop: while( true ) { + try { + report = yarnClient.getApplicationReport(appId); + } catch (IOException e) { + throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage()); + } + YarnApplicationState appState = report.getYarnApplicationState(); + switch(appState) { + case FAILED: + case FINISHED: + case KILLED: + throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + + appState + " during deployment. \n" + + "Diagnostics from YARN: " + report.getDiagnostics() + "\n" + + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" + + "yarn logs -applicationId " + appId); + //break .. + case RUNNING: + LOG.info("YARN application has been deployed successfully."); + break loop; + default: + LOG.info("Deploying cluster, current state " + appState); + if(waittime > 60000) { + LOG.info("Deployment took more than 60 seconds. 
Please check if the requested resources are available in the YARN cluster"); + } + + } + waittime += 1000; + Thread.sleep(1000); + } + // print the application id so that users can cancel the session themselves. + if (isDetachedMode()) { + LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + + "Flink on YARN, use the following command or a YARN web interface to stop " + + "it:\nyarn application -kill " + appId + "\nPlease also note that the " + + "temporary files of the YARN session in the home directory will not be removed."); + } + // since deployment was successful, remove the hook + try { + Runtime.getRuntime().removeShutdownHook(deploymentFailureHook); + } catch (IllegalStateException e) { + // we're already in the shutdown hook. + } + + String host = report.getHost(); + int port = report.getRpcPort(); + String trackingURL = report.getTrackingUrl(); + + // Correctly initialize the Flink config + flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); + flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); + + // the Flink cluster is deployed in YARN. Create a cluster client to represent it. + return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir); + } + + /** + * Kills the YARN application and stops the YARN client. + * + * Use this method to kill the application before it has been properly deployed. + */ + private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) { + LOG.info("Killing YARN application"); + + try { + yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId()); + } catch (Exception e) { + // we only log a debug message here because the "killApplication" call is a best-effort + // call (we don't know if the application has been deployed when the error occurred). + LOG.debug("Error while killing YARN application", e); + } + yarnClient.stop(); + } + + + private static class ClusterResourceDescription { + public final int totalFreeMemory; + public final int containerLimit; + public final int[] nodeManagersFree; + + public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) { + this.totalFreeMemory = totalFreeMemory; + this.containerLimit = containerLimit; + this.nodeManagersFree = nodeManagersFree; + } + } + + private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException { + List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING); + + int totalFreeMemory = 0; + int containerLimit = 0; + int[] nodeManagersFree = new int[nodes.size()]; + + for(int i = 0; i < nodes.size(); i++) { + NodeReport rep = nodes.get(i); + int free = rep.getCapability().getMemory() - (rep.getUsed() != null ?
rep.getUsed().getMemory() : 0 ); + nodeManagersFree[i] = free; + totalFreeMemory += free; + if(free > containerLimit) { + containerLimit = free; + } + } + return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree); + } + + @Override + public String getClusterDescription() throws Exception { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + + YarnClient yarnClient = getYarnClient(conf); + YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics(); + + ps.append("NodeManagers in the cluster: " + metrics.getNumNodeManagers()); + List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING); + final String format = "|%-16s |%-16s %n"; + ps.printf("|Property |Value %n"); + ps.println("+---------------------------------------+"); + int totalMemory = 0; + int totalCores = 0; + for(NodeReport rep : nodes) { + final Resource res = rep.getCapability(); + totalMemory += res.getMemory(); + totalCores += res.getVirtualCores(); + ps.format(format, "NodeID", rep.getNodeId()); + ps.format(format, "Memory", res.getMemory() + " MB"); + ps.format(format, "vCores", res.getVirtualCores()); + ps.format(format, "HealthReport", rep.getHealthReport()); + ps.format(format, "Containers", rep.getNumContainers()); + ps.println("+---------------------------------------+"); + } + ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores); + List<QueueInfo> qInfo = yarnClient.getAllQueues(); + for(QueueInfo q : qInfo) { + ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " + + q.getMaximumCapacity() + " Applications: " + q.getApplications().size()); + } + yarnClient.stop(); + return baos.toString(); + } + + public String getSessionFilesDir() { + return sessionFilesDir.toString(); + } + + public void setName(String name) { + if(name == null) { + throw new IllegalArgumentException("The passed name is null"); + } + customName = name; + } + + private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException { + ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); + + reflector.setKeepContainersAcrossApplicationAttempts(appContext, true); + reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis()); + } + + /** + * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext} + * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval + * methods. Depending on the Hadoop version, these methods may or may not be supported. If the methods + * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or + * setAttemptFailuresValidityInterval are called.
+ */ + private static class ApplicationSubmissionContextReflector { + private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class); + + private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class); + + public static ApplicationSubmissionContextReflector getInstance() { + return instance; + } + + private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts"; + private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval"; + + private final Method keepContainersMethod; + private final Method attemptFailuresValidityIntervalMethod; + + private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) { + Method keepContainersMethod; + Method attemptFailuresValidityIntervalMethod; + + try { + // this method is only supported by Hadoop 2.4.0 onwards + keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName); + } catch (NoSuchMethodException e) { + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName); + // assign null because the Hadoop version apparently does not support this call. + keepContainersMethod = null; + } + + this.keepContainersMethod = keepContainersMethod; + + try { + // this method is only supported by Hadoop 2.6.0 onwards + attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName); + } catch (NoSuchMethodException e) { + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName); + // assign null because the Hadoop version apparently does not support this call. + attemptFailuresValidityIntervalMethod = null; + } + + this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod; + } + + public void setKeepContainersAcrossApplicationAttempts( + ApplicationSubmissionContext appContext, + boolean keepContainers) throws InvocationTargetException, IllegalAccessException { + + if (keepContainersMethod != null) { + LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(), + appContext.getClass().getCanonicalName()); + keepContainersMethod.invoke(appContext, keepContainers); + } else { + LOG.debug("{} does not support method {}. Doing nothing.", + appContext.getClass().getCanonicalName(), keepContainersMethodName); + } + } + + public void setAttemptFailuresValidityInterval( + ApplicationSubmissionContext appContext, + long validityInterval) throws InvocationTargetException, IllegalAccessException { + if (attemptFailuresValidityIntervalMethod != null) { + LOG.debug("Calling method {} of {}.", + attemptFailuresValidityIntervalMethod.getName(), + appContext.getClass().getCanonicalName()); + attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval); + } else { + LOG.debug("{} does not support method {}. 
Doing nothing.", + appContext.getClass().getCanonicalName(), + attemptsFailuresValidityIntervalMethodName); + } + } + } + + private static class YarnDeploymentException extends RuntimeException { + private static final long serialVersionUID = -812040641215388943L; + + public YarnDeploymentException() { + } + + public YarnDeploymentException(String message) { + super(message); + } + + public YarnDeploymentException(String message, Throwable cause) { + super(message, cause); + } + } + + private class DeploymentFailureHook extends Thread { + + DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) { + this.yarnClient = yarnClient; + this.yarnApplication = yarnApplication; + } + + private YarnClient yarnClient; + private YarnClientApplication yarnApplication; + + @Override + public void run() { + LOG.info("Cancelling deployment from Deployment Failure Hook"); + failSessionDuringDeployment(yarnClient, yarnApplication); + LOG.info("Deleting files in " + sessionFilesDir); + try { + FileSystem fs = FileSystem.get(conf); + fs.delete(sessionFilesDir, true); + fs.close(); + } catch (IOException e) { + LOG.error("Failed to delete Flink Jar and conf files in HDFS", e); + } + } + } +} +
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java deleted file mode 100644 index 467e06d..0000000 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.yarn; - -/** - * Default implementation of {@link FlinkYarnClientBase} which starts an {@link YarnApplicationMasterRunner}. - */ -public class FlinkYarnClient extends FlinkYarnClientBase { - @Override - protected Class<?> getApplicationMasterClass() { - return YarnApplicationMasterRunner.class; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java deleted file mode 100644 index 6f81d09..0000000 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ /dev/null @@ -1,907 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobmanager.RecoveryMode; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.YarnClientApplication; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.Records; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** -* All classes in this package contain code taken from -* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc -* and -* https://github.com/hortonworks/simple-yarn-app -* and -* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java -* -* The Flink jar is uploaded to HDFS by this client. -* The application master and all the TaskManager containers get the jar file downloaded -* by YARN into their local fs. -* -*/ -public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { - private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class); - - /** - * Minimum memory requirements, checked by the Client. 
- */ - private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff - private static final int MIN_TM_MEMORY = 768; - - private Configuration conf; - private YarnClient yarnClient; - private YarnClientApplication yarnApplication; - private Thread deploymentFailureHook = new DeploymentFailureHook(); - - /** - * Files (usually in a distributed file system) used for the YARN session of Flink. - * Contains configuration files and jar files. - */ - private Path sessionFilesDir; - - /** - * If the user has specified a different number of slots, we store them here - */ - private int slots = -1; - - private int jobManagerMemoryMb = 1024; - - private int taskManagerMemoryMb = 1024; - - private int taskManagerCount = 1; - - private String yarnQueue = null; - - private String configurationDirectory; - - private Path flinkConfigurationPath; - - private Path flinkLoggingConfigurationPath; // optional - - private Path flinkJarPath; - - private String dynamicPropertiesEncoded; - - private List<File> shipFiles = new ArrayList<>(); - private org.apache.flink.configuration.Configuration flinkConfiguration; - - private boolean detached; - - private String customName = null; - - public FlinkYarnClientBase() { - conf = new YarnConfiguration(); - if(this.yarnClient == null) { - // Create yarnClient - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); - yarnClient.start(); - } - - // for unit tests only - if(System.getenv("IN_TESTS") != null) { - try { - conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL()); - } catch (Throwable t) { - throw new RuntimeException("Error",t); - } - } - } - - /** - * The class to bootstrap the application master of the Yarn cluster (runs main method). 
- */ - protected abstract Class<?> getApplicationMasterClass(); - - @Override - public void setJobManagerMemory(int memoryMb) { - if(memoryMb < MIN_JM_MEMORY) { - throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount " - + "of " + MIN_JM_MEMORY+ " MB"); - } - this.jobManagerMemoryMb = memoryMb; - } - - @Override - public void setTaskManagerMemory(int memoryMb) { - if(memoryMb < MIN_TM_MEMORY) { - throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount " - + "of " + MIN_TM_MEMORY+ " MB"); - } - this.taskManagerMemoryMb = memoryMb; - } - - @Override - public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { - this.flinkConfiguration = conf; - } - - @Override - public org.apache.flink.configuration.Configuration getFlinkConfiguration() { - return flinkConfiguration; - } - - @Override - public void setTaskManagerSlots(int slots) { - if(slots <= 0) { - throw new IllegalArgumentException("Number of TaskManager slots must be positive"); - } - this.slots = slots; - } - - @Override - public int getTaskManagerSlots() { - return this.slots; - } - - @Override - public void setQueue(String queue) { - this.yarnQueue = queue; - } - - @Override - public void setLocalJarPath(Path localJarPath) { - if(!localJarPath.toString().endsWith("jar")) { - throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension"); - } - this.flinkJarPath = localJarPath; - } - - @Override - public void setConfigurationFilePath(Path confPath) { - flinkConfigurationPath = confPath; - } - - @Override - public void setConfigurationDirectory(String configurationDirectory) { - this.configurationDirectory = configurationDirectory; - } - - @Override - public void setFlinkLoggingConfigurationPath(Path logConfPath) { - flinkLoggingConfigurationPath = logConfPath; - } - - @Override - public Path getFlinkLoggingConfigurationPath() { - return flinkLoggingConfigurationPath; - } - - @Override - public void setTaskManagerCount(int tmCount) { - if(tmCount < 1) { - throw new IllegalArgumentException("The TaskManager count has to be at least 1."); - } - this.taskManagerCount = tmCount; - } - - @Override - public int getTaskManagerCount() { - return this.taskManagerCount; - } - - @Override - public void setShipFiles(List<File> shipFiles) { - for(File shipFile: shipFiles) { - // remove uberjar from ship list (by default everything in the lib/ folder is added to - // the list of files to ship, but we handle the uberjar separately. 
- if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) { - this.shipFiles.add(shipFile); - } - } - } - - @Override - public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { - this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; - } - - @Override - public String getDynamicPropertiesEncoded() { - return this.dynamicPropertiesEncoded; - } - - - public void isReadyForDeployment() throws YarnDeploymentException { - if(taskManagerCount <= 0) { - throw new YarnDeploymentException("Taskmanager count must be positive"); - } - if(this.flinkJarPath == null) { - throw new YarnDeploymentException("The Flink jar path is null"); - } - if(this.configurationDirectory == null) { - throw new YarnDeploymentException("Configuration directory not set"); - } - if(this.flinkConfigurationPath == null) { - throw new YarnDeploymentException("Configuration path not set"); - } - if(this.flinkConfiguration == null) { - throw new YarnDeploymentException("Flink configuration object has not been set"); - } - - // check if required Hadoop environment variables are set. If not, warn user - if(System.getenv("HADOOP_CONF_DIR") == null && - System.getenv("YARN_CONF_DIR") == null) { - LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." + - "The Flink YARN Client needs one of these to be set to properly load the Hadoop " + - "configuration for accessing YARN."); - } - } - - public static boolean allocateResource(int[] nodeManagers, int toAllocate) { - for(int i = 0; i < nodeManagers.length; i++) { - if(nodeManagers[i] >= toAllocate) { - nodeManagers[i] -= toAllocate; - return true; - } - } - return false; - } - - @Override - public void setDetachedMode(boolean detachedMode) { - this.detached = detachedMode; - } - - @Override - public boolean isDetached() { - return detached; - } - - @Override - public AbstractFlinkYarnCluster deploy() throws Exception { - - UserGroupInformation.setConfiguration(conf); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - - if (UserGroupInformation.isSecurityEnabled()) { - if (!ugi.hasKerberosCredentials()) { - throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " + - "You may use kinit to authenticate and request a TGT from the Kerberos server."); - } - return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() { - @Override - public AbstractFlinkYarnCluster run() throws Exception { - return deployInternal(); - } - }); - } else { - return deployInternal(); - } - } - - - - /** - * This method will block until the ApplicationMaster/JobManager have been - * deployed on YARN. 
- */ - protected AbstractFlinkYarnCluster deployInternal() throws Exception { - isReadyForDeployment(); - - LOG.info("Using values:"); - LOG.info("\tTaskManager count = {}", taskManagerCount); - LOG.info("\tJobManager memory = {}", jobManagerMemoryMb); - LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb); - - // Create application via yarnClient - yarnApplication = yarnClient.createApplication(); - GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); - - // ------------------ Add dynamic properties to local flinkConfiguraton ------ - - Map<String, String> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded); - for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) { - flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); - } - - try { - org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); - } catch (IOException e) { - throw new IOException("Error while setting the default " + - "filesystem scheme from configuration.", e); - } - // ------------------ Check if the specified queue exists -------------- - - try { - List<QueueInfo> queues = yarnClient.getAllQueues(); - if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session. - boolean queueFound = false; - for (QueueInfo queue : queues) { - if (queue.getQueueName().equals(this.yarnQueue)) { - queueFound = true; - break; - } - } - if (!queueFound) { - String queueNames = ""; - for (QueueInfo queue : queues) { - queueNames += queue.getQueueName() + ", "; - } - LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " + - "Available queues: " + queueNames); - } - } else { - LOG.debug("The YARN cluster does not have any queues configured"); - } - } catch(Throwable e) { - LOG.warn("Error while getting queue information from YARN: " + e.getMessage()); - if(LOG.isDebugEnabled()) { - LOG.debug("Error details", e); - } - } - - // ------------------ Check if the YARN Cluster has the requested resources -------------- - - // the yarnMinAllocationMB specifies the smallest possible container allocation size. - // all allocations below this value are automatically set to this value. - final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0); - if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) { - LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. " - + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." 
+ - "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " + - "you requested will start."); - } - - // set the memory to minAllocationMB to do the next checks correctly - if(jobManagerMemoryMb < yarnMinAllocationMB) { - jobManagerMemoryMb = yarnMinAllocationMB; - } - if(taskManagerMemoryMb < yarnMinAllocationMB) { - taskManagerMemoryMb = yarnMinAllocationMB; - } - - Resource maxRes = appResponse.getMaximumResourceCapability(); - final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; - if(jobManagerMemoryMb > maxRes.getMemory() ) { - failSessionDuringDeployment(); - throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n" - + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE); - } - - if(taskManagerMemoryMb > maxRes.getMemory() ) { - failSessionDuringDeployment(); - throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n" - + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE); - } - - final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " + - "connecting from the beginning because the resources are currently not available in the cluster. " + - "The allocation might take more time than usual because the Flink YARN client needs to wait until " + - "the resources become available."; - int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount; - ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient); - if(freeClusterMem.totalFreeMemory < totalMemoryRequired) { - LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. " - + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC); - - } - if(taskManagerMemoryMb > freeClusterMem.containerLimit) { - LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than " - + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC); - } - if(jobManagerMemoryMb > freeClusterMem.containerLimit) { - LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than " - + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC); - } - - // ----------------- check if the requested containers fit into the cluster. - - int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length); - // first, allocate the jobManager somewhere. - if(!allocateResource(nmFree, jobManagerMemoryMb)) { - LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " + - "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " + - Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC); - } - // allocate TaskManagers - for(int i = 0; i < taskManagerCount; i++) { - if(!allocateResource(nmFree, taskManagerMemoryMb)) { - LOG.warn("There is not enough memory available in the YARN cluster. " + - "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. 
" + - "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" + - "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " + - "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC ); - } - } - - // ------------------ Prepare Application Master Container ------------------------------ - - // respect custom JVM options in the YAML file - final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); - - String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; - boolean hasLogback = new File(logbackFile).exists(); - String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; - - boolean hasLog4j = new File(log4jFile).exists(); - if(hasLogback) { - shipFiles.add(new File(logbackFile)); - } - if(hasLog4j) { - shipFiles.add(new File(log4jFile)); - } - - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - - String amCommand = "$JAVA_HOME/bin/java" - + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) - + "M " + javaOpts; - - if(hasLogback || hasLog4j) { - amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\""; - - if(hasLogback) { - amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; - } - - if(hasLog4j) { - amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; - } - } - - amCommand += " " + getApplicationMasterClass().getName() + " " - + " 1>" - + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out" - + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err"; - amContainer.setCommands(Collections.singletonList(amCommand)); - - LOG.debug("Application Master start command: " + amCommand); - - // intialize HDFS - // Copy the application master jar to the filesystem - // Create a local resource to point to the destination jar path - final FileSystem fs = FileSystem.get(conf); - - // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. - if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && - fs.getScheme().startsWith("file")) { - LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " - + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values." 
- + "The Flink YARN client needs to store its files in a distributed file system"); - } - - // Set-up ApplicationSubmissionContext for the application - ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); - - if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) { - // activate re-execution of failed applications - appContext.setMaxAppAttempts( - flinkConfiguration.getInteger( - ConfigConstants.YARN_APPLICATION_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); - - activateHighAvailabilitySupport(appContext); - } else { - // set number of application retries to 1 in the default case - appContext.setMaxAppAttempts( - flinkConfiguration.getInteger( - ConfigConstants.YARN_APPLICATION_ATTEMPTS, - 1)); - } - - final ApplicationId appId = appContext.getApplicationId(); - - // Setup jar for ApplicationMaster - LocalResource appMasterJar = Records.newRecord(LocalResource.class); - LocalResource flinkConf = Records.newRecord(LocalResource.class); - Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); - Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); - Map<String, LocalResource> localResources = new HashMap<>(2); - localResources.put("flink.jar", appMasterJar); - localResources.put("flink-conf.yaml", flinkConf); - - - // setup security tokens (code from apache storm) - final Path[] paths = new Path[2 + shipFiles.size()]; - StringBuilder envShipFileList = new StringBuilder(); - // upload ship files - for (int i = 0; i < shipFiles.size(); i++) { - File shipFile = shipFiles.get(i); - LocalResource shipResources = Records.newRecord(LocalResource.class); - Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(), - shipLocalPath, shipResources, fs.getHomeDirectory()); - localResources.put(shipFile.getName(), shipResources); - - envShipFileList.append(paths[2 + i]); - if(i+1 < shipFiles.size()) { - envShipFileList.append(','); - } - } - - paths[0] = remotePathJar; - paths[1] = remotePathConf; - sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/"); - - FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); - fs.setPermission(sessionFilesDir, permission); // set permission for path. 
-
-      Utils.setTokensFor(amContainer, paths, conf);
-
-      amContainer.setLocalResources(localResources);
-      fs.close();
-
-      // Setup CLASSPATH for ApplicationMaster
-      Map<String, String> appMasterEnv = new HashMap<>();
-      // set user specified app master environment variables
-      appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
-      // set classpath from YARN configuration
-      Utils.setupEnv(conf, appMasterEnv);
-      // set Flink on YARN internal configuration values
-      appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-      appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
-      appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
-      appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
-      appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-      appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-      appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-      appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
-      appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
-
-      if (dynamicPropertiesEncoded != null) {
-         appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
-      }
-
-      amContainer.setEnvironment(appMasterEnv);
-
-      // Set up resource type requirements for ApplicationMaster
-      Resource capability = Records.newRecord(Resource.class);
-      capability.setMemory(jobManagerMemoryMb);
-      capability.setVirtualCores(1);
-
-      String name;
-      if (customName == null) {
-         name = "Flink session with " + taskManagerCount + " TaskManagers";
-         if (detached) {
-            name += " (detached)";
-         }
-      } else {
-         name = customName;
-      }
-
-      appContext.setApplicationName(name);
-      appContext.setApplicationType("Apache Flink");
-      appContext.setAMContainerSpec(amContainer);
-      appContext.setResource(capability);
-      if (yarnQueue != null) {
-         appContext.setQueue(yarnQueue);
-      }
-
-      // add a hook to clean up in case deployment fails
-      Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
-      LOG.info("Submitting application master " + appId);
-      yarnClient.submitApplication(appContext);
-
-      LOG.info("Waiting for the cluster to be allocated");
-      int waittime = 0;
-      loop: while (true) {
-         ApplicationReport report;
-         try {
-            report = yarnClient.getApplicationReport(appId);
-         } catch (IOException e) {
-            throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
-         }
-         YarnApplicationState appState = report.getYarnApplicationState();
-         switch (appState) {
-            case FAILED:
-            case FINISHED:
-            case KILLED:
-               throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
-                  + appState + " during deployment. \n"
-                  + "Diagnostics from YARN: " + report.getDiagnostics() + "\n"
-                  + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
-                  + "yarn logs -applicationId " + appId);
-               // no break needed; the exception above ends these cases
-            case RUNNING:
-               LOG.info("YARN application has been deployed successfully.");
-               break loop;
-            default:
-               LOG.info("Deploying cluster, current state " + appState);
-               if (waittime > 60000) {
-                  LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
-               }
-         }
-         waittime += 1000;
-         Thread.sleep(1000);
-      }
-      // print the application id so that users can cancel the session themselves
-      if (isDetached()) {
-         LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
-            "Flink on YARN, use the following command or a YARN web interface to stop " +
-            "it:\nyarn application -kill " + appId + "\nPlease also note that the " +
-            "temporary files of the YARN session in the home directory will not be removed.");
-      }
-      // since deployment was successful, remove the hook
-      try {
-         Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
-      } catch (IllegalStateException e) {
-         // we are already inside the shutdown hook
-      }
-      // the Flink cluster is now deployed in YARN; return a handle representing it
-      return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
-   }
-
-   /**
-    * Kills the YARN application and stops the YARN client.
-    *
-    * Use this method to kill the application before it has been properly deployed.
-    */
-   private void failSessionDuringDeployment() {
-      LOG.info("Killing YARN application");
-
-      try {
-         yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
-      } catch (Exception e) {
-         // we only log a debug message here because the "killApplication" call is a best-effort
-         // call (we don't know whether the application has been deployed when the error occurred)
-         LOG.debug("Error while killing YARN application", e);
-      }
-      yarnClient.stop();
-   }
-
-   private static class ClusterResourceDescription {
-      public final int totalFreeMemory;
-      public final int containerLimit;
-      public final int[] nodeManagersFree;
-
-      public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
-         this.totalFreeMemory = totalFreeMemory;
-         this.containerLimit = containerLimit;
-         this.nodeManagersFree = nodeManagersFree;
-      }
-   }
-
-   private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
-      List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-
-      int totalFreeMemory = 0;
-      int containerLimit = 0;
-      int[] nodeManagersFree = new int[nodes.size()];
-
-      for (int i = 0; i < nodes.size(); i++) {
-         NodeReport rep = nodes.get(i);
-         int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
-         nodeManagersFree[i] = free;
-         totalFreeMemory += free;
-         if (free > containerLimit) {
-            containerLimit = free;
-         }
-      }
-      return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
-   }
-
-   @Override
-   public String getClusterDescription() throws Exception {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      PrintStream ps = new PrintStream(baos);
-
-      YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-
-      ps.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
-      List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-      final String format = "|%-16s |%-16s %n";
-      ps.printf(format, "Property", "Value");
-      ps.println("+---------------------------------------+");
-      int totalMemory = 0;
-      int totalCores = 0;
-      for (NodeReport rep : nodes) {
-         final Resource res = rep.getCapability();
-         totalMemory += res.getMemory();
-         totalCores += res.getVirtualCores();
-         ps.format(format, "NodeID", rep.getNodeId());
-         ps.format(format, "Memory", res.getMemory() + " MB");
-         ps.format(format, "vCores", res.getVirtualCores());
-         ps.format(format, "HealthReport", rep.getHealthReport());
-         ps.format(format, "Containers", rep.getNumContainers());
-         ps.println("+---------------------------------------+");
-      }
-      ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
-      List<QueueInfo> qInfo = yarnClient.getAllQueues();
-      for (QueueInfo q : qInfo) {
-         ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
-            q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
-      }
-      yarnClient.stop();
-      return baos.toString();
-   }
-
-   @Override
-   public String getSessionFilesDir() {
-      return sessionFilesDir.toString();
-   }
-
-   @Override
-   public void setName(String name) {
-      if (name == null) {
-         throw new IllegalArgumentException("The passed name is null");
-      }
-      customName = name;
-   }
-
-   private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
-      ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
-
-      reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
-      reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
-   }
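[Editor's note] activateHighAvailabilitySupport(...) above works together with the application-attempts setting read during submission (ConfigConstants.YARN_APPLICATION_ATTEMPTS, i.e. the yarn.application-attempts key). Assuming the usual flink-conf.yaml syntax, a user enabling HA would set something like:

    yarn.application-attempts: 10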
-
-   /**
-    * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
-    * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
-    * methods. Depending on the Hadoop version, these methods may or may not be supported. If they
-    * are not supported, then calling setKeepContainersAcrossApplicationAttempts or
-    * setAttemptFailuresValidityInterval does nothing.
-    */
-   private static class ApplicationSubmissionContextReflector {
-      private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
-
-      private static final ApplicationSubmissionContextReflector instance =
-         new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
-
-      public static ApplicationSubmissionContextReflector getInstance() {
-         return instance;
-      }
-
-      private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
-      private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
-
-      private final Method keepContainersMethod;
-      private final Method attemptFailuresValidityIntervalMethod;
-
-      private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
-         Method keepContainersMethod;
-         Method attemptFailuresValidityIntervalMethod;
-
-         try {
-            // this method is only supported by Hadoop 2.4.0 onwards
-            keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
-            LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
-         } catch (NoSuchMethodException e) {
-            LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
-            // assign null because this Hadoop version apparently does not support the call
-            keepContainersMethod = null;
-         }
-
-         this.keepContainersMethod = keepContainersMethod;
-
-         try {
-            // this method is only supported by Hadoop 2.6.0 onwards
-            attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
-            LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
-         } catch (NoSuchMethodException e) {
-            LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
-            // assign null because this Hadoop version apparently does not support the call
-            attemptFailuresValidityIntervalMethod = null;
-         }
-
-         this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
-      }
-
-      public void setKeepContainersAcrossApplicationAttempts(
-            ApplicationSubmissionContext appContext,
-            boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
-
-         if (keepContainersMethod != null) {
-            LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
-               appContext.getClass().getCanonicalName());
-            keepContainersMethod.invoke(appContext, keepContainers);
-         } else {
-            LOG.debug("{} does not support method {}. Doing nothing.",
-               appContext.getClass().getCanonicalName(), keepContainersMethodName);
-         }
-      }
-
-      public void setAttemptFailuresValidityInterval(
-            ApplicationSubmissionContext appContext,
-            long validityInterval) throws InvocationTargetException, IllegalAccessException {
-
-         if (attemptFailuresValidityIntervalMethod != null) {
-            LOG.debug("Calling method {} of {}.",
-               attemptFailuresValidityIntervalMethod.getName(),
-               appContext.getClass().getCanonicalName());
-            attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
-         } else {
-            LOG.debug("{} does not support method {}. Doing nothing.",
-               appContext.getClass().getCanonicalName(),
-               attemptsFailuresValidityIntervalMethodName);
-         }
-      }
-   }
Doing nothing.", - appContext.getClass().getCanonicalName(), - attemptsFailuresValidityIntervalMethodName); - } - } - } - - public static class YarnDeploymentException extends RuntimeException { - private static final long serialVersionUID = -812040641215388943L; - - public YarnDeploymentException() { - } - - public YarnDeploymentException(String message) { - super(message); - } - - public YarnDeploymentException(String message, Throwable cause) { - super(message, cause); - } - } - - private class DeploymentFailureHook extends Thread { - @Override - public void run() { - LOG.info("Cancelling deployment from Deployment Failure Hook"); - failSessionDuringDeployment(); - LOG.info("Deleting files in " + sessionFilesDir); - try { - FileSystem fs = FileSystem.get(conf); - fs.delete(sessionFilesDir, true); - fs.close(); - } catch (IOException e) { - LOG.error("Failed to delete Flink Jar and conf files in HDFS", e); - } - } - } -} -
