This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 67aeb8165c623e35f773de7e50d0f00c24cca539 Author: Zhu Zhu <[email protected]> AuthorDate: Mon Jun 27 14:59:01 2022 +0800 [FLINK-28134][runtime] Wait for all the executions to be terminated before finishing a job In speculative execution, it is possible that when all the execution vertices finish, some executions are still running. We need to wait for all of them to be terminated (canceled) before conducting job finalization. --- .../executiongraph/DefaultExecutionGraph.java | 54 ++++++++++++++-------- .../runtime/executiongraph/ExecutionVertex.java | 4 ++ .../executiongraph/SpeculativeExecutionVertex.java | 9 ++++ .../SpeculativeExecutionVertexTest.java | 26 +++++++++++ 4 files changed, 74 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 4e144050e7e..1d777577f42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -96,6 +96,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -1168,29 +1169,44 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG assertRunningInJobMasterMainThread(); final int numFinished = ++numFinishedJobVertices; if (numFinished == numJobVerticesTotal) { - // done :-) + FutureUtils.assertNoException( + waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished())); + } + } - // check whether we are still in "RUNNING" and trigger the final cleanup - if (state == JobStatus.RUNNING) { - // we do the final cleanup in the I/O executor, because it may involve - // some heavier work + private 
CompletableFuture<?> waitForAllExecutionsTermination() { + final List<CompletableFuture<?>> terminationFutures = + verticesInCreationOrder.stream() + .flatMap(ejv -> Arrays.stream(ejv.getTaskVertices())) + .map(ExecutionVertex::getTerminationFuture) + .collect(Collectors.toList()); - try { - for (ExecutionJobVertex ejv : verticesInCreationOrder) { - ejv.getJobVertex().finalizeOnMaster(getUserClassLoader()); - } - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalError(t); - ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t); - failGlobal(new Exception("Failed to finalize execution on master", t)); - return; - } + return FutureUtils.waitForAll(terminationFutures); + } - // if we do not make this state transition, then a concurrent - // cancellation or failure happened - if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) { - onTerminalState(JobStatus.FINISHED); + private void jobFinished() { + assertRunningInJobMasterMainThread(); + + // check whether we are still in "RUNNING" and trigger the final cleanup + if (state == JobStatus.RUNNING) { + // we do the final cleanup in the I/O executor, because it may involve + // some heavier work + + try { + for (ExecutionJobVertex ejv : verticesInCreationOrder) { + ejv.getJobVertex().finalizeOnMaster(getUserClassLoader()); } + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(t); + failGlobal(new Exception("Failed to finalize execution on master", t)); + return; + } + + // if we do not make this state transition, then a concurrent + // cancellation or failure happened + if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) { + onTerminalState(JobStatus.FINISHED); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index abda5427f4a..189fe921dd7 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -318,6 +318,10 @@ public class ExecutionVertex return resultPartitions; } + CompletableFuture<?> getTerminationFuture() { + return currentExecution.getTerminalStateFuture(); + } + // -------------------------------------------------------------------------------------------- // Graph building // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java index f819c4539b7..d76cee8f38a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java @@ -123,6 +123,15 @@ public class SpeculativeExecutionVertex extends ExecutionVertex { currentExecutions.values().forEach(e -> e.markFailed(t)); } + @Override + CompletableFuture<?> getTerminationFuture() { + final List<CompletableFuture<?>> terminationFutures = + currentExecutions.values().stream() + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList()); + return FutureUtils.waitForAll(terminationFutures); + } + @Override public void resetForNewExecution() { super.resetForNewExecution(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java index dff88244fee..944b32278b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -34,6 +35,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import static org.assertj.core.api.Assertions.assertThat; @@ -124,6 +126,30 @@ class SpeculativeExecutionVertexTest { .containsExactly(e1.getAttemptId(), e2.getAttemptId()); } + @Test + void testVertexTerminationAndJobTermination() throws Exception { + final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + eg.transitionToRunning(); + + final SpeculativeExecutionVertex ev = + (SpeculativeExecutionVertex) + eg.getJobVertex(jobVertex.getID()).getTaskVertices()[0]; + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + final CompletableFuture<?> terminationFuture = ev.getTerminationFuture(); + + e1.transitionState(ExecutionState.RUNNING); + e1.markFinished(); + assertThat(terminationFuture.isDone()).isFalse(); + assertThat(eg.getState()).isSameAs(JobStatus.RUNNING); + + e2.cancel(); + assertThat(terminationFuture.isDone()).isTrue(); + assertThat(eg.getState()).isSameAs(JobStatus.FINISHED); + } + @Test void testArchiveFailedExecutions() throws Exception { final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
