tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r482801219



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -107,18 +106,13 @@ private DispatcherJob(
                FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
                        // JM has been initialized, or the initialization failed
                        synchronized (lock) {
-                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
-                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
-                               }
-
+                               jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
                                if (throwable == null) { // initialization succeeded
                                        // Forward result future
                                        jobManagerRunner.getResultFuture().whenComplete((archivedExecutionGraph, resultThrowable) -> {
-                                               if (archivedExecutionGraph != null) {
-                                                       jobResultFuture.complete(DispatcherJobResult.forSuccess(archivedExecutionGraph));
-                                               } else {
-                                                       jobResultFuture.completeExceptionally(ExceptionUtils.stripCompletionException(resultThrowable));
-                                               }
+                                               FutureUtils.forward(
+                                                       jobManagerRunner.getResultFuture().thenApply(DispatcherJobResult::forSuccess),
+                                                       jobResultFuture);
                                        });

Review comment:
       This was again bad advice: a stage created with `thenApply` reports an upstream failure wrapped in a `CompletionException`, so calling it here potentially introduces a `CompletionException`, which makes `DispatcherResourceCleanupTest.testBlobServerCleanupWhenJobNotFinished` fail. Hence, I would suggest reverting this change.
   
   On a side note: what I actually meant was to replace the `whenComplete` call with
   
   ```
   FutureUtils.forward(
          jobManagerRunner.getResultFuture().thenApply(DispatcherJobResult::forSuccess),
          jobResultFuture);
   ```
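
To illustrate the `CompletionException` concern raised above, here is a minimal sketch using only `java.util.concurrent` and no Flink classes. The class name, the method names, and the `strip` helper are made up for this illustration; `strip` is only a rough stand-in for the idea behind `ExceptionUtils.stripCompletionException`, not its actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionExceptionDemo {

    /** Failure as seen by a callback registered directly on the failed future. */
    static Throwable seenOnSourceFuture() {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("job failed"));

        Throwable[] seen = new Throwable[1];
        // The future is already complete, so the callback runs synchronously.
        source.whenComplete((value, error) -> seen[0] = error);
        return seen[0];
    }

    /** Failure as seen after an intermediate thenApply stage. */
    static Throwable seenAfterThenApply() {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("job failed"));

        Throwable[] seen = new Throwable[1];
        // The dependent stage reports the upstream failure wrapped in a CompletionException.
        source.thenApply(String::length).whenComplete((value, error) -> seen[0] = error);
        return seen[0];
    }

    /** Hypothetical helper: unwrap one CompletionException layer if present. */
    static Throwable strip(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        System.out.println(seenOnSourceFuture().getClass().getSimpleName());        // IllegalStateException
        System.out.println(seenAfterThenApply().getClass().getSimpleName());        // CompletionException
        System.out.println(strip(seenAfterThenApply()).getClass().getSimpleName()); // IllegalStateException
    }
}
```

A consumer that inspects the failure type without unwrapping it will therefore behave differently once a `thenApply` stage sits in between, which is presumably what trips up the test mentioned above.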




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

