[FLINK-4930] [client, yarn] Implement FLIP-6 YARN client

Summary: Implement FLIP-6 YARN client

Test Plan: NA

Reviewers: biao.liub

Differential Revision: http://phabricator.taobao.net/D6563


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3695a8e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3695a8e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3695a8e9

Branch: refs/heads/master
Commit: 3695a8e92e9c3deee368d9cc3ce89a5ab117d6a1
Parents: 2a7dbda
Author: shuai.xus <[email protected]>
Authored: Wed Nov 23 17:19:35 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:27 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   1 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 101 +++++---
 .../apache/flink/yarn/YarnClusterClientV2.java  | 169 +++++++++++++
 .../flink/yarn/YarnClusterDescriptorV2.java     |  34 +++
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 253 +++++++++++++++++++
 5 files changed, 524 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 1ec0674..dc3280e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -131,6 +131,7 @@ public class CliFrontend {
                /** command line interface of the YARN session, with a special 
initialization here
                 *  to prefix all options with y/yarn. */
                
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", 
"yarn");
+               loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", 
"y", "yarn");
                customCommandLine.add(new DefaultCLI());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ca18439..b4c87b8 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +61,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
@@ -460,28 +463,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        flinkConfiguration.setString(dynProperty.getKey(), 
dynProperty.getValue());
                }
 
