Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5432#discussion_r168195242
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -210,51 +225,72 @@ protected void run(String[] args) throws Exception {
     
                        final ClusterClient<T> client;
     
    -                   if (clusterId != null) {
    -                           client = clusterDescriptor.retrieve(clusterId);
    -                   } else {
    -                           final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
    -                           client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
    -                   }
    +                   // directly deploy the job if the cluster is started in 
job mode and detached
    +                   if (flip6 && clusterId == null && 
runOptions.getDetachedMode()) {
    +                           int parallelism = runOptions.getParallelism() 
== -1 ? defaultParallelism : runOptions.getParallelism();
     
    -                   try {
    -                           
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
    -                           
client.setDetached(runOptions.getDetachedMode());
    -                           LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
    -
    -                           
LOG.debug(runOptions.getSavepointRestoreSettings().toString());
    -
    -                           int userParallelism = 
runOptions.getParallelism();
    -                           LOG.debug("User parallelism is set to {}", 
userParallelism);
    -                           if (client.getMaxSlots() != -1 && 
userParallelism == -1) {
    -                                   logAndSysout("Using the parallelism 
provided by the remote cluster ("
    -                                           + client.getMaxSlots() + "). "
    -                                           + "To use another parallelism, 
set it at the ./bin/flink client.");
    -                                   userParallelism = client.getMaxSlots();
    -                           } else if (ExecutionConfig.PARALLELISM_DEFAULT 
== userParallelism) {
    -                                   userParallelism = defaultParallelism;
    -                           }
    +                           final JobGraph jobGraph = 
createJobGraph(configuration, program, parallelism);
     
    -                           executeProgram(program, client, 
userParallelism);
    -                   } finally {
    -                           if (clusterId == null && !client.isDetached()) {
    -                                   // terminate the cluster only if we 
have started it before and if it's not detached
    -                                   try {
    -                                           
clusterDescriptor.terminateCluster(client.getClusterId());
    -                                   } catch (FlinkException e) {
    -                                           LOG.info("Could not properly 
terminate the Flink cluster.", e);
    -                                   }
    -                           }
    +                           final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
    +                           client = clusterDescriptor.deployJobCluster(
    +                                   clusterSpecification,
    +                                   jobGraph,
    +                                   runOptions.getDetachedMode());
    +
    +                           logAndSysout("Job has been submitted with JobID 
" + jobGraph.getJobID());
     
                                try {
                                        client.shutdown();
                                } catch (Exception e) {
                                        LOG.info("Could not properly shut down 
the client.", e);
                                }
    +                   } else {
    +                           if (clusterId != null) {
    +                                   client = 
clusterDescriptor.retrieve(clusterId);
    +                           } else {
    +                                   // also in job mode we have to deploy a 
session cluster because the job
    +                                   // might consist of multiple parts 
(e.g. when using collect)
    +                                   final ClusterSpecification 
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
    +                                   client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
    +                           }
    +
    +                           try {
    +                                   
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
    +                                   
client.setDetached(runOptions.getDetachedMode());
    +                                   LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
    +
    +                                   
LOG.debug(runOptions.getSavepointRestoreSettings().toString());
    --- End diff --
    
    True, will improve it.


---

Reply via email to