[ 
https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387939#comment-14387939
 ] 

ASF GitHub Bot commented on FLINK-1771:
---------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27451902
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -265,12 +266,32 @@ protected int run(String[] args) {
                }
     
                try {
    -                   Client client = getClient(options, 
program.getUserCodeClassLoader(), program.getMainClassName());
    -
    -                   int parallelism = options.getParallelism();
    -                   int exitCode = executeProgram(program, client, 
parallelism);
    -
    -                   if (yarnCluster != null) {
    +                   int userParallelism = options.getParallelism();
    +                   LOG.debug("User parallelism is set to {}", 
userParallelism);
    +
    +                   Client client = getClient(options, 
program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
    +                   LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
    +                   if(client.getMaxSlots() != -1 && userParallelism == -1) 
{
    +                           logAndSysout("Using the parallelism provided by 
the remote cluster ("+client.getMaxSlots()+"). " +
    +                                           "To use another parallelism, 
set it at the ./bin/flink client.");
    +                           userParallelism = client.getMaxSlots();
    +                   }
    +                   int exitCode = 0;
    +
    +                   // check if detached per job yarn cluster is used to 
start flink
    +                   if(yarnCluster != null && yarnCluster.isDetached()) {
    +                           logAndSysout("The Flink YARN client has been 
started in detached mode. In order to stop " +
    +                                           "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
    +                                           "yarn application -kill 
"+yarnCluster.getApplicationId()+"\n" +
    +                                           "Please also note that the 
temporary files of the YARN session in the home directoy will not be removed.");
    +                           executeProgram(program, client, 
userParallelism, false);
    +                   } else {
    +                           // regular (blocking) execution.
    +                           exitCode = executeProgram(program, client, 
userParallelism, true);
    +                   }
    +
    +                   // show YARN cluster status if its not a detached YARN 
cluster.
    +                   if (yarnCluster != null && !yarnCluster.isDetached()) {
    --- End diff --
    
    Since finally trumps returns statement, the code block following this check 
could be moved to finally statement below?


> Add support for submitting single jobs to a detached YARN session
> -----------------------------------------------------------------
>
>                 Key: FLINK-1771
>                 URL: https://issues.apache.org/jira/browse/FLINK-1771
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN Client
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> We need tests ensuring that the processing slots are set properly when 
> starting Flink on YARN, in particular with the per job YARN session feature.
> Also, the YARN tests for detached YARN sessions / per job yarn clusters are 
> polluting the local home-directory.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to