-               // ------------------ Set default file system scheme 
-------------------------
-
-               try {
-                       
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
-               } catch (IOException e) {
-                       throw new IOException("Error while setting the default 
" +
-                               "filesystem scheme from configuration.", e);
-               }
-
-               // initialize file system
-               // Copy the application master jar to the filesystem
-               // Create a local resource to point to the destination jar path
-               final FileSystem fs = FileSystem.get(conf);
-
-               // hard coded check for the GoogleHDFS client because its not 
overriding the getScheme() method.
-               if 
(!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
-                       fs.getScheme().startsWith("file")) {
-                       LOG.warn("The file system scheme is '" + fs.getScheme() 
+ "'. This indicates that the "
-                               + "specified Hadoop configuration path is wrong 
and the system is using the default Hadoop configuration values."
-                               + "The Flink YARN client needs to store its 
files in a distributed file system");
-               }
-
                // ------------------ Check if the YARN ClusterClient has the 
requested resources --------------
 
                // the yarnMinAllocationMB specifies the smallest possible 
container allocation size.
@@ -505,6 +486,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                // Create application via yarnClient
                final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
                GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
+               ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
                Resource maxRes = appResponse.getMaximumResourceCapability();
                final String NOTE = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
@@ -560,6 +542,45 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
+               ApplicationReport report = startAppMaster(null, yarnClient);
+
+               String host = report.getHost();
+               int port = report.getRpcPort();
+
+               // Correctly initialize the Flink config
+               
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
+               
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+               // the Flink cluster is deployed in YARN. Represent cluster
+               return createYarnClusterClient(this, yarnClient, report, 
flinkConfiguration, sessionFilesDir, true);
+       }
+
+       public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient 
yarnClient) throws Exception {
+
+               // ------------------ Set default file system scheme 
-------------------------
+
+               try {
+                       
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+               } catch (IOException e) {
+                       throw new IOException("Error while setting the default 
" +
+                                       "filesystem scheme from 
configuration.", e);
+               }
+
+               // initialize file system
+               // Copy the application master jar to the filesystem
+               // Create a local resource to point to the destination jar path
+               final FileSystem fs = FileSystem.get(conf);
+
+               // hard coded check for the GoogleHDFS client because it's not 
overriding the getScheme() method.
+               if 
(!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+                               fs.getScheme().startsWith("file")) {
+                       LOG.warn("The file system scheme is '" + fs.getScheme() 
+ "'. This indicates that the "
+                                       + "specified Hadoop configuration path 
is wrong and the system is using the default Hadoop configuration values."
+                                       + "The Flink YARN client needs to store 
its files in a distributed file system");
+               }
+
+               final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
+               ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
                Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
                for (File file : shipFiles) {
                        effectiveShipFiles.add(file.getAbsoluteFile());
@@ -596,8 +617,8 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        effectiveShipFiles.addAll(userJarFiles);
                }
 
+
                // Set-up ApplicationSubmissionContext for the application
-               ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
                final ApplicationId appId = appContext.getApplicationId();
 
@@ -694,6 +715,27 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                paths.add(remotePathConf);
                
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
 
+               // write job graph to tmp file and add it to local resource 
+               // TODO: need refine ?
+               if (jobGraph != null) {
+                       try {
+                               File fp = new File("/tmp/jobgraph-" + 
appId.toString());
+                               FileOutputStream input = new 
FileOutputStream(fp);
+                               ObjectOutputStream obInput = new 
ObjectOutputStream(input);
+                               obInput.writeObject(jobGraph);
+                               input.close();
+                               LocalResource jobgraph = 
Records.newRecord(LocalResource.class);
+                               Path remoteJobGraph =
+                                               Utils.setupLocalResource(fs, 
appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
+                               localResources.put("job.graph", jobgraph);
+                               paths.add(remoteJobGraph);
+                               
classPathBuilder.append("job.graph").append(File.pathSeparator);
+                       } catch (Exception e) {
+                               LOG.warn("Add job graph to local resource 
fail");
+                               throw e;
+                       }
+               }
+
                sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + 
appId.toString() + "/");
 
                FsPermission permission = new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE);
@@ -835,7 +877,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        LOG.debug("Application State: {}", appState);
                        switch(appState) {
                                case FAILED:
-                               case FINISHED:
+                               case FINISHED: //TODO: the finished state may 
be valid in flip-6
                                case KILLED:
                                        throw new YarnDeploymentException("The 
YARN application unexpectedly switched to state "
                                                + appState + " during 
deployment. \n" +
@@ -871,16 +913,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                } catch (IllegalStateException e) {
                        // we're already in the shut down hook.
                }
-
-               String host = report.getHost();
-               int port = report.getRpcPort();
-
-               // Correctly initialize the Flink config
-               
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
-               
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
-
-               // the Flink cluster is deployed in YARN. Represent cluster
-               return createYarnClusterClient(this, yarnClient, report, 
flinkConfiguration, sessionFilesDir, true);
+               return report;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
new file mode 100644
index 0000000..daa2c3b
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Java representation of a running Flink job on YARN.
+ * Since flip-6, a flink job will be run as a yarn job by default, each job 
has a jobmaster, 
+ * so this class will be used as a client to communicate with yarn and start 
the job on yarn.
+ */
+public class YarnClusterClientV2 extends ClusterClient {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnClusterClientV2.class);
+
+       private YarnClient yarnClient;
+
+       private final AbstractYarnClusterDescriptor clusterDescriptor;
+
+       private ApplicationId appId;
+
+       private String trackingURL;
+
+       /**
+        * Create a client to communicate with YARN cluster.
+        *
+        * @param clusterDescriptor The descriptor used to create yarn job
+        * @param flinkConfig Flink configuration
+        * @throws java.io.IOException
+        */
+       public YarnClusterClientV2(
+                       final AbstractYarnClusterDescriptor clusterDescriptor,
+                       org.apache.flink.configuration.Configuration 
flinkConfig) throws IOException {
+
+               super(flinkConfig);
+
+               this.clusterDescriptor = clusterDescriptor;
+               this.yarnClient = clusterDescriptor.getYarnClient();
+               this.trackingURL = "";
+       }
+
+       @Override
+       public org.apache.flink.configuration.Configuration 
getFlinkConfiguration() {
+               return flinkConfig;
+       }
+
+       @Override
+       public int getMaxSlots() {
+        // No need to set the max slot count for now
+               return 0;
+       }
+
+       @Override
+       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+               return clusterDescriptor.hasUserJarFiles(userJarFiles);
+       }
+
+       @Override
+       protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+               try {
+                       // Create application via yarnClient
+                       ApplicationReport report = 
this.clusterDescriptor.startAppMaster(jobGraph, yarnClient);
+                       if 
(report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
+                               appId = report.getApplicationId();
+                               trackingURL = report.getTrackingUrl();
+                               logAndSysout("Please refer to " + 
getWebInterfaceURL() 
+                                               + " for the running status of 
job " +  jobGraph.getJobID().toString());
+                               //TODO: not support attach mode now
+                               return new 
JobSubmissionResult(jobGraph.getJobID());
+                       }
+                       else {
+                               throw new ProgramInvocationException("Fail to 
submit the job.");
+                       }
+               }
+               catch (Exception e) {
+                       throw new ProgramInvocationException("Fail to submit 
the job", e.getCause());
+               }
+       }
+
+       @Override
+       public String getWebInterfaceURL() {
+               // there seems to be a difference between HD 2.2.0 and 2.6.0
+               if(!trackingURL.startsWith("http://")) {
+                       return "http://" + trackingURL;
+               } else {
+                       return trackingURL;
+               }
+       }
+
+       @Override
+       public String getClusterIdentifier() {
+               return "Yarn cluster with application id " + getApplicationId();
+       }
+
+       /**
+        * This method is only available if the cluster hasn't been started in 
detached mode.
+        */
+       @Override
+       public GetClusterStatusResponse getClusterStatus() {
+               throw new UnsupportedOperationException("Not support 
getClusterStatus since Flip-6.");
+       }
+
+       public ApplicationStatus getApplicationStatus() {
+               //TODO: this method is useful for later
+               return null;
+       }
+
+       @Override
+       public List<String> getNewMessages() {
+               throw new UnsupportedOperationException("Not support 
getNewMessages since Flip-6.");
+       }
+
+       @Override
+       public void finalizeCluster() {
+               throw new UnsupportedOperationException("Not support 
finalizeCluster since Flip-6.");
+       }
+
+       @Override
+       public boolean isDetached() {
+               return super.isDetached() || clusterDescriptor.isDetachedMode();
+       }
+
+       @Override
+       public void waitForClusterToBeReady() {
+               throw new UnsupportedOperationException("Not support 
waitForClusterToBeReady since Flip-6.");
+       }
+
+       @Override
+       public InetSocketAddress getJobManagerAddress() {
+               //TODO: just return a local address in order to be compatible 
with createClient in CliFrontend
+               return new InetSocketAddress("localhost", 0);
+       }
+
+       public ApplicationId getApplicationId() {
+               return appId;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
new file mode 100644
index 0000000..e3bd944
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+
+/**
+ * Implementation of {@link 
org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the 
new application master for a job under flip-6.
+ * This implementation is currently somewhat tricky, since YarnClusterDescriptorV2 is 
related to YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
+ * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements 
ClusterDescriptor<YarnClusterClientV2>.
+ * However, in order to use the code in AbstractYarnClusterDescriptor for 
setting environments and so on, we make YarnClusterDescriptorV2 as now.
+ */
+public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
+
+       @Override
+       protected Class<?> getApplicationMasterClass() {
+               return YarnFlinkApplicationMasterRunner.class;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3695a8e9/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
new file mode 100644
index 0000000..ca5049c
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnClusterClientV2;
+import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
+/**
+ * Class handling the command line interface to the YARN per job mode under 
flip-6.
+ */
+public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnCLI.class);
+
+       /** The id for the CommandLine interface */
+       private static final String ID = "yarn";
+
+       private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; 
// this has to be a regex for String.split()
+
+       //------------------------------------ Command Line argument options 
-------------------------
+       // the prefix transformation is used by the CliFrontend static 
constructor.
+       private final Option QUEUE;
+       private final Option SHIP_PATH;
+       private final Option FLINK_JAR;
+       private final Option JM_MEMORY;
+       private final Option DETACHED;
+       private final Option ZOOKEEPER_NAMESPACE;
+
+       private final Options ALL_OPTIONS;
+
+       /**
+        * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
+        *  -D fs.overwrite-files=true  -D 
taskmanager.network.numberOfBuffers=16368
+        */
+       private final Option DYNAMIC_PROPERTIES;
+
+       //------------------------------------ Internal fields 
-------------------------
+       // use detach mode as default
+       private boolean detachedMode = true;
+
+       public FlinkYarnCLI(String shortPrefix, String longPrefix) {
+
+               QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
+               SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
+               FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
+               JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + 
"jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+               DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, 
"Dynamic properties");
+               DETACHED = new Option(shortPrefix + "a", longPrefix + 
"attached", false, "Start attached");
+               ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix 
+ "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for 
high availability mode");
+
+               ALL_OPTIONS = new Options();
+               ALL_OPTIONS.addOption(FLINK_JAR);
+               ALL_OPTIONS.addOption(JM_MEMORY);
+               ALL_OPTIONS.addOption(QUEUE);
+               ALL_OPTIONS.addOption(SHIP_PATH);
+               ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
+               ALL_OPTIONS.addOption(DETACHED);
+               ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE);
+       }
+
+       public YarnClusterDescriptorV2 createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
+
+               YarnClusterDescriptorV2 yarnClusterDescriptor = new 
YarnClusterDescriptorV2();
+
+               // Jar Path
+               Path localJarPath;
+               if (cmd.hasOption(FLINK_JAR.getOpt())) {
+                       String userPath = 
cmd.getOptionValue(FLINK_JAR.getOpt());
+                       if (!userPath.startsWith("file://")) {
+                               userPath = "file://" + userPath;
+                       }
+                       localJarPath = new Path(userPath);
+               } else {
+                       LOG.info("No path for the flink jar passed. Using the 
location of "
+                               + yarnClusterDescriptor.getClass() + " to 
locate the jar");
+                       String encodedJarPath =
+                               
yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+                       try {
+                               // we have to decode the url encoded parts of 
the path
+                               String decodedPath = 
URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+                               localJarPath = new Path(new 
File(decodedPath).toURI());
+                       } catch (UnsupportedEncodingException e) {
+                               throw new RuntimeException("Couldn't decode the 
encoded Flink dist jar path: " + encodedJarPath +
+                                       " Please supply a path manually via the 
-" + FLINK_JAR.getOpt() + " option.");
+                       }
+               }
+
+               yarnClusterDescriptor.setLocalJarPath(localJarPath);
+
+               List<File> shipFiles = new ArrayList<>();
+               // path to directory to ship
+               if (cmd.hasOption(SHIP_PATH.getOpt())) {
+                       String shipPath = 
cmd.getOptionValue(SHIP_PATH.getOpt());
+                       File shipDir = new File(shipPath);
+                       if (shipDir.isDirectory()) {
+                               shipFiles.add(shipDir);
+                       } else {
+                               LOG.warn("Ship directory is not a directory. 
Ignoring it.");
+                       }
+               }
+
+               yarnClusterDescriptor.addShipFiles(shipFiles);
+
+               // queue
+               if (cmd.hasOption(QUEUE.getOpt())) {
+                       
yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+               }
+
+               // JobManager Memory
+               if (cmd.hasOption(JM_MEMORY.getOpt())) {
+                       int jmMemory = 
Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+                       yarnClusterDescriptor.setJobManagerMemory(jmMemory);
+               }
+
+               String[] dynamicProperties = null;
+               if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+                       dynamicProperties = 
cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+               }
+               String dynamicPropertiesEncoded = 
StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+               
yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+
+               if (cmd.hasOption(DETACHED.getOpt()) || 
cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+                       // TODO: not support non detach mode now.
+                       //this.detachedMode = false;
+               }
+               yarnClusterDescriptor.setDetachedMode(this.detachedMode);
+
+               if(defaultApplicationName != null) {
+                       yarnClusterDescriptor.setName(defaultApplicationName);
+               }
+
+               if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) {
+                       String zookeeperNamespace = 
cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt());
+                       
yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
+               }
+
+               return yarnClusterDescriptor;
+       }
+
+       private void printUsage() {
+               System.out.println("Usage:");
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setWidth(200);
+               formatter.setLeftPadding(5);
+
+               formatter.setSyntaxPrefix("   Optional");
+               Options options = new Options();
+               addGeneralOptions(options);
+               addRunOptions(options);
+               formatter.printHelp(" ", options);
+       }
+
+       @Override
+       public boolean isActive(CommandLine commandLine, Configuration 
configuration) {
+               String jobManagerOption = 
commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+               boolean yarnJobManager = ID.equals(jobManagerOption);
+               return yarnJobManager;
+       }
+
+       @Override
+       public String getId() {
+               return ID;
+       }
+
+       @Override
+       public void addRunOptions(Options baseOptions) {
+               for (Object option : ALL_OPTIONS.getOptions()) {
+                       baseOptions.addOption((Option) option);
+               }
+       }
+
+       @Override
+       public void addGeneralOptions(Options baseOptions) {
+       }
+
+       @Override
+       public YarnClusterClientV2 retrieveCluster(
+                       CommandLine cmdLine,
+                       Configuration config) throws 
UnsupportedOperationException {
+
+               throw new UnsupportedOperationException("Not support 
retrieveCluster since Flip-6.");
+       }
+
+       @Override
+       public YarnClusterClientV2 createCluster(
+                       String applicationName,
+                       CommandLine cmdLine,
+                       Configuration config,
+                       List<URL> userJarFiles) {
+               Preconditions.checkNotNull(userJarFiles, "User jar files should 
not be null.");
+
+               YarnClusterDescriptorV2 yarnClusterDescriptor = 
createDescriptor(applicationName, cmdLine);
+               yarnClusterDescriptor.setFlinkConfiguration(config);
+               yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
+
+               YarnClusterClientV2 client = null;
+               try {
+                       client = new YarnClusterClientV2(yarnClusterDescriptor, 
config);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Fail to create 
YarnClusterClientV2", e.getCause());
+               }
+               return client;
+
+       }
+
+       /**
+        * Utility method
+        */
+       private void logAndSysout(String message) {
+               LOG.info(message);
+               System.out.println(message);
+       }
+
+}

Reply via email to