[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323054#comment-16323054 ]
ASF GitHub Bot commented on FLINK-8343:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5229#discussion_r161089633
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl
		return effectiveConfiguration;
	}
-	public int run(
-			String[] args,
-			Configuration configuration,
-			String configurationDirectory) {
+	public int run(String[] args) throws CliArgsException, FlinkException {
		//
		// Command Line Options
		//
-		Options options = new Options();
-		addGeneralOptions(options);
-		addRunOptions(options);
+		final CommandLine cmd = parseCommandLineOptions(args, true);
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd;
-		try {
-			cmd = parser.parse(options, args);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			printUsage();
-			return 1;
-		}
+		final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
-		// Query cluster for metrics
-		if (cmd.hasOption(query.getOpt())) {
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
-				configuration,
-				configurationDirectory,
-				cmd.hasOption(flip6.getOpt()));
-			String description;
-			try {
-				description = yarnDescriptor.getClusterDescription();
-			} catch (Exception e) {
-				System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
-				e.printStackTrace(System.err);
-				return 1;
-			}
-			System.out.println(description);
-			return 0;
-		} else if (cmd.hasOption(applicationId.getOpt())) {
+		try {
+			// Query cluster for metrics
+			if (cmd.hasOption(query.getOpt())) {
+				final String description = yarnClusterDescriptor.getClusterDescription();
+				System.out.println(description);
+				return 0;
+			} else {
+				final ClusterClient clusterClient;
+				final ApplicationId yarnApplicationId;
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
-				configuration,
-				configurationDirectory,
-				cmd.hasOption(flip6.getOpt()));
+				if (cmd.hasOption(applicationId.getOpt())) {
+					yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
-			//configure ZK namespace depending on the value passed
-			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
-				cmd.getOptionValue(zookeeperNamespace.getOpt())
-				: yarnDescriptor.getFlinkConfiguration()
-					.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
-			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
-			yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
+					clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
+				} else {
+					final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
-			try {
-				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve existing Yarn application", e);
-			}
+					clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
-			if (detachedMode) {
-				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
-					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-					"yarn application -kill " + applicationId.getOpt());
-				yarnCluster.disconnect();
-			} else {
-				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-
-				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
-					yarnDescriptor.getYarnClient(),
-					yarnCluster.getApplicationId(),
-					new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
-					runInteractiveCli(
-						yarnCluster,
-						yarnApplicationStatusMonitor,
-						true);
-				} catch (Exception e) {
-					LOG.info("Could not properly close the Yarn application status monitor.", e);
-				} finally {
-					// shut down the scheduled executor service
-					ExecutorUtils.gracefulShutdown(
-						1000L,
-						TimeUnit.MILLISECONDS,
-						scheduledExecutorService);
-				}
-			}
-		} else {
+					//------------------ ClusterClient deployed, handle connection details
+					yarnApplicationId = ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier());
-			try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){
-				final ClusterSpecification clusterSpecification;
+					String jobManagerAddress =
+						clusterClient.getJobManagerAddress().getAddress().getHostName() +
+							':' + clusterClient.getJobManagerAddress().getPort();
-				try {
-					clusterSpecification = getClusterSpecification(cmd);
-				} catch (FlinkException e) {
-					System.err.println("Error while creating the cluster specification: " + e.getMessage());
-					e.printStackTrace();
-					return 1;
-				}
+					System.out.println("Flink JobManager is now running on " + jobManagerAddress);
+					System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
-				try {
-					yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
-				} catch (Exception e) {
-					System.err.println("Error while deploying YARN cluster: " + e.getMessage());
-					e.printStackTrace(System.err);
-					return 1;
-				}
-				//------------------ ClusterClient deployed, handle connection details
-				String jobManagerAddress =
-					yarnCluster.getJobManagerAddress().getAddress().getHostName() +
-					":" + yarnCluster.getJobManagerAddress().getPort();
-
-				System.out.println("Flink JobManager is now running on " + jobManagerAddress);
-				System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-
-				// file that we write into the conf/ dir containing the jobManager address and the dop.
-				File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
-
-				Properties yarnProps = new Properties();
-				yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
-				if (clusterSpecification.getSlotsPerTaskManager() != -1) {
-					String parallelism =
-						Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
-					yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
+					writeYarnPropertiesFile(
+						yarnApplicationId,
+						clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
+						yarnClusterDescriptor.getDynamicPropertiesEncoded());
				}
-				// add dynamic properties
-				if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
-					yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-						yarnDescriptor.getDynamicPropertiesEncoded());
-				}
-				writeYarnProperties(yarnProps, yarnPropertiesFile);
-
-				//------------------ ClusterClient running, let user control it ------------
				if (detachedMode) {
-					// print info and quit:
					LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getApplicationId());
-					yarnCluster.waitForClusterToBeReady();
-					yarnCluster.disconnect();
+						"yarn application -kill " + applicationId.getOpt());
				} else {
-
					ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-					try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
-						yarnDescriptor.getYarnClient(),
-						yarnCluster.getApplicationId(),
-						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+					final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+						yarnClusterDescriptor.getYarnClient(),
+						yarnApplicationId,
+						new ScheduledExecutorServiceAdapter(scheduledExecutorService));
+
+					try {
						runInteractiveCli(
-							yarnCluster,
+							clusterClient,
							yarnApplicationStatusMonitor,
							acceptInteractiveInput);
-					} catch (Exception e) {
-						LOG.info("Could not properly close the Yarn application status monitor.", e);
					} finally {
+						try {
+							yarnApplicationStatusMonitor.close();
+						} catch (Exception e) {
+							LOG.info("Could not properly close the Yarn application status monitor.", e);
+						}
+
+						try {
+							clusterClient.shutdown();
+						} catch (Exception e) {
+							LOG.info("Could not properly shutdown cluster client.", e);
+						}
+
+						try {
+							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
+						} catch (FlinkException e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+
						// shut down the scheduled executor service
						ExecutorUtils.gracefulShutdown(
							1000L,
							TimeUnit.MILLISECONDS,
							scheduledExecutorService);
+
+						deleteYarnPropertiesFile();
+
+						try {
+							final ApplicationReport applicationReport = yarnClusterDescriptor
+								.getYarnClient()
+								.getApplicationReport(yarnApplicationId);
+
+							logFinalApplicationReport(applicationReport);
+						} catch (YarnException | IOException e) {
+							LOG.info("Could not log the final application report.", e);
+						}
					}
				}
-			} catch (FlinkException e) {
-				System.err.println("Error while deploying a Flink cluster: " + e.getMessage());
-				e.printStackTrace();
-				return 1;
+			}
+		} finally {
+			try {
+				yarnClusterDescriptor.close();
+			} catch (Exception e) {
+				LOG.info("Could not properly close the yarn cluster descriptor.", e);
			}
		}
+
		return 0;
	}
-	/**
-	 * Utility method for tests.
-	 */
-	public void stop() {
-		if (yarnCluster != null) {
-			LOG.info("Command line interface is shutting down the yarnCluster");
+	private void logFinalApplicationReport(ApplicationReport appReport) {
+		try {
+			LOG.info("Application " + appReport.getApplicationId() + " finished with state " + appReport
+				.getYarnApplicationState() + " and final state " + appReport
+				.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
+			if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+				== YarnApplicationState.KILLED) {
+				LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
+				LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+					+ "the full application log using this command:"
+					+ System.lineSeparator()
+					+ "\tyarn logs -applicationId " + appReport.getApplicationId()
+					+ System.lineSeparator()
+					+ "(It sometimes takes a few seconds until the logs are aggregated)");
+			}
+		} catch (Exception e) {
--- End diff ---
True. Will fix it.
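
For context, the comment above is anchored at the trailing "} catch (Exception e) {" of the new logFinalApplicationReport method in the diff. Assuming the remark concerns that catch-all (the ApplicationReport getters used there declare no checked exceptions), a minimal sketch of a variant without it could look as follows; the wrapper class and static method are illustrative only, not the committed fix:

// Hypothetical sketch (assumed follow-up, not taken from the PR): log the final
// report without the catch-all, using parameterized logging.
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FinalReportLoggerSketch {

	private static final Logger LOG = LoggerFactory.getLogger(FinalReportLoggerSketch.class);

	// Logs the terminal state of the YARN application and, on failure, points the
	// user at the aggregated YARN logs.
	static void logFinalApplicationReport(ApplicationReport appReport) {
		LOG.info("Application {} finished with state {} and final state {} at {}.",
			appReport.getApplicationId(),
			appReport.getYarnApplicationState(),
			appReport.getFinalApplicationStatus(),
			appReport.getFinishTime());

		if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED
				|| appReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
			LOG.warn("Application failed. Diagnostics: {}", appReport.getDiagnostics());
			LOG.warn("If log aggregation is activated in the Hadoop cluster, the full application log "
				+ "can be retrieved with:" + System.lineSeparator()
				+ "\tyarn logs -applicationId " + appReport.getApplicationId() + System.lineSeparator()
				+ "(It sometimes takes a few seconds until the logs are aggregated.)");
		}
	}
}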
> Add support for job cluster deployment
> --------------------------------------
>
> Key: FLINK-8343
> URL: https://issues.apache.org/jira/browse/FLINK-8343
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different job cluster deployment. The
> difference is that we directly submit the job when we deploy the Flink
> cluster instead of following a two step approach (first deployment and then
> submission).
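
To make the one-step vs. two-step distinction concrete, here is a rough sketch against a hypothetical, self-contained descriptor interface; the interface and method names below are illustrative only and are not the Flink API introduced by this change:

// Illustrative sketch only: a hypothetical descriptor API that mirrors the shape
// of the code in the diff above, not the actual Flink classes.
final class JobClusterDeploymentSketch {

	interface ClusterHandle {
		// Hypothetical: submit a job to an already running (session) cluster.
		void submitJob(String serializedJobGraph) throws Exception;
	}

	interface Descriptor {
		// Two-step mode: bring up an empty session cluster; jobs are submitted afterwards.
		ClusterHandle deploySessionCluster(int taskManagers, int slotsPerTaskManager) throws Exception;

		// One-step mode requested by FLINK-8343: the job is part of the deployment itself.
		ClusterHandle deployJobCluster(int taskManagers, int slotsPerTaskManager, String serializedJobGraph) throws Exception;
	}

	static void sessionStyle(Descriptor descriptor, String jobGraph) throws Exception {
		ClusterHandle cluster = descriptor.deploySessionCluster(2, 4); // step 1: deploy
		cluster.submitJob(jobGraph);                                   // step 2: submit
	}

	static void jobClusterStyle(Descriptor descriptor, String jobGraph) throws Exception {
		// Deployment and submission collapse into a single call.
		descriptor.deployJobCluster(2, 4, jobGraph);
	}
}

In the session path the cluster is deployed first and the job is submitted to it afterwards; in the job-cluster path the job graph travels with the deployment call itself, so no separate submission step is needed.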
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)