kl0u commented on a change in pull request #10311: [FLINK-14762][client] Enrich 
JobClient API
URL: https://github.com/apache/flink/pull/10311#discussion_r351143741
 
 

 ##########
 File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
 ##########
 @@ -50,18 +50,20 @@ public AbstractSessionClusterExecutor(@Nonnull final 
ClientFactory clusterClient
        }
 
        @Override
-       public CompletableFuture<JobClient> execute(@Nonnull final Pipeline 
pipeline, @Nonnull final Configuration configuration) throws Exception {
+       public CompletableFuture<? extends JobClient> execute(@Nonnull final 
Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception 
{
                final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, 
configuration);
 
                try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration)) {
                        final ClusterID clusterID = 
clusterClientFactory.getClusterId(configuration);
                        checkState(clusterID != null);
 
                        final ClusterClient<ClusterID> clusterClient = 
clusterDescriptor.retrieve(clusterID);
-                       return clusterClient
-                                       .submitJob(jobGraph)
-                                       
.thenApply(JobSubmissionResult::getJobID)
-                                       .thenApply(jobID -> new 
ClusterClientJobClientAdapter<>(clusterClient, jobID, false));
+                       return 
clusterClient.submitJob(jobGraph).thenApply(jobClient -> {
+                               if (jobClient instanceof 
ClusterClientJobClientAdapter) {
 
 Review comment:
   This is pretty fragile and also not the best in terms of coding practices.
   
   Actually after checking both versions of the PR, maybe it is better to keep 
the newly introduced `addOnCloseActions()` for the shutdown hook in the 
attached, per-job cluster case, and also keep the `moveOwnership` flag in the 
constructor of the `ClusterClientJobClientAdapter` and the 
`ClusterClient.submitJob()` method for closing, or not, the underlying cluster 
client. So, the `sae` flag will add a "closing action" and the `submitJob` will 
have an additional parameter.
   
   In the future, we can think of how to get rid of it in a proper way.
   
   I know that this is a bit of back and forth on my side, but I hope that it 
is not a big problem for you.
   
   WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to