[ 
https://issues.apache.org/jira/browse/FLINK-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364020#comment-16364020
 ] 

ASF GitHub Bot commented on FLINK-8609:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5432#discussion_r168173010
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -210,51 +225,72 @@ protected void run(String[] args) throws Exception {
     
                        final ClusterClient<T> client;
     
    -                   if (clusterId != null) {
    -                           client = clusterDescriptor.retrieve(clusterId);
    -                   } else {
    -                           final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
    -                           client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
    -                   }
    +                   // directly deploy the job if the cluster is started in 
job mode and detached
    +                   if (flip6 && clusterId == null && 
runOptions.getDetachedMode()) {
    +                           int parallelism = runOptions.getParallelism() 
== -1 ? defaultParallelism : runOptions.getParallelism();
     
    -                   try {
    -                           
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
    -                           
client.setDetached(runOptions.getDetachedMode());
    -                           LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
    -
    -                           
LOG.debug(runOptions.getSavepointRestoreSettings().toString());
    -
    -                           int userParallelism = 
runOptions.getParallelism();
    -                           LOG.debug("User parallelism is set to {}", 
userParallelism);
    -                           if (client.getMaxSlots() != -1 && 
userParallelism == -1) {
    -                                   logAndSysout("Using the parallelism 
provided by the remote cluster ("
    -                                           + client.getMaxSlots() + "). "
    -                                           + "To use another parallelism, 
set it at the ./bin/flink client.");
    -                                   userParallelism = client.getMaxSlots();
    -                           } else if (ExecutionConfig.PARALLELISM_DEFAULT 
== userParallelism) {
    -                                   userParallelism = defaultParallelism;
    -                           }
    +                           final JobGraph jobGraph = 
createJobGraph(configuration, program, parallelism);
     
    -                           executeProgram(program, client, 
userParallelism);
    -                   } finally {
    -                           if (clusterId == null && !client.isDetached()) {
    -                                   // terminate the cluster only if we 
have started it before and if it's not detached
    -                                   try {
    -                                           
clusterDescriptor.terminateCluster(client.getClusterId());
    -                                   } catch (FlinkException e) {
    -                                           LOG.info("Could not properly 
terminate the Flink cluster.", e);
    -                                   }
    -                           }
    +                           final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
    +                           client = clusterDescriptor.deployJobCluster(
    +                                   clusterSpecification,
    +                                   jobGraph,
    +                                   runOptions.getDetachedMode());
    +
    +                           logAndSysout("Job has been submitted with JobID 
" + jobGraph.getJobID());
     
                                try {
                                        client.shutdown();
                                } catch (Exception e) {
                                        LOG.info("Could not properly shut down 
the client.", e);
                                }
    +                   } else {
    +                           if (clusterId != null) {
    +                                   client = 
clusterDescriptor.retrieve(clusterId);
    +                           } else {
    +                                   // also in job mode we have to deploy a 
session cluster because the job
    +                                   // might consist of multiple parts 
(e.g. when using collect)
    +                                   final ClusterSpecification 
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
    +                                   client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
    +                           }
    +
    +                           try {
    +                                   
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
    +                                   
client.setDetached(runOptions.getDetachedMode());
    +                                   LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
    +
    +                                   
LOG.debug(runOptions.getSavepointRestoreSettings().toString());
    --- End diff --
    
    Maybe use `LOG.debug("{}", runOptions.getSavepointRestoreSettings())` so that 
`toString()` is only invoked when debug logging is actually enabled.


> Add support to deploy detached job mode clusters
> ------------------------------------------------
>
>                 Key: FLINK-8609
>                 URL: https://issues.apache.org/jira/browse/FLINK-8609
>             Project: Flink
>          Issue Type: New Feature
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy 
> detached job mode clusters.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to