GitHub user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2085#discussion_r66626302
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
}
/**
- * Retrieves a {@link Client} object from the given command line options and other parameters.
+ * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* @param programName Program name
- * @param userParallelism Given user parallelism
* @throws Exception
*/
- protected Client getClient(
+ protected ClusterClient getClient(
CommandLineOptions options,
- String programName,
- int userParallelism,
- boolean detachedMode)
- throws Exception {
- InetSocketAddress jobManagerAddress;
- int maxSlots = -1;
+ String programName) throws Exception {
- if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
- logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
-
- // Default yarn application name to use, if nothing is specified on the command line
- String applicationName = "Flink Application: " + programName;
-
- // user wants to run Flink in YARN cluster.
- CommandLine commandLine = options.getCommandLine();
- AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
- .getFlinkYarnSessionCli()
- .withDefaultApplicationName(applicationName)
- .createFlinkYarnClient(commandLine);
-
- if (flinkYarnClient == null) {
- throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
- }
-
- // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
- // from yarn options.
- if (detachedMode) {
- flinkYarnClient.setDetachedMode(true);
- }
-
- // the number of slots available from YARN:
- int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
- if (yarnTmSlots == -1) {
- yarnTmSlots = 1;
- }
- maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
- if (userParallelism != -1) {
- int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
- logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
- "but the user requested a parallelism of " + userParallelism + " on YARN. " +
- "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
- "will get "+slotsPerTM+" slots.");
- flinkYarnClient.setTaskManagerSlots(slotsPerTM);
- }
-
- try {
- yarnCluster = flinkYarnClient.deploy();
- yarnCluster.connectToCluster();
- }
- catch (Exception e) {
- throw new RuntimeException("Error deploying the YARN cluster", e);
- }
+ // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+ CustomCommandLine<?> activeCommandLine =
+ CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
- jobManagerAddress = yarnCluster.getJobManagerAddress();
- writeJobManagerAddressToConfig(jobManagerAddress);
-
- // overwrite the yarn client config (because the client parses the dynamic properties)
- this.config.addAll(flinkYarnClient.getFlinkConfiguration());
-
- logAndSysout("YARN cluster started");
- logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
- logAndSysout("Waiting until all TaskManagers have connected");
-
- while(true) {
- GetClusterStatusResponse status = yarnCluster.getClusterStatus();
- if (status != null) {
- if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
- logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
- + flinkYarnClient.getTaskManagerCount() + ")");
- } else {
- logAndSysout("All TaskManagers are connected");
- break;
- }
- } else {
- logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
- }
+ ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
- try {
- Thread.sleep(500);
- }
- catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for TaskManagers");
- System.err.println("Thread is interrupted");
- Thread.currentThread().interrupt();
- }
- }
- }
- else {
- if(options.getJobManagerAddress() != null) {
- jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
- writeJobManagerAddressToConfig(jobManagerAddress);
+ if (client != null) {
+ logAndSysout("Cluster retrieved");
+ } else {
--- End diff --
Yes, I have thought about letting the command line decide whether a cluster
can be created or retrieved (it would return an enum based on the supplied
options). I decided to address this in a follow-up.
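
To make the idea concrete, here is a rough sketch of what "return an enum based on the options" could look like. Everything in it is hypothetical and for illustration only: `ClusterAction`, `CommandLineSketch`, the `applicationId` option key, and the two example implementations are not part of Flink or of this pull request, and the actual follow-up may look quite different.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch; none of these names exist in Flink. */
public class ClusterActionSketch {

    /** What the front end should do about the cluster for this invocation. */
    enum ClusterAction { RETRIEVE, CREATE }

    /** Stand-in for a custom command line (Standalone, YARN, ...). */
    interface CommandLineSketch {
        /** Decides, from the supplied options only, how to obtain the cluster. */
        ClusterAction decide(Map<String, String> options);
    }

    /** Assumed behaviour: a standalone command line only attaches to a running cluster. */
    static final CommandLineSketch STANDALONE = options -> ClusterAction.RETRIEVE;

    /**
     * Assumed behaviour: a YARN command line attaches when an application id
     * is supplied and deploys a new cluster otherwise. The option key
     * "applicationId" is made up for this example.
     */
    static final CommandLineSketch YARN = options ->
            options.containsKey("applicationId") ? ClusterAction.RETRIEVE : ClusterAction.CREATE;

    /** The front end branches on the enum instead of checking for a null client. */
    static String describe(CommandLineSketch commandLine, Map<String, String> options) {
        switch (commandLine.decide(options)) {
            case RETRIEVE:
                return "retrieve existing cluster";
            case CREATE:
                return "create new cluster";
            default:
                throw new IllegalStateException("unknown action");
        }
    }

    public static void main(String[] args) {
        Map<String, String> none = Collections.<String, String>emptyMap();
        Map<String, String> withId = Collections.singletonMap("applicationId", "some-id");
        System.out.println(describe(STANDALONE, none));
        System.out.println(describe(YARN, none));
        System.out.println(describe(YARN, withId));
    }
}
```

With something along these lines, `getClient` would no longer need the `client != null` check from the diff to find out whether a cluster could be retrieved; the command line would state its intent up front.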