[FLINK-3937] implement -yid option to Flink CLI

- enables the list, savepoint, cancel and stop subcommands to target a YARN session
- adapt FlinkYarnSessionCli to also accept a YARN application ID to attach to
- update documentation

This closes #2034


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/875d4d23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/875d4d23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/875d4d23

Branch: refs/heads/master
Commit: 875d4d238567a0503941488eb4e7d03b38c0fc42
Parents: 9e98424
Author: Sebastian Klemke <[email protected]>
Authored: Wed May 25 14:28:59 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 docs/apis/cli.md                                | 28 ++++++++++
 docs/setup/yarn_setup.md                        | 28 ++++++++++
 .../org/apache/flink/client/CliFrontend.java    |  4 ++
 .../flink/client/cli/CliFrontendParser.java     | 10 ++++
 .../yarn/AbstractYarnClusterDescriptor.java     | 15 ++++++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 55 +++++++++++++++++++-
 6 files changed, 139 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 08a2aa6..511862c 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -105,6 +105,10 @@ The command line can be used to
 
         ./bin/flink list -r
 
+-   List running jobs inside a Flink YARN session:
+
+        ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
+
 -   Cancel a job:
 
         ./bin/flink cancel <jobID>
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
                                    configuration.
      -r,--running                  Show only running programs and their JobIDs
      -s,--scheduled                Show only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
+                                   connect to. Must not be set if JobManager HA
+                                   is used. In this case, JobManager RPC
+                                   location is automatically retrieved from
+                                   Zookeeper.
 
 
 Action "cancel" cancels a running program.
@@ -264,6 +274,12 @@ Action "cancel" cancels a running program.
                                    job. Use this flag to connect to a different
                                    JobManager than the one specified in the
                                    configuration.
+  Additional arguments if -m yarn-cluster is set:
+     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
+                                   connect to. Must not be set if JobManager HA
+                                   is used. In this case, JobManager RPC
+                                   location is automatically retrieved from
+                                   Zookeeper.
 
 
 Action "stop" stops a running program (streaming jobs only). There are no 
strong consistency
@@ -275,6 +291,12 @@ guarantees for a stop request.
                                    to connect. Use this flag to connect to a
                                    different JobManager than the one specified
                                    in the configuration.
+  Additional arguments if -m yarn-cluster is set:
+     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
+                                   connect to. Must not be set if JobManager HA
+                                   is used. In this case, JobManager RPC
+                                   location is automatically retrieved from
+                                   Zookeeper.
 
 
 Action "savepoint" triggers savepoints for a running job or disposes existing 
ones.
@@ -288,4 +310,10 @@ Action "savepoint" triggers savepoints for a running job 
or disposes existing on
                                     job. Use this flag to connect to a 
different
                                     JobManager than the one specified in the
                                     configuration.
+  Additional arguments if -m yarn-cluster is set:
+     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
+                                   connect to. Must not be set if JobManager HA
+                                   is used. In this case, JobManager RPC
+                                   location is automatically retrieved from
+                                   Zookeeper.
 ~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index aa0f7a4..1e76a58 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN 
session using Flink.
 
 Use the YARN utilities (`yarn application -kill <appId>`) to stop the YARN 
session.
 
+#### Attach to an existing Session
+
+Use the following command to display the usage information of the YARN session client:
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+     -id,--applicationId <yarnAppId> YARN application Id
+~~~
+
+As already mentioned, either the `YARN_CONF_DIR` or the `HADOOP_CONF_DIR` environment variable must be set so that the YARN and HDFS configuration can be read.
+
+**Example:** Issue the following command to attach to the running Flink YARN session `application_1463870264508_0029`:
+
+~~~bash
+./bin/yarn-session.sh -id application_1463870264508_0029
+~~~
+
+Attaching to a running session uses the YARN ResourceManager to determine the JobManager RPC port.
+
+Stop the YARN session by stopping the Unix process (using CTRL+C) or by entering 'stop' into the client.
+
 ### Submit Job to Flink
 
 Use the following command to submit a Flink program to the YARN cluster:

