[
https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316528#comment-16316528
]
ASF GitHub Bot commented on FLINK-8343:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5229#discussion_r160166040
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration
applyYarnProperties(Configuration configuration) throws Fl
return effectiveConfiguration;
}
- public int run(
- String[] args,
- Configuration configuration,
- String configurationDirectory) {
+ public int run(String[] args) throws CliArgsException, FlinkException {
//
// Command Line Options
//
- Options options = new Options();
- addGeneralOptions(options);
- addRunOptions(options);
+ final CommandLine cmd = parseCommandLineOptions(args, true);
- CommandLineParser parser = new PosixParser();
- CommandLine cmd;
- try {
- cmd = parser.parse(options, args);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- return 1;
- }
+ final AbstractYarnClusterDescriptor yarnClusterDescriptor =
createClusterDescriptor(cmd);
- // Query cluster for metrics
- if (cmd.hasOption(query.getOpt())) {
- AbstractYarnClusterDescriptor yarnDescriptor =
getClusterDescriptor(
- configuration,
- configurationDirectory,
- cmd.hasOption(flip6.getOpt()));
- String description;
- try {
- description =
yarnDescriptor.getClusterDescription();
- } catch (Exception e) {
- System.err.println("Error while querying the
YARN cluster for available resources: " + e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- System.out.println(description);
- return 0;
- } else if (cmd.hasOption(applicationId.getOpt())) {
+ try {
+ // Query cluster for metrics
+ if (cmd.hasOption(query.getOpt())) {
+ final String description =
yarnClusterDescriptor.getClusterDescription();
+ System.out.println(description);
+ return 0;
+ } else {
+ final ClusterClient clusterClient;
+ final ApplicationId yarnApplicationId;
- AbstractYarnClusterDescriptor yarnDescriptor =
getClusterDescriptor(
- configuration,
- configurationDirectory,
- cmd.hasOption(flip6.getOpt()));
+ if (cmd.hasOption(applicationId.getOpt())) {
+ yarnApplicationId =
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
- //configure ZK namespace depending on the value passed
- String zkNamespace =
cmd.hasOption(zookeeperNamespace.getOpt()) ?
-
cmd.getOptionValue(zookeeperNamespace.getOpt())
- :
yarnDescriptor.getFlinkConfiguration()
-
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
- LOG.info("Going to use the ZK namespace: {}",
zkNamespace);
-
yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
+ clusterClient =
yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
+ } else {
+ final ClusterSpecification
clusterSpecification = getClusterSpecification(cmd);
- try {
- yarnCluster =
yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve
existing Yarn application", e);
- }
+ clusterClient =
yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
- if (detachedMode) {
- LOG.info("The Flink YARN client has been
started in detached mode. In order to stop " +
- "Flink on YARN, use the following
command or a YARN web interface to stop it:\n" +
- "yarn application -kill " +
applicationId.getOpt());
- yarnCluster.disconnect();
- } else {
- ScheduledThreadPoolExecutor
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-
- try (YarnApplicationStatusMonitor
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
- yarnDescriptor.getYarnClient(),
- yarnCluster.getApplicationId(),
- new
ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
- runInteractiveCli(
- yarnCluster,
- yarnApplicationStatusMonitor,
- true);
- } catch (Exception e) {
- LOG.info("Could not properly close the
Yarn application status monitor.", e);
- } finally {
- // shut down the scheduled executor
service
- ExecutorUtils.gracefulShutdown(
- 1000L,
- TimeUnit.MILLISECONDS,
- scheduledExecutorService);
- }
- }
- } else {
+ //------------------ ClusterClient
deployed, handle connection details
+ yarnApplicationId =
ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier());
- try (AbstractYarnClusterDescriptor yarnDescriptor =
createClusterDescriptor(cmd)){
- final ClusterSpecification clusterSpecification;
+ String jobManagerAddress =
+
clusterClient.getJobManagerAddress().getAddress().getHostName() +
+ ':' +
clusterClient.getJobManagerAddress().getPort();
- try {
- clusterSpecification =
getClusterSpecification(cmd);
- } catch (FlinkException e) {
- System.err.println("Error while
creating the cluster specification: " + e.getMessage());
- e.printStackTrace();
- return 1;
- }
+ System.out.println("Flink JobManager is
now running on " + jobManagerAddress);
+ System.out.println("JobManager Web
Interface: " + clusterClient.getWebInterfaceURL());
- try {
- yarnCluster =
yarnDescriptor.deploySessionCluster(clusterSpecification);
- } catch (Exception e) {
- System.err.println("Error while
deploying YARN cluster: " + e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- //------------------ ClusterClient deployed,
handle connection details
- String jobManagerAddress =
-
yarnCluster.getJobManagerAddress().getAddress().getHostName() +
- ":" +
yarnCluster.getJobManagerAddress().getPort();
-
- System.out.println("Flink JobManager is now
running on " + jobManagerAddress);
- System.out.println("JobManager Web Interface: "
+ yarnCluster.getWebInterfaceURL());
-
- // file that we write into the conf/ dir
containing the jobManager address and the dop.
- File yarnPropertiesFile =
getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
-
- Properties yarnProps = new Properties();
- yarnProps.setProperty(YARN_APPLICATION_ID_KEY,
yarnCluster.getApplicationId().toString());
- if
(clusterSpecification.getSlotsPerTaskManager() != -1) {
- String parallelism =
-
Integer.toString(clusterSpecification.getSlotsPerTaskManager() *
clusterSpecification.getNumberTaskManagers());
-
yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
+ writeYarnPropertiesFile(
+ yarnApplicationId,
+
clusterSpecification.getNumberTaskManagers() *
clusterSpecification.getSlotsPerTaskManager(),
+
yarnClusterDescriptor.getDynamicPropertiesEncoded());
}
- // add dynamic properties
- if
(yarnDescriptor.getDynamicPropertiesEncoded() != null) {
-
yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-
yarnDescriptor.getDynamicPropertiesEncoded());
- }
- writeYarnProperties(yarnProps,
yarnPropertiesFile);
-
- //------------------ ClusterClient running, let
user control it ------------
if (detachedMode) {
- // print info and quit:
LOG.info("The Flink YARN client has
been started in detached mode. In order to stop " +
"Flink on YARN, use the
following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " +
yarnCluster.getApplicationId());
- yarnCluster.waitForClusterToBeReady();
- yarnCluster.disconnect();
+ "yarn application -kill " +
applicationId.getOpt());
} else {
-
ScheduledThreadPoolExecutor
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
--- End diff --
I would prefer `Executors.newSingleThreadScheduledExecutor()` here because the
`corePoolSize` will always be `1`.
> Add support for job cluster deployment
> --------------------------------------
>
> Key: FLINK-8343
> URL: https://issues.apache.org/jira/browse/FLINK-8343
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different kind of job cluster deployment. The
> difference is that we submit the job directly when we deploy the Flink
> cluster, instead of following a two-step approach (deployment first, then
> submission).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)