rmetzger commented on a change in pull request #13412: URL: https://github.com/apache/flink/pull/13412#discussion_r491975797
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ########## @@ -1292,23 +1292,28 @@ void vertexFinished() { // we do the final cleanup in the I/O executor, because it may involve // some heavier work - try { - for (ExecutionJobVertex ejv : verticesInCreationOrder) { - ejv.getJobVertex().finalizeOnMaster(getUserClassLoader()); - } - } - catch (Throwable t) { - ExceptionUtils.rethrowIfFatalError(t); - ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t); - failGlobal(new Exception("Failed to finalize execution on master", t)); - return; + final List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (ExecutionJobVertex ejv : verticesInCreationOrder) { + futures.add(CompletableFuture.runAsync(() -> { + try { + ejv.getJobVertex().finalizeOnMaster(getUserClassLoader()); + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + }, ioExecutor)); } - // if we do not make this state transition, then a concurrent - // cancellation or failure happened - if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) { - onTerminalState(JobStatus.FINISHED); - } + FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> { Review comment: ```suggestion FutureUtils.assertNoException(FutureUtils.combineAll(futures).whenCompleteAsync((ignored, t) -> { ``` Otherwise, the `ExceptionUtils.rethrowIfFatalError` won't do much. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org