[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323060#comment-16323060 ]

ASF GitHub Bot commented on FLINK-8343:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5229#discussion_r161090177
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -534,215 +557,235 @@ private Configuration 
applyYarnProperties(Configuration configuration) throws Fl
                return effectiveConfiguration;
        }
     
    -   public int run(
    -                   String[] args,
    -                   Configuration configuration,
    -                   String configurationDirectory) {
    +   public int run(String[] args) throws CliArgsException, FlinkException {
                //
                //      Command Line Options
                //
    -           Options options = new Options();
    -           addGeneralOptions(options);
    -           addRunOptions(options);
    +           final CommandLine cmd = parseCommandLineOptions(args, true);
     
    -           CommandLineParser parser = new PosixParser();
    -           CommandLine cmd;
    -           try {
    -                   cmd = parser.parse(options, args);
    -           } catch (Exception e) {
    -                   System.out.println(e.getMessage());
    -                   printUsage();
    -                   return 1;
    -           }
    +           final AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createClusterDescriptor(cmd);
     
    -           // Query cluster for metrics
    -           if (cmd.hasOption(query.getOpt())) {
    -                   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
    -                           configuration,
    -                           configurationDirectory,
    -                           cmd.hasOption(flip6.getOpt()));
    -                   String description;
    -                   try {
    -                           description = 
yarnDescriptor.getClusterDescription();
    -                   } catch (Exception e) {
    -                           System.err.println("Error while querying the 
YARN cluster for available resources: " + e.getMessage());
    -                           e.printStackTrace(System.err);
    -                           return 1;
    -                   }
    -                   System.out.println(description);
    -                   return 0;
    -           } else if (cmd.hasOption(applicationId.getOpt())) {
    +           try {
    +                   // Query cluster for metrics
    +                   if (cmd.hasOption(query.getOpt())) {
    +                           final String description = 
yarnClusterDescriptor.getClusterDescription();
    +                           System.out.println(description);
    +                           return 0;
    +                   } else {
    +                           final ClusterClient clusterClient;
    +                           final ApplicationId yarnApplicationId;
     
    -                   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
    -                           configuration,
    -                           configurationDirectory,
    -                           cmd.hasOption(flip6.getOpt()));
    +                           if (cmd.hasOption(applicationId.getOpt())) {
    +                                   yarnApplicationId = 
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
     
    -                   //configure ZK namespace depending on the value passed
    -                   String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
    -                                                                   
cmd.getOptionValue(zookeeperNamespace.getOpt())
    -                                                                   : 
yarnDescriptor.getFlinkConfiguration()
    -                                                                   
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
    -                   LOG.info("Going to use the ZK namespace: {}", 
zkNamespace);
    --- End diff --
    
    It should not matter.


> Add support for job cluster deployment
> --------------------------------------
>
>                 Key: FLINK-8343
>                 URL: https://issues.apache.org/jira/browse/FLINK-8343
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different job cluster deployment. The
> difference is that we directly submit the job when we deploy the Flink
> cluster instead of following a two-step approach (first deployment and then
> submission).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to