[
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324585#comment-15324585
]
ASF GitHub Bot commented on FLINK-3937:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2085#discussion_r66628410
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway
getJobManagerGateway(CommandLineOptions options) throws E
}
/**
- * Retrieves a {@link Client} object from the given command line
options and other parameters.
+ * Retrieves a {@link ClusterClient} object from the given command line
options and other parameters.
*
* @param options Command line options which contain JobManager address
* @param programName Program name
- * @param userParallelism Given user parallelism
* @throws Exception
*/
- protected Client getClient(
+ protected ClusterClient getClient(
CommandLineOptions options,
- String programName,
- int userParallelism,
- boolean detachedMode)
- throws Exception {
- InetSocketAddress jobManagerAddress;
- int maxSlots = -1;
+ String programName) throws Exception {
- if
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
- logAndSysout("YARN cluster mode detected. Switching
Log4j output to console");
-
- // Default yarn application name to use, if nothing is
specified on the command line
- String applicationName = "Flink Application: " +
programName;
-
- // user wants to run Flink in YARN cluster.
- CommandLine commandLine = options.getCommandLine();
- AbstractFlinkYarnClient flinkYarnClient =
CliFrontendParser
-
.getFlinkYarnSessionCli()
-
.withDefaultApplicationName(applicationName)
-
.createFlinkYarnClient(commandLine);
-
- if (flinkYarnClient == null) {
- throw new RuntimeException("Unable to create
Flink YARN Client. Check previous log messages");
- }
-
- // in case the main detached mode wasn't set, we don't
wanna overwrite the one loaded
- // from yarn options.
- if (detachedMode) {
- flinkYarnClient.setDetachedMode(true);
- }
-
- // the number of slots available from YARN:
- int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
- if (yarnTmSlots == -1) {
- yarnTmSlots = 1;
- }
- maxSlots = yarnTmSlots *
flinkYarnClient.getTaskManagerCount();
- if (userParallelism != -1) {
- int slotsPerTM = userParallelism /
flinkYarnClient.getTaskManagerCount();
- logAndSysout("The YARN cluster has " + maxSlots
+ " slots available, " +
- "but the user requested a
parallelism of " + userParallelism + " on YARN. " +
- "Each of the " +
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
- "will get "+slotsPerTM+"
slots.");
- flinkYarnClient.setTaskManagerSlots(slotsPerTM);
- }
-
- try {
- yarnCluster = flinkYarnClient.deploy();
- yarnCluster.connectToCluster();
- }
- catch (Exception e) {
- throw new RuntimeException("Error deploying the
YARN cluster", e);
- }
+ // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+ CustomCommandLine<?> activeCommandLine =
+
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
--- End diff --
Well spotted. I think I'll move this logic to the implementation of the
CustomCommandLine.
I don't quite understand your renaming suggestion, are you suggesting to
break up the CustomCommandLine into CustomParser and CustomCLI?
> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
> Issue Type: Improvement
> Reporter: Sebastian Klemke
> Assignee: Maximilian Michels
> Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop
> subcommands are hard to invoke if you only know the YARN application ID. As
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the
> mentioned subcommands that can be used together with -m yarn-cluster. Flink
> cli would then retrieve JobManager RPC location from YARN ResourceManager.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)