This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c43e7915ffe2932a266e6aa2294724bb39a603d1 Author: Zdenek Tison <[email protected]> AuthorDate: Mon Aug 12 13:43:53 2024 +0200 [FLINK-36013] [runtime] Introduce the transition from Restarting to CreatingExecutionGraph state --- .../scheduler/adaptive/AdaptiveScheduler.java | 2 + .../runtime/scheduler/adaptive/Executing.java | 1 + .../scheduler/adaptive/FailureResultUtil.java | 1 + .../runtime/scheduler/adaptive/Restarting.java | 32 +++++++---- .../scheduler/adaptive/StateTransitions.java | 4 ++ .../runtime/scheduler/adaptive/ExecutingTest.java | 36 +++++++++++-- .../runtime/scheduler/adaptive/RestartingTest.java | 62 +++++++++++++++++----- .../scheduler/adaptive/StopWithSavepointTest.java | 10 +++- 8 files changed, 120 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 7ef897936d2..5686c0e47ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1229,6 +1229,7 @@ public class AdaptiveScheduler ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, + boolean forcedRestart, List<ExceptionHistoryEntry> failureCollection) { for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { @@ -1249,6 +1250,7 @@ public class AdaptiveScheduler operatorCoordinatorHandler, LOG, backoffTime, + forcedRestart, userCodeClassLoader, failureCollection)); numRestarts++; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index bebcbbbb8a4..6139767a5c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -156,6 +156,7 @@ class Executing extends StateWithExecutionGraph getExecutionGraphHandler(), getOperatorCoordinatorHandler(), Duration.ofMillis(0L), + true, getFailures()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java index eb2c64eae99..bc16cafbf14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java @@ -30,6 +30,7 @@ public class FailureResultUtil { sweg.getExecutionGraphHandler(), sweg.getOperatorCoordinatorHandler(), failureResult.getBackoffTime(), + false, sweg.getFailures()); } else { sweg.getLogger().info("Failing job.", failureResult.getFailureCause()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index f647967edb4..1dd3f29778f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -42,7 +42,9 @@ class Restarting extends StateWithExecutionGraph { private final Duration backoffTime; - @Nullable private ScheduledFuture<?> goToWaitingForResourcesFuture; + @Nullable private ScheduledFuture<?> goToSubsequentStateFuture; + + private final boolean forcedRestart; Restarting( Context context, @@ -51,6 +53,7 @@ class Restarting extends StateWithExecutionGraph { OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, Duration backoffTime, + boolean forcedRestart, ClassLoader userCodeClassLoader, List<ExceptionHistoryEntry> failureCollection) { super( @@ -63,14 +66,15 @@ class Restarting extends StateWithExecutionGraph { failureCollection); this.context = context; this.backoffTime = backoffTime; + this.forcedRestart = forcedRestart; getExecutionGraph().cancel(); } @Override public void onLeave(Class<? extends State> newState) { - if (goToWaitingForResourcesFuture != null) { - goToWaitingForResourcesFuture.cancel(false); + if (goToSubsequentStateFuture != null) { + goToSubsequentStateFuture.cancel(false); } super.onLeave(newState); @@ -103,18 +107,24 @@ class Restarting extends StateWithExecutionGraph { @Override void onGloballyTerminalState(JobStatus globallyTerminalState) { Preconditions.checkArgument(globallyTerminalState == JobStatus.CANCELED); - goToWaitingForResourcesFuture = - context.runIfState( - this, - () -> context.goToWaitingForResources(getExecutionGraph()), - backoffTime); + goToSubsequentStateFuture = + context.runIfState(this, this::goToSubsequentState, backoffTime); + } + + private void goToSubsequentState() { + if (forcedRestart) { + context.goToCreatingExecutionGraph(getExecutionGraph()); + } else { + context.goToWaitingForResources(getExecutionGraph()); + } } /** Context of the {@link Restarting} state. */ interface Context extends StateWithExecutionGraph.Context, StateTransitions.ToCancelling, - StateTransitions.ToWaitingForResources { + StateTransitions.ToWaitingForResources, + StateTransitions.ToCreatingExecutionGraph { /** * Runs the given action after the specified delay if the state is the expected state at @@ -137,6 +147,7 @@ class Restarting extends StateWithExecutionGraph { private final ExecutionGraphHandler executionGraphHandler; private final OperatorCoordinatorHandler operatorCoordinatorHandler; private final Duration backoffTime; + private final boolean forcedRestart; private final ClassLoader userCodeClassLoader; private final List<ExceptionHistoryEntry> failureCollection; @@ -147,6 +158,7 @@ class Restarting extends StateWithExecutionGraph { OperatorCoordinatorHandler operatorCoordinatorHandler, Logger log, Duration backoffTime, + boolean forcedRestart, ClassLoader userCodeClassLoader, List<ExceptionHistoryEntry> failureCollection) { this.context = context; @@ -155,6 +167,7 @@ class Restarting extends StateWithExecutionGraph { this.executionGraphHandler = executionGraphHandler; this.operatorCoordinatorHandler = operatorCoordinatorHandler; this.backoffTime = backoffTime; + this.forcedRestart = forcedRestart; this.userCodeClassLoader = userCodeClassLoader; this.failureCollection = failureCollection; } @@ -171,6 +184,7 @@ class Restarting extends StateWithExecutionGraph { operatorCoordinatorHandler, log, backoffTime, + forcedRestart, userCodeClassLoader, failureCollection); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java index f6881be4dde..7f949afeee1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java @@ -128,6 +128,9 @@ public interface StateTransitions { * Restarting} state * @param backoffTime backoffTime to wait before transitioning to the {@link Restarting} * state + * @param forcedRestart if the {@link WaitingForResources} state should be omitted and the + * {@link CreatingExecutionGraph} state should be entered directly from the {@link + * Restarting} state * @param failureCollection collection of failures that are propagated */ void goToRestarting( @@ -135,6 +138,7 @@ public interface StateTransitions { ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, + boolean forcedRestart, List<ExceptionHistoryEntry> failureCollection); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index 4f81ec7346c..34183a07940 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -356,8 +356,10 @@ class ExecutingTest { try (MockExecutingContext ctx = new MockExecutingContext()) { Executing exec = new ExecutingStateBuilder().build(ctx); ctx.setExpectRestarting( - restartingArguments -> - assertThat(restartingArguments.getBackoffTime()).isEqualTo(duration)); + restartingArguments -> { + assertThat(restartingArguments.getBackoffTime()).isEqualTo(duration); + assertThat(restartingArguments.isForcedRestart()).isFalse(); + }); ctx.setHowToHandleFailure(f -> FailureResult.canRestart(f, duration)); exec.handleGlobalFailure( new RuntimeException("Recoverable error"), @@ -439,7 +441,11 @@ class ExecutingTest { .setExecutionGraph(returnsFailedStateExecutionGraph) .build(ctx); ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO)); - ctx.setExpectRestarting(assertNonNull()); + ctx.setExpectRestarting( + restartingArguments -> { + assertThat(restartingArguments).isNotNull(); + assertThat(restartingArguments.isForcedRestart()).isFalse(); + }); Exception exception = new RuntimeException(); TestingAccessExecution execution = @@ -611,6 +617,17 @@ class ExecutingTest { } } + @Test + public void testOmitsWaitingForResourcesStateWhenRestarting() throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + final Executing testInstance = new ExecutingStateBuilder().build(ctx); + ctx.setExpectRestarting( + restartingArguments -> + assertThat(restartingArguments.isForcedRestart()).isTrue()); + testInstance.transitionToSubsequentState(); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testInternalParallelismChangeBehavior(boolean parallelismChanged) throws Exception { @@ -815,13 +832,15 @@ class ExecutingTest { ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, + boolean forcedRestart, List<ExceptionHistoryEntry> failureCollection) { restartingStateValidator.validateInput( new RestartingArguments( executionGraph, executionGraphHandler, operatorCoordinatorHandler, - backoffTime)); + backoffTime, + forcedRestart)); hadStateTransition = true; } @@ -937,19 +956,26 @@ class ExecutingTest { static class RestartingArguments extends CancellingArguments { private final Duration backoffTime; + private final boolean forcedRestart; public RestartingArguments( ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, - Duration backoffTime) { + Duration backoffTime, + boolean forcedRestart) { super(executionGraph, executionGraphHandler, operatorCoordinatorHandler); this.backoffTime = backoffTime; + this.forcedRestart = forcedRestart; } public Duration getBackoffTime() { return backoffTime; } + + public boolean isForcedRestart() { + return forcedRestart; + } } static class FailingArguments extends CancellingArguments { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index 97a4bc2c1a9..77d9c8a2373 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -20,8 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.testutils.CompletedScheduledFuture; -import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.failure.FailureEnricherUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; @@ -30,9 +28,13 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -57,11 +59,17 @@ class RestartingTest { } } - @Test - void testTransitionToWaitingForResourcesWhenCancellationComplete() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTransitionToSubsequentStateWhenCancellationComplete(boolean forcedRestart) + throws Exception { try (MockRestartingContext ctx = new MockRestartingContext()) { - Restarting restarting = createRestartingState(ctx); - ctx.setExpectWaitingForResources(); + Restarting restarting = createRestartingState(ctx, forcedRestart); + if (forcedRestart) { + ctx.setExpectCreatingExecutionGraph(); + } else { + ctx.setExpectWaitingForResources(); + } restarting.onGloballyTerminalState(JobStatus.CANCELED); } } @@ -114,15 +122,22 @@ class RestartingTest { } } - @Test - void testStateDoesNotExposeGloballyTerminalExecutionGraph() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStateDoesNotExposeGloballyTerminalExecutionGraph(boolean forcedRestart) + throws Exception { try (MockRestartingContext ctx = new MockRestartingContext()) { StateTrackingMockExecutionGraph mockExecutionGraph = new StateTrackingMockExecutionGraph(); - Restarting restarting = createRestartingState(ctx, mockExecutionGraph); + Restarting restarting = createRestartingState(ctx, mockExecutionGraph, forcedRestart); // ideally we'd just delay the state transitions, but the context does not support that - ctx.setExpectWaitingForResources(); + if (forcedRestart) { + ctx.setExpectCreatingExecutionGraph(); + } else { + ctx.setExpectWaitingForResources(); + } + mockExecutionGraph.completeTerminationFuture(JobStatus.CANCELED); // this is just a sanity check for the test @@ -134,8 +149,17 @@ class RestartingTest { } } + public Restarting createRestartingState(MockRestartingContext ctx, boolean forcedRestart) { + return createRestartingState(ctx, new StateTrackingMockExecutionGraph(), forcedRestart); + } + public Restarting createRestartingState( MockRestartingContext ctx, ExecutionGraph executionGraph) { + return createRestartingState(ctx, executionGraph, false); + } + + public Restarting createRestartingState( + MockRestartingContext ctx, ExecutionGraph executionGraph, boolean forcedRestart) { final ExecutionGraphHandler executionGraphHandler = new ExecutionGraphHandler( executionGraph, @@ -152,12 +176,12 @@ class RestartingTest { operatorCoordinatorHandler, log, Duration.ZERO, + forcedRestart, ClassLoader.getSystemClassLoader(), new ArrayList<>()); } - public Restarting createRestartingState(MockRestartingContext ctx) - throws JobException, JobExecutionException { + public Restarting createRestartingState(MockRestartingContext ctx) { return createRestartingState(ctx, new StateTrackingMockExecutionGraph()); } @@ -170,6 +194,9 @@ class RestartingTest { private final StateValidator<Void> waitingForResourcesStateValidator = new StateValidator<>("WaitingForResources"); + private final StateValidator<ExecutionGraph> creatingExecutionGraphStateValidator = + new StateValidator<>("CreatingExecutionGraph"); + public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) { cancellingStateValidator.expectInput(asserter); } @@ -178,6 +205,10 @@ class RestartingTest { waitingForResourcesStateValidator.expectInput((none) -> {}); } + public void setExpectCreatingExecutionGraph() { + creatingExecutionGraphStateValidator.expectInput(assertNonNull()); + } + @Override public void goToCanceling( ExecutionGraph executionGraph, @@ -199,6 +230,12 @@ class RestartingTest { hadStateTransition = true; } + @Override + public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { + creatingExecutionGraphStateValidator.validateInput(previousExecutionGraph); + hadStateTransition = true; + } + @Override public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) { if (!hadStateTransition) { @@ -212,6 +249,7 @@ class RestartingTest { super.close(); cancellingStateValidator.close(); waitingForResourcesStateValidator.close(); + creatingExecutionGraphStateValidator.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index 383c3bf57ac..33744f6c50a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -264,7 +264,11 @@ class StopWithSavepointTest { ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO)); - ctx.setExpectRestarting(assertNonNull()); + ctx.setExpectRestarting( + (restartingArguments) -> { + assertThat(restartingArguments).isNotNull(); + assertThat(restartingArguments.isForcedRestart()).isFalse(); + }); Exception exception = new RuntimeException(); TestingAccessExecution execution = @@ -576,6 +580,7 @@ class StopWithSavepointTest { ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, + boolean forcedRestart, List<ExceptionHistoryEntry> failureCollection) { if (hadStateTransition) { throw new IllegalStateException("Only one state transition is allowed."); @@ -586,7 +591,8 @@ class StopWithSavepointTest { executionGraph, executionGraphHandler, operatorCoordinatorHandler, - backoffTime)); + backoffTime, + forcedRestart)); hadStateTransition = true; }
