[FLINK-4930] [client, yarn] Implement FLIP-6 YARN client Summary: Implement FLIP-6 YARN client
Test Plan: NA Reviewers: biao.liub Differential Revision: http://phabricator.taobao.net/D6563 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3695a8e9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3695a8e9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3695a8e9 Branch: refs/heads/master Commit: 3695a8e92e9c3deee368d9cc3ce89a5ab117d6a1 Parents: 2a7dbda Author: shuai.xus <[email protected]> Authored: Wed Nov 23 17:19:35 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:27 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 1 + .../yarn/AbstractYarnClusterDescriptor.java | 101 +++++--- .../apache/flink/yarn/YarnClusterClientV2.java | 169 +++++++++++++ .../flink/yarn/YarnClusterDescriptorV2.java | 34 +++ .../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 253 +++++++++++++++++++ 5 files changed, 524 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 1ec0674..dc3280e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -131,6 +131,7 @@ public class CliFrontend { /** command line interface of the YARN session, with a special initialization here * to prefix all options with y/yarn. */ loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); + loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn"); customCommandLine.add(new DefaultCLI()); } http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index ca18439..b4c87b8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.hadoop.conf.Configuration; @@ -60,6 +61,8 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.io.FileOutputStream; +import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URISyntaxException; @@ -460,28 +463,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); } - // ------------------ Set default file system scheme ------------------------- - - try { - org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); - } catch (IOException e) { - throw new IOException("Error while setting the default " + - "filesystem scheme from configuration.", e); - } - - // initialize file system - // Copy the application master jar to the filesystem - // Create a local resource to point to the destination jar path - final FileSystem fs = FileSystem.get(conf); - - // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. - if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && - fs.getScheme().startsWith("file")) { - LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " - + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values." - + "The Flink YARN client needs to store its files in a distributed file system"); - } - // ------------------ Check if the YARN ClusterClient has the requested resources -------------- // the yarnMinAllocationMB specifies the smallest possible container allocation size. @@ -505,6 +486,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // Create application via yarnClient final YarnClientApplication yarnApplication = yarnClient.createApplication(); GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); + ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); Resource maxRes = appResponse.getMaximumResourceCapability(); final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; @@ -560,6 +542,45 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } + ApplicationReport report = startAppMaster(null, yarnClient); + + String host = report.getHost(); + int port = report.getRpcPort(); + + // Correctly initialize the Flink config + flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); + flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); + + // the Flink cluster is deployed in YARN. Represent cluster + return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true); + } + + public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient) throws Exception { + + // ------------------ Set default file system scheme ------------------------- + + try { + org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + + // initialize file system + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + final FileSystem fs = FileSystem.get(conf); + + // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. + if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && + fs.getScheme().startsWith("file")) { + LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " + + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values." + + "The Flink YARN client needs to store its files in a distributed file system"); + } + + final YarnClientApplication yarnApplication = yarnClient.createApplication(); + ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size()); for (File file : shipFiles) { effectiveShipFiles.add(file.getAbsoluteFile()); @@ -596,8 +617,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor effectiveShipFiles.addAll(userJarFiles); } + // Set-up ApplicationSubmissionContext for the application - ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); final ApplicationId appId = appContext.getApplicationId(); @@ -694,6 +715,27 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor paths.add(remotePathConf); classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator); + // write job graph to tmp file and add it to local resource + // TODO: need refine ? + if (jobGraph != null) { + try { + File fp = new File("/tmp/jobgraph-" + appId.toString()); + FileOutputStream input = new FileOutputStream(fp); + ObjectOutputStream obInput = new ObjectOutputStream(input); + obInput.writeObject(jobGraph); + input.close(); + LocalResource jobgraph = Records.newRecord(LocalResource.class); + Path remoteJobGraph = + Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory()); + localResources.put("job.graph", jobgraph); + paths.add(remoteJobGraph); + classPathBuilder.append("job.graph").append(File.pathSeparator); + } catch (Exception e) { + LOG.warn("Add job graph to local resource fail"); + throw e; + } + } + sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/"); FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); @@ -835,7 +877,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor LOG.debug("Application State: {}", appState); switch(appState) { case FAILED: - case FINISHED: + case FINISHED: //TODO: the finished state may be valid in flip-6 case KILLED: throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + appState + " during deployment. \n" + @@ -871,16 +913,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } catch (IllegalStateException e) { // we're already in the shut down hook. } - - String host = report.getHost(); - int port = report.getRpcPort(); - - // Correctly initialize the Flink config - flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); - flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); - - // the Flink cluster is deployed in YARN. Represent cluster - return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true); + return report; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java new file mode 100644 index 0000000..daa2c3b --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.List; + +/** + * Java representation of a running Flink job on YARN. + * Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster, + * so this class will be used as a client to communicate with yarn and start the job on yarn. + */ +public class YarnClusterClientV2 extends ClusterClient { + + private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientV2.class); + + private YarnClient yarnClient; + + private final AbstractYarnClusterDescriptor clusterDescriptor; + + private ApplicationId appId; + + private String trackingURL; + + /** + * Create a client to communicate with YARN cluster. + * + * @param clusterDescriptor The descriptor used to create yarn job + * @param flinkConfig Flink configuration + * @throws java.io.IOException + */ + public YarnClusterClientV2( + final AbstractYarnClusterDescriptor clusterDescriptor, + org.apache.flink.configuration.Configuration flinkConfig) throws IOException { + + super(flinkConfig); + + this.clusterDescriptor = clusterDescriptor; + this.yarnClient = clusterDescriptor.getYarnClient(); + this.trackingURL = ""; + } + + @Override + public org.apache.flink.configuration.Configuration getFlinkConfiguration() { + return flinkConfig; + } + + @Override + public int getMaxSlots() { + // Now need not set max slot + return 0; + } + + @Override + public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { + return clusterDescriptor.hasUserJarFiles(userJarFiles); + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + try { + // Create application via yarnClient + ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient); + if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) { + appId = report.getApplicationId(); + trackingURL = report.getTrackingUrl(); + logAndSysout("Please refer to " + getWebInterfaceURL() + + " for the running status of job " + jobGraph.getJobID().toString()); + //TODO: not support attach mode now + return new JobSubmissionResult(jobGraph.getJobID()); + } + else { + throw new ProgramInvocationException("Fail to submit the job."); + } + } + catch (Exception e) { + throw new ProgramInvocationException("Fail to submit the job", e.getCause()); + } + } + + @Override + public String getWebInterfaceURL() { + // there seems to be a difference between HD 2.2.0 and 2.6.0 + if(!trackingURL.startsWith("http://")) { + return "http://" + trackingURL; + } else { + return trackingURL; + } + } + + @Override + public String getClusterIdentifier() { + return "Yarn cluster with application id " + getApplicationId(); + } + + /** + * This method is only available if the cluster hasn't been started in detached mode. + */ + @Override + public GetClusterStatusResponse getClusterStatus() { + throw new UnsupportedOperationException("Not support getClusterStatus since Flip-6."); + } + + public ApplicationStatus getApplicationStatus() { + //TODO: this method is useful for later + return null; + } + + @Override + public List<String> getNewMessages() { + throw new UnsupportedOperationException("Not support getNewMessages since Flip-6."); + } + + @Override + public void finalizeCluster() { + throw new UnsupportedOperationException("Not support finalizeCluster since Flip-6."); + } + + @Override + public boolean isDetached() { + return super.isDetached() || clusterDescriptor.isDetachedMode(); + } + + @Override + public void waitForClusterToBeReady() { + throw new UnsupportedOperationException("Not support waitForClusterToBeReady since Flip-6."); + } + + @Override + public InetSocketAddress getJobManagerAddress() { + //TODO: just return a local address in order to be compatible with createClient in CliFrontend + return new InetSocketAddress("localhost", 0); + } + + public ApplicationId getApplicationId() { + return appId; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java new file mode 100644 index 0000000..e3bd944 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + + +/** + * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6. + * This implementation is now however tricky, since YarnClusterDescriptorV2 is related YarnClusterClientV2, but AbstractYarnClusterDescriptor is related + * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor<YarnClusterClientV2>. + * However, in order to use the code in AbstractYarnClusterDescriptor for setting environments and so on, we make YarnClusterDescriptorV2 as now. + */ +public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor { + + @Override + protected Class<?> getApplicationMasterClass() { + return YarnFlinkApplicationMasterRunner.class; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java new file mode 100644 index 0000000..ca5049c --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.client.cli.CliFrontendParser; +import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.YarnClusterClientV2; +import org.apache.flink.yarn.YarnClusterDescriptorV2; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLDecoder; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; + +/** + * Class handling the command line interface to the YARN per job mode under flip-6. + */ +public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class); + + /** The id for the CommandLine interface */ + private static final String ID = "yarn"; + + private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split() + + //------------------------------------ Command Line argument options ------------------------- + // the prefix transformation is used by the CliFrontend static constructor. + private final Option QUEUE; + private final Option SHIP_PATH; + private final Option FLINK_JAR; + private final Option JM_MEMORY; + private final Option DETACHED; + private final Option ZOOKEEPER_NAMESPACE; + + private final Options ALL_OPTIONS; + + /** + * Dynamic properties allow the user to specify additional configuration values with -D, such as + * -D fs.overwrite-files=true -D taskmanager.network.numberOfBuffers=16368 + */ + private final Option DYNAMIC_PROPERTIES; + + //------------------------------------ Internal fields ------------------------- + // use detach mode as default + private boolean detachedMode = true; + + public FlinkYarnCLI(String shortPrefix, String longPrefix) { + + QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); + SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); + FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); + JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); + DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); + DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached"); + ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); + + ALL_OPTIONS = new Options(); + ALL_OPTIONS.addOption(FLINK_JAR); + ALL_OPTIONS.addOption(JM_MEMORY); + ALL_OPTIONS.addOption(QUEUE); + ALL_OPTIONS.addOption(SHIP_PATH); + ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES); + ALL_OPTIONS.addOption(DETACHED); + ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE); + } + + public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) { + + YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2(); + + // Jar Path + Path localJarPath; + if (cmd.hasOption(FLINK_JAR.getOpt())) { + String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); + if (!userPath.startsWith("file://")) { + userPath = "file://" + userPath; + } + localJarPath = new Path(userPath); + } else { + LOG.info("No path for the flink jar passed. Using the location of " + + yarnClusterDescriptor.getClass() + " to locate the jar"); + String encodedJarPath = + yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); + try { + // we have to decode the url encoded parts of the path + String decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name()); + localJarPath = new Path(new File(decodedPath).toURI()); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath + + " Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option."); + } + } + + yarnClusterDescriptor.setLocalJarPath(localJarPath); + + List<File> shipFiles = new ArrayList<>(); + // path to directory to ship + if (cmd.hasOption(SHIP_PATH.getOpt())) { + String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); + File shipDir = new File(shipPath); + if (shipDir.isDirectory()) { + shipFiles.add(shipDir); + } else { + LOG.warn("Ship directory is not a directory. Ignoring it."); + } + } + + yarnClusterDescriptor.addShipFiles(shipFiles); + + // queue + if (cmd.hasOption(QUEUE.getOpt())) { + yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt())); + } + + // JobManager Memory + if (cmd.hasOption(JM_MEMORY.getOpt())) { + int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); + yarnClusterDescriptor.setJobManagerMemory(jmMemory); + } + + String[] dynamicProperties = null; + if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { + dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); + } + String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR); + + yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); + + if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { + // TODO: not support non detach mode now. + //this.detachedMode = false; + } + yarnClusterDescriptor.setDetachedMode(this.detachedMode); + + if(defaultApplicationName != null) { + yarnClusterDescriptor.setName(defaultApplicationName); + } + + if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) { + String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()); + yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace); + } + + return yarnClusterDescriptor; + } + + private void printUsage() { + System.out.println("Usage:"); + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(200); + formatter.setLeftPadding(5); + + formatter.setSyntaxPrefix(" Optional"); + Options options = new Options(); + addGeneralOptions(options); + addRunOptions(options); + formatter.printHelp(" ", options); + } + + @Override + public boolean isActive(CommandLine commandLine, Configuration configuration) { + String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); + boolean yarnJobManager = ID.equals(jobManagerOption); + return yarnJobManager; + } + + @Override + public String getId() { + return ID; + } + + @Override + public void addRunOptions(Options baseOptions) { + for (Object option : ALL_OPTIONS.getOptions()) { + baseOptions.addOption((Option) option); + } + } + + @Override + public void addGeneralOptions(Options baseOptions) { + } + + @Override + public YarnClusterClientV2 retrieveCluster( + CommandLine cmdLine, + Configuration config) throws UnsupportedOperationException { + + throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6."); + } + + @Override + public YarnClusterClientV2 createCluster( + String applicationName, + CommandLine cmdLine, + Configuration config, + List<URL> userJarFiles) { + Preconditions.checkNotNull(userJarFiles, "User jar files should not be null."); + + YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); + yarnClusterDescriptor.setFlinkConfiguration(config); + yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles); + + YarnClusterClientV2 client = null; + try { + client = new YarnClusterClientV2(yarnClusterDescriptor, config); + } + catch (IOException e) { + throw new RuntimeException("Fail to create YarnClusterClientV2", e.getCause()); + } + return client; + + } + + /** + * Utility method + */ + private void logAndSysout(String message) { + LOG.info(message); + System.out.println(message); + } + +}
