rmetzger commented on a change in pull request #13412:
URL: https://github.com/apache/flink/pull/13412#discussion_r491986392



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1292,23 +1292,28 @@ void vertexFinished() {
                                // we do the final cleanup in the I/O executor, because it may involve
                                // some heavier work
 
-                               try {
-                                       for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-                                               ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       ExceptionUtils.rethrowIfFatalError(t);
-                                       ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
-                                       failGlobal(new Exception("Failed to finalize execution on master", t));
-                                       return;
+                               final List<CompletableFuture<Void>> futures = new ArrayList<>();
+                               for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+                                       futures.add(CompletableFuture.runAsync(() -> {
+                                               try {
+                                                       ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+                                               } catch (Throwable t) {
+                                                       ExceptionUtils.rethrow(t);
+                                               }
+                                       }, ioExecutor));
                                }
 
-                               // if we do not make this state transition, then a concurrent
-                               // cancellation or failure happened
-                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
-                                       onTerminalState(JobStatus.FINISHED);
-                               }
+                               FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> {
+                                       if (t != null) {
+                                               ExceptionUtils.rethrowIfFatalError(ExceptionUtils.stripCompletionException(t));
+                                               ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
+                                               failGlobal(new Exception("Failed to finalize execution on master", t));
+                                       } else {
+                                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {

Review comment:
       Also, I believe we should not confirm any terminal state until the 
`finalizeOnMaster` method has finished.
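
       To illustrate the ordering I have in mind, here is a minimal, JDK-only sketch. `Status`, `Finalizer`, and `finishJob` are hypothetical stand-ins and not the actual `ExecutionGraph` API, and it uses plain `CompletableFuture.allOf` rather than Flink's `FutureUtils`. The point is only that the state leaves `RUNNING` in the callback that runs after every finalizer has completed, and nowhere else.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; none of these names exist in Flink.
final class FinalizeThenTransition {

    enum Status { RUNNING, FINISHED, FAILED }

    /** Stand-in for a per-vertex finalization hook such as finalizeOnMaster. */
    interface Finalizer {
        void finalizeOnMaster() throws Exception;
    }

    /**
     * Runs all finalizers on the given executor and moves the state out of
     * RUNNING only after every one of them has completed.
     */
    static CompletableFuture<Status> finishJob(
            List<Finalizer> finalizers,
            ExecutorService ioExecutor,
            AtomicReference<Status> state) {

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Finalizer finalizer : finalizers) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    finalizer.finalizeOnMaster();
                } catch (Exception e) {
                    // Wrap checked exceptions so the future completes exceptionally.
                    throw new CompletionException(e);
                }
            }, ioExecutor));
        }

        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .handle((ignored, failure) -> {
                    // Reached only once all finalizers are done (or one failed).
                    // compareAndSet leaves the state alone if a concurrent
                    // cancellation or failure already moved it out of RUNNING.
                    Status target = (failure == null) ? Status.FINISHED : Status.FAILED;
                    state.compareAndSet(Status.RUNNING, target);
                    return state.get();
                });
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        try {
            AtomicReference<Status> state = new AtomicReference<>(Status.RUNNING);
            List<Finalizer> finalizers = List.of(() -> { }, () -> { });
            System.out.println(finishJob(finalizers, ioExecutor, state).join());
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

       In this sketch nothing observes `FINISHED` while a finalizer is still running, which is the guarantee I would like the real code to keep.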





