[FLINK-3937] Implement -yid option for the Flink CLI - enables use of the list, savepoint, cancel and stop subcommands - adapt FlinkYarnSessionCli to also accept a YARN application ID to attach to - update documentation
This closes #2034 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/875d4d23 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/875d4d23 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/875d4d23 Branch: refs/heads/master Commit: 875d4d238567a0503941488eb4e7d03b38c0fc42 Parents: 9e98424 Author: Sebastian Klemke <[email protected]> Authored: Wed May 25 14:28:59 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Jun 17 10:37:58 2016 +0200 ---------------------------------------------------------------------- docs/apis/cli.md | 28 ++++++++++ docs/setup/yarn_setup.md | 28 ++++++++++ .../org/apache/flink/client/CliFrontend.java | 4 ++ .../flink/client/cli/CliFrontendParser.java | 10 ++++ .../yarn/AbstractYarnClusterDescriptor.java | 15 ++++++ .../flink/yarn/cli/FlinkYarnSessionCli.java | 55 +++++++++++++++++++- 6 files changed, 139 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/docs/apis/cli.md ---------------------------------------------------------------------- diff --git a/docs/apis/cli.md b/docs/apis/cli.md index 08a2aa6..511862c 100644 --- a/docs/apis/cli.md +++ b/docs/apis/cli.md @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + + ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r + - Cancel a job: ./bin/flink cancel <jobID> @@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs. configuration. -r,--running Show only running programs and their JobIDs -s,--scheduled Show only scheduled programs and their JobIDs + Additional arguments if -m yarn-cluster is set: + -yid <yarnApplicationId> YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. 
In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. Action "cancel" cancels a running program. @@ -264,6 +274,12 @@ Action "cancel" cancels a running program. job. Use this flag to connect to a different JobManager than the one specified in the configuration. + Additional arguments if -m yarn-cluster is set: + -yid <yarnApplicationId> YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. Action "stop" stops a running program (streaming jobs only). There are no strong consistency @@ -275,6 +291,12 @@ guarantees for a stop request. to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. + Additional arguments if -m yarn-cluster is set: + -yid <yarnApplicationId> YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. Action "savepoint" triggers savepoints for a running job or disposes existing ones. @@ -288,4 +310,10 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on job. Use this flag to connect to a different JobManager than the one specified in the configuration. + Additional arguments if -m yarn-cluster is set: + -yid <yarnApplicationId> YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. 
~~~ http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/docs/setup/yarn_setup.md ---------------------------------------------------------------------- diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index aa0f7a4..1e76a58 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink. Use the YARN utilities (`yarn application -kill <appId>`) to stop the YARN session. +#### Attach to an existing Session + +Use the following command to start a session + +~~~bash +./bin/yarn-session.sh +~~~ + +This command will show you the following overview: + +~~~bash +Usage: + Required + -id,--applicationId <yarnAppId> YARN application Id +~~~ + +As already mentioned, `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable must be set to read the YARN and HDFS configuration. + +**Example:** Issue the following command to attach to running Flink YARN session `application_1463870264508_0029`: + +~~~bash +./bin/yarn-session.sh -id application_1463870264508_0029 +~~~ + +Attaching to a running session uses YARN ResourceManager to determine Job Manager RPC port. + +Stop the YARN session by stopping the unix process (using CTRL+C) or by entering 'stop' into the client. 
+ ### Submit Job to Flink Use the following command to submit a Flink program to the YARN cluster: http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index cf7a8c2..3064f8d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -800,6 +800,10 @@ public class CliFrontend { */ protected void updateConfig(CommandLineOptions options) { if(options.getJobManagerAddress() != null){ + if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { + jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli() + .attachFlinkYarnClient(options.getCommandLine()) + .getJobManagerAddress(); InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); writeJobManagerAddressToConfig(config, jobManagerAddress); } http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index f28d1b6..9b935e8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -188,6 +188,8 @@ public class CliFrontendParser { private static Options getJobManagerAddressOption(Options options) { options.addOption(ADDRESS_OPTION); + yarnSessionCLi.getYARNAttachCLIOptions(options); + return options; } @@ -280,6 +282,10 @@ 
public class CliFrontendParser { System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>"); formatter.setSyntaxPrefix(" \"info\" action options:"); formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options())); + formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:"); + Options yarnOpts = new Options(); + yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); + formatter.printHelp(" ", yarnOpts); System.out.println(); } @@ -316,6 +322,10 @@ public class CliFrontendParser { System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>"); formatter.setSyntaxPrefix(" \"cancel\" action options:"); formatter.printHelp(" ", getCancelOptions(new Options())); + formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:"); + Options yarnOpts = new Options(); + yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); + formatter.printHelp(" ", yarnOpts); System.out.println(); } http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 7220a29..c471fa4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; @@ -349,6 +350,20 @@ public abstract 
class AbstractYarnClusterDescriptor implements ClusterDescriptor } } + @Override + public AbstractFlinkYarnCluster attach(String appId) throws Exception { + // check if required Hadoop environment variables are set. If not, warn user + if(System.getenv("HADOOP_CONF_DIR") == null && + System.getenv("YARN_CONF_DIR") == null) { + LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." + + "The Flink YARN Client needs one of these to be set to properly load the Hadoop " + + "configuration for accessing YARN."); + } + + final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId); + + return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached); + } /** * This method will block until the ApplicationMaster/JobManager have been * deployed on YARN. http://git-wip-us.apache.org/repos/asf/flink/blob/875d4d23/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index a2375c5..fdcc858 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -32,6 +32,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -86,6 +87,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> // the prefix 
transformation is used by the CliFrontend static constructor. private final Option QUERY; // --- or --- + private final Option APPLICATION_ID; + // --- or --- private final Option QUEUE; private final Option SHIP_PATH; private final Option FLINK_JAR; @@ -117,6 +120,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> this.acceptInteractiveInput = acceptInteractiveInput; QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); + APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session"); QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); @@ -131,6 +135,35 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } /** + * Attaches a new Yarn Client to running YARN application. 
+ * + */ + public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) { + AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); + if (flinkYarnClient == null) { + return null; + } + + if (!cmd.hasOption(APPLICATION_ID.getOpt())) { + LOG.error("Missing required argument " + APPLICATION_ID.getOpt()); + printUsage(); + return null; + } + + String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); + GlobalConfiguration.loadConfiguration(confDirPath); + Configuration flinkConfiguration = GlobalConfiguration.getConfiguration(); + flinkYarnClient.setFlinkConfiguration(flinkConfiguration); + flinkYarnClient.setConfigurationDirectory(confDirPath); + + try { + return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt())); + } catch (Exception e) { + LOG.error("Could not attach to YARN session", e); + return null; + } + } + /** * Resumes from a Flink Yarn properties file * @param flinkConfiguration The flink configuration * @return True if the properties were loaded, false otherwise @@ -452,6 +485,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> options.addOption(NAME); } + + public void getYARNAttachCLIOptions(Options options) { + options.addOption(APPLICATION_ID); + } + @Override public ClusterClient retrieveCluster(Configuration config) throws Exception { @@ -478,7 +516,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> printUsage(); return 1; } - + // Query cluster for metrics if (cmd.hasOption(QUERY.getOpt())) { YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(); @@ -492,6 +530,21 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } System.out.println(description); return 0; + } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { + yarnCluster = attachFlinkYarnClient(cmd); + + if (detachedMode) { + LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + + "yarn application -kill "+yarnCluster.getApplicationId()); + } else { + runInteractiveCli(yarnCluster); + + if (!yarnCluster.hasBeenStopped()) { + LOG.info("Command Line Interface requested session shutdown"); + yarnCluster.shutdown(false); + } + } } else { YarnClusterDescriptor flinkYarnClient;
