rmetzger commented on a change in pull request #13412:
URL: https://github.com/apache/flink/pull/13412#discussion_r491975797



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1292,23 +1292,28 @@ void vertexFinished() {
                                // we do the final cleanup in the I/O executor, 
because it may involve
                                // some heavier work
 
-                               try {
-                                       for (ExecutionJobVertex ejv : 
verticesInCreationOrder) {
-                                               
ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       ExceptionUtils.rethrowIfFatalError(t);
-                                       
ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
-                                       failGlobal(new Exception("Failed to 
finalize execution on master", t));
-                                       return;
+                               final List<CompletableFuture<Void>> futures = 
new ArrayList<>();
+                               for (ExecutionJobVertex ejv : 
verticesInCreationOrder) {
+                                       
futures.add(CompletableFuture.runAsync(() -> {
+                                               try {
+                                                       
ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+                                               } catch (Throwable t) {
+                                                       
ExceptionUtils.rethrow(t);
+                                               }
+                                       }, ioExecutor));
                                }
 
-                               // if we do not make this state transition, 
then a concurrent
-                               // cancellation or failure happened
-                               if (transitionState(JobStatus.RUNNING, 
JobStatus.FINISHED)) {
-                                       onTerminalState(JobStatus.FINISHED);
-                               }
+                               
FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> {

Review comment:
       ```suggestion
                                
FutureUtils.assertNoException(FutureUtils.combineAll(futures).whenCompleteAsync((ignored,
 t) -> {
   ```
   
   Otherwise, the `ExceptionUtils.rethrowIfFatalError` won't do much.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to