rmetzger commented on a change in pull request #13412:
URL: https://github.com/apache/flink/pull/13412#discussion_r491975797



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1292,23 +1292,28 @@ void vertexFinished() {
                                // we do the final cleanup in the I/O executor, because it may involve
                                // some heavier work
 
-                               try {
-                                       for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-                                               ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       ExceptionUtils.rethrowIfFatalError(t);
-                                       ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
-                                       failGlobal(new Exception("Failed to finalize execution on master", t));
-                                       return;
+                               final List<CompletableFuture<Void>> futures = new ArrayList<>();
+                               for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+                                       futures.add(CompletableFuture.runAsync(() -> {
+                                               try {
+                                                       ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+                                               } catch (Throwable t) {
+                                                       ExceptionUtils.rethrow(t);
+                                               }
+                                       }, ioExecutor));
                                }
 
-                               // if we do not make this state transition, then a concurrent
-                               // cancellation or failure happened
-                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
-                                       onTerminalState(JobStatus.FINISHED);
-                               }
+                               FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> {

Review comment:
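   ```suggestion
                                FutureUtils.assertNoException(FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> {
   ```
   
   Otherwise, `ExceptionUtils.rethrowIfFatalError` won't do much: an error it rethrows inside the `whenCompleteAsync` callback only completes the returned future exceptionally, and nothing observes that future.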
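A minimal, JDK-only sketch of the failure mode behind this suggestion: an `Error` thrown from inside a `whenCompleteAsync` callback does not reach the calling thread. The JDK completes the returned future exceptionally instead (wrapped in a `CompletionException`), so the error is only seen if someone observes that future. `InternalError` is a stand-in for whatever `rethrowIfFatalError` would rethrow, and `failureOf` is a hypothetical observer, not Flink's `FutureUtils.assertNoException` (which, as assumed here, escalates the failure rather than returning it).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnobservedFailureDemo {

    // Mimics the reviewed callback shape: an Error escapes from inside whenCompleteAsync.
    static CompletableFuture<Void> callbackThatRethrows(ExecutorService executor) {
        return CompletableFuture.<Void>completedFuture(null)
                .whenCompleteAsync((ignored, t) -> {
                    // Stand-in for rethrowIfFatalError(...) rethrowing a fatal error.
                    throw new InternalError("simulated fatal error");
                }, executor);
    }

    // Hypothetical observer: returns the unwrapped failure of the future, or null on success.
    static Throwable failureOf(CompletableFuture<?> future) {
        return future
                .handle((ignored, t) ->
                        t instanceof CompletionException && t.getCause() != null ? t.getCause() : t)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // This call returns normally: the Error never reaches the calling thread.
            CompletableFuture<Void> result = callbackThatRethrows(executor);
            // Only by observing the returned future does the error become visible.
            System.out.println("observed: " + failureOf(result));
        } finally {
            executor.shutdown();
        }
    }
}
```

Without an observer like `failureOf` (or a helper that escalates), `result` would simply be dropped together with the error it holds.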

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1292,23 +1292,28 @@ void vertexFinished() {
                                // we do the final cleanup in the I/O executor, because it may involve
                                // some heavier work
 
-                               try {
-                                       for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-                                               ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       ExceptionUtils.rethrowIfFatalError(t);
-                                       ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
-                                       failGlobal(new Exception("Failed to finalize execution on master", t));
-                                       return;
+                               final List<CompletableFuture<Void>> futures = new ArrayList<>();
+                               for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+                                       futures.add(CompletableFuture.runAsync(() -> {
+                                               try {
+                                                       ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+                                               } catch (Throwable t) {
+                                                       ExceptionUtils.rethrow(t);
+                                               }
+                                       }, ioExecutor));
                                }
 
-                               // if we do not make this state transition, then a concurrent
-                               // cancellation or failure happened
-                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
-                                       onTerminalState(JobStatus.FINISHED);
-                               }
+                               FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> {
+                                       if (t != null) {
+                                               ExceptionUtils.rethrowIfFatalError(ExceptionUtils.stripCompletionException(t));
+                                               ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
+                                               failGlobal(new Exception("Failed to finalize execution on master", t));
+                                       } else {
+                                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {

Review comment:
   What do we do if the job has transitioned into a terminal state in the meantime (say, CANCELLED)?
   If I'm reading the code correctly, `transitionState` will throw an `IllegalStateException` if the current state is terminal.
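The removed code's own comment ("if we do not make this state transition, then a concurrent cancellation or failure happened") describes one possible answer: treat a failed RUNNING to FINISHED transition as a no-op. The sketch below is a hypothetical, heavily simplified state holder (`GuardedTransitionSketch` and its `Status` enum are invented for illustration; this is not Flink's `ExecutionGraph`) in which `transitionState` is a compare-and-set that returns `false` instead of throwing when another transition, such as a cancellation, got there first. Whether Flink's `transitionState` actually behaves this way for this call is exactly the open question above.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model for illustration only; not Flink's ExecutionGraph.
public class GuardedTransitionSketch {

    enum Status { RUNNING, FINISHED, CANCELLED, FAILED }

    private final AtomicReference<Status> state = new AtomicReference<>(Status.RUNNING);

    // Atomically moves from 'expected' to 'next'. Returns false, without throwing,
    // if the job is no longer in 'expected' (e.g. a cancellation won the race).
    boolean transitionState(Status expected, Status next) {
        return state.compareAndSet(expected, next);
    }

    Status state() {
        return state.get();
    }

    // Completion path once all finalization work has succeeded.
    boolean completeIfStillRunning() {
        if (transitionState(Status.RUNNING, Status.FINISHED)) {
            // The removed code called onTerminalState(FINISHED) at this point.
            return true;
        }
        // A concurrent cancellation or failure already reached a terminal state;
        // keep that state instead of overwriting it or failing.
        return false;
    }

    public static void main(String[] args) {
        GuardedTransitionSketch job = new GuardedTransitionSketch();
        job.transitionState(Status.RUNNING, Status.CANCELLED); // cancellation wins the race
        System.out.println(job.completeIfStillRunning() + " " + job.state()); // false CANCELLED
    }
}
```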

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1292,23 +1292,28 @@ void vertexFinished() {
                                // we do the final cleanup in the I/O executor, because it may involve
                                // some heavier work
 
-                               try {
-                                       for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-                                               ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       ExceptionUtils.rethrowIfFatalError(t);
-                                       ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
-                                       failGlobal(new Exception("Failed to finalize execution on master", t));
-                                       return;
+                               final List<CompletableFuture<Void>> futures = new ArrayList<>();
+                               for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+                                       futures.add(CompletableFuture.runAsync(() -> {
+                                               try {
+                                                       ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+                                               } catch (Throwable t) {
+                                                       ExceptionUtils.rethrow(t);
+                                               }
+                                       }, ioExecutor));
                                }
 
-                               // if we do not make this state transition, then a concurrent
-                               // cancellation or failure happened
-                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
-                                       onTerminalState(JobStatus.FINISHED);
-                               }
+                               FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> {
+                                       if (t != null) {
+                                               ExceptionUtils.rethrowIfFatalError(ExceptionUtils.stripCompletionException(t));
+                                               ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t);
+                                               failGlobal(new Exception("Failed to finalize execution on master", t));
+                                       } else {
+                                               if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {

Review comment:
   Also, I believe we should not confirm any terminal state until the `finalizeOnMaster` method has finished.
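A sketch of the ordering this comment asks for, using hypothetical names (`FinalizeThenFinish`, `finalizeThenReport`, string vertex IDs) and plain JDK futures: one finalize task per vertex runs on an I/O executor, and the terminal state is only produced in a continuation of `CompletableFuture.allOf`, which per its Javadoc completes once all of the given futures have completed. This sketch does not establish whether `FutureUtils.combineAll` in the PR has the same property when one of its inputs fails early; that seems worth checking against its documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch under assumptions: hypothetical names, not Flink code.
public class FinalizeThenFinish {

    // Runs one finalize task per vertex on ioExecutor and reports a terminal state
    // only after every one of those tasks has completed.
    static CompletableFuture<String> finalizeThenReport(
            List<String> vertices, ExecutorService ioExecutor, ConcurrentLinkedQueue<String> log) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String vertex : vertices) {
            futures.add(CompletableFuture.runAsync(() -> log.add("finalized " + vertex), ioExecutor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .handle((ignored, t) -> {
                    // allOf completes only once every input future is done, so the
                    // terminal state recorded here cannot overtake a finalize task.
                    String terminal = (t == null) ? "FINISHED" : "FAILED";
                    log.add(terminal);
                    return terminal;
                });
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueue<String> log = new ConcurrentLinkedQueue<>();
        try {
            String state = finalizeThenReport(List.of("source", "map", "sink"), io, log).join();
            System.out.println(state + " " + log);
        } finally {
            io.shutdown();
        }
    }
}
```

The order of the "finalized" entries depends on scheduling, but the terminal state is always the last entry in `log`, because it is derived from the combined future rather than set eagerly.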




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