http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index cf7a8c2..3064f8d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -800,6 +800,10 @@ public class CliFrontend {
         */
        protected void updateConfig(CommandLineOptions options) {
                if(options.getJobManagerAddress() != null){
+                       if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
+                               jobManagerAddress = 
CliFrontendParser.getFlinkYarnSessionCli()
+                                       
.attachFlinkYarnClient(options.getCommandLine())
+                                       .getJobManagerAddress();
                        InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
                        writeJobManagerAddressToConfig(config, 
jobManagerAddress);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index f28d1b6..9b935e8 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -188,6 +188,8 @@ public class CliFrontendParser {
 
        private static Options getJobManagerAddressOption(Options options) {
                options.addOption(ADDRESS_OPTION);
+               // Also register the YARN attach option (the YARN application id), so it is
+               // accepted wherever the JobManager address option (-m) is accepted.
+               yarnSessionCLi.getYARNAttachCLIOptions(options);
+
                return options;
        }
 
@@ -280,6 +282,10 @@ public class CliFrontendParser {
                System.out.println("\n  Syntax: info [OPTIONS] <jar-file> 
<arguments>");
                formatter.setSyntaxPrefix("  \"info\" action options:");
                formatter.printHelp(" ", 
getInfoOptionsWithoutDeprecatedOptions(new Options()));
+               formatter.setSyntaxPrefix("  Additional arguments if -m " + 
CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
+               Options yarnOpts = new Options();
+               yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
+               formatter.printHelp(" ", yarnOpts);
                System.out.println();
        }
 
@@ -316,6 +322,10 @@ public class CliFrontendParser {
                System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
                formatter.setSyntaxPrefix("  \"cancel\" action options:");
                formatter.printHelp(" ", getCancelOptions(new Options()));
+               formatter.setSyntaxPrefix("  Additional arguments if -m " + 
CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
+               Options yarnOpts = new Options();
+               yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
+               formatter.printHelp(" ", yarnOpts);
                System.out.println();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 7220a29..c471fa4 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 import org.slf4j.Logger;
@@ -349,6 +350,20 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
        }
 
+       /**
+        * Attaches to an already running Flink YARN session.
+        *
+        * <p>The given id is parsed into a YARN {@code ApplicationId}. The returned cluster handle is
+        * created with this descriptor's {@code yarnClient}, {@code conf}, {@code flinkConfiguration},
+        * {@code sessionFilesDir} and {@code detached} settings. If neither HADOOP_CONF_DIR nor
+        * YARN_CONF_DIR is set, only a warning is logged; attaching is still attempted.
+        *
+        * @param appId the YARN application id, e.g. {@code application_1463870264508_0029}
+        * @return a cluster handle for the given YARN application
+        * @throws Exception propagated from parsing the id or constructing the cluster handle
+        */
+       @Override
+       public AbstractFlinkYarnCluster attach(String appId) throws Exception {
+               // check if required Hadoop environment variables are set. If not, warn user
+               if (System.getenv("HADOOP_CONF_DIR") == null &&
+                       System.getenv("YARN_CONF_DIR") == null) {
+                       // the trailing space after "set." keeps the two sentences apart once concatenated
+                       LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
+                               "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+                               "configuration for accessing YARN.");
+               }
+
+               final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId);
+
+               return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached);
+       }
+
        /**
         * This method will block until the ApplicationMaster/JobManager have 
been
         * deployed on YARN.

http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index a2375c5..fdcc858 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -32,6 +32,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -86,6 +87,8 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        // the prefix transformation is used by the CliFrontend static 
constructor.
        private final Option QUERY;
        // --- or ---
+       private final Option APPLICATION_ID;
+       // --- or ---
        private final Option QUEUE;
        private final Option SHIP_PATH;
        private final Option FLINK_JAR;
@@ -117,6 +120,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                this.acceptInteractiveInput = acceptInteractiveInput;
                
                QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
+               APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + 
"applicationId", true, "Attach to running YARN session");
                QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
                SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
                FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
@@ -131,6 +135,35 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        }
 
        /**
+        * Attaches a new YARN client to a running Flink YARN session.
+        *
+        * <p>The YARN application id is read from the {@code APPLICATION_ID} option of {@code cmd}.
+        * Before attaching, the Flink configuration is loaded from the directory returned by
+        * {@code CliFrontend.getConfigurationDirectoryFromEnv()} and handed to the YARN client,
+        * together with that directory.
+        *
+        * @param cmd the parsed command line; must contain the application id option
+        * @return the attached cluster, or {@code null} if no YARN client is available, the
+        *         application id option is missing (usage is printed), or attaching fails
+        *         (the failure is logged). Callers must check for {@code null} before using
+        *         the result.
+        */
+       public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) {
+               AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
+               if (flinkYarnClient == null) {
+                       return null;
+               }
+
+               final String applicationIdOption = APPLICATION_ID.getOpt();
+               if (!cmd.hasOption(applicationIdOption)) {
+                       LOG.error("Missing required argument " + applicationIdOption);
+                       printUsage();
+                       return null;
+               }
+
+               String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+               GlobalConfiguration.loadConfiguration(confDirPath);
+               Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
+               flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
+               flinkYarnClient.setConfigurationDirectory(confDirPath);
+
+               final String applicationId = cmd.getOptionValue(applicationIdOption);
+               try {
+                       return flinkYarnClient.attach(applicationId);
+               } catch (Exception e) {
+                       // include the id so the user can tell which session could not be reached
+                       LOG.error("Could not attach to YARN session " + applicationId, e);
+                       return null;
+               }
+       }
+
+       /**
         * Resumes from a Flink Yarn properties file
         * @param flinkConfiguration The flink configuration
         * @return True if the properties were loaded, false otherwise
@@ -452,6 +485,11 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                options.addOption(NAME);
        }
 
+
+       /**
+        * Adds the option for attaching to a running YARN session (the YARN application id)
+        * to the given {@link Options}.
+        *
+        * @param options the options to which the application id option is added
+        */
+       public void getYARNAttachCLIOptions(Options options) {
+               options.addOption(APPLICATION_ID);
+       }
+
        @Override
        public ClusterClient retrieveCluster(Configuration config) throws 
Exception {
 
@@ -478,7 +516,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        printUsage();
                        return 1;
                }
-               
+
                // Query cluster for metrics
                if (cmd.hasOption(QUERY.getOpt())) {
                        YarnClusterDescriptor flinkYarnClient = new 
YarnClusterDescriptor();
@@ -492,6 +530,21 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        }
                        System.out.println(description);
                        return 0;
+               } else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
+                       yarnCluster = attachFlinkYarnClient(cmd);
+
+                       if (detachedMode) {
+                               LOG.info("The Flink YARN client has been 
started in detached mode. In order to stop " +
+                                       "Flink on YARN, use the following 
command or a YARN web interface to stop it:\n" +
+                                       "yarn application -kill 
"+yarnCluster.getApplicationId());
+                       } else {
+                               runInteractiveCli(yarnCluster);
+
+                               if (!yarnCluster.hasBeenStopped()) {
+                                       LOG.info("Command Line Interface 
requested session shutdown");
+                                       yarnCluster.shutdown(false);
+                               }
+                       }
                } else {
 
                        YarnClusterDescriptor flinkYarnClient;

Reply via email to