[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323060#comment-16323060 ]
ASF GitHub Bot commented on FLINK-8343:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5229#discussion_r161090177
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration
applyYarnProperties(Configuration configuration) throws Fl
return effectiveConfiguration;
}
- public int run(
- String[] args,
- Configuration configuration,
- String configurationDirectory) {
+ public int run(String[] args) throws CliArgsException, FlinkException {
//
// Command Line Options
//
- Options options = new Options();
- addGeneralOptions(options);
- addRunOptions(options);
+ final CommandLine cmd = parseCommandLineOptions(args, true);
- CommandLineParser parser = new PosixParser();
- CommandLine cmd;
- try {
- cmd = parser.parse(options, args);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- return 1;
- }
+ final AbstractYarnClusterDescriptor yarnClusterDescriptor =
createClusterDescriptor(cmd);
- // Query cluster for metrics
- if (cmd.hasOption(query.getOpt())) {
- AbstractYarnClusterDescriptor yarnDescriptor =
getClusterDescriptor(
- configuration,
- configurationDirectory,
- cmd.hasOption(flip6.getOpt()));
- String description;
- try {
- description =
yarnDescriptor.getClusterDescription();
- } catch (Exception e) {
- System.err.println("Error while querying the
YARN cluster for available resources: " + e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- System.out.println(description);
- return 0;
- } else if (cmd.hasOption(applicationId.getOpt())) {
+ try {
+ // Query cluster for metrics
+ if (cmd.hasOption(query.getOpt())) {
+ final String description =
yarnClusterDescriptor.getClusterDescription();
+ System.out.println(description);
+ return 0;
+ } else {
+ final ClusterClient clusterClient;
+ final ApplicationId yarnApplicationId;
- AbstractYarnClusterDescriptor yarnDescriptor =
getClusterDescriptor(
- configuration,
- configurationDirectory,
- cmd.hasOption(flip6.getOpt()));
+ if (cmd.hasOption(applicationId.getOpt())) {
+ yarnApplicationId =
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
- //configure ZK namespace depending on the value passed
- String zkNamespace =
cmd.hasOption(zookeeperNamespace.getOpt()) ?
-
cmd.getOptionValue(zookeeperNamespace.getOpt())
- :
yarnDescriptor.getFlinkConfiguration()
-
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
- LOG.info("Going to use the ZK namespace: {}",
zkNamespace);
--- End diff --
It should not matter.
> Add support for job cluster deployment
> --------------------------------------
>
> Key: FLINK-8343
> URL: https://issues.apache.org/jira/browse/FLINK-8343
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> For Flip-6, we have to enable a different kind of job cluster deployment. The
> difference is that we submit the job directly when we deploy the Flink
> cluster, instead of following a two-step approach (deployment first, then
> submission).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)