This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c43e7915ffe2932a266e6aa2294724bb39a603d1
Author: Zdenek Tison <[email protected]>
AuthorDate: Mon Aug 12 13:43:53 2024 +0200

    [FLINK-36013] [runtime] Introduce the transition from Restarting to CreatingExecutionGraph state
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  2 +
 .../runtime/scheduler/adaptive/Executing.java      |  1 +
 .../scheduler/adaptive/FailureResultUtil.java      |  1 +
 .../runtime/scheduler/adaptive/Restarting.java     | 32 +++++++----
 .../scheduler/adaptive/StateTransitions.java       |  4 ++
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 36 +++++++++++--
 .../runtime/scheduler/adaptive/RestartingTest.java | 62 +++++++++++++++++-----
 .../scheduler/adaptive/StopWithSavepointTest.java  | 10 +++-
 8 files changed, 120 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 7ef897936d2..5686c0e47ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1229,6 +1229,7 @@ public class AdaptiveScheduler
             ExecutionGraphHandler executionGraphHandler,
             OperatorCoordinatorHandler operatorCoordinatorHandler,
             Duration backoffTime,
+            boolean forcedRestart,
             List<ExceptionHistoryEntry> failureCollection) {
 
         for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
@@ -1249,6 +1250,7 @@ public class AdaptiveScheduler
                         operatorCoordinatorHandler,
                         LOG,
                         backoffTime,
+                        forcedRestart,
                         userCodeClassLoader,
                         failureCollection));
         numRestarts++;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index bebcbbbb8a4..6139767a5c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -156,6 +156,7 @@ class Executing extends StateWithExecutionGraph
                 getExecutionGraphHandler(),
                 getOperatorCoordinatorHandler(),
                 Duration.ofMillis(0L),
+                true,
                 getFailures());
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
index eb2c64eae99..bc16cafbf14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
@@ -30,6 +30,7 @@ public class FailureResultUtil {
                     sweg.getExecutionGraphHandler(),
                     sweg.getOperatorCoordinatorHandler(),
                     failureResult.getBackoffTime(),
+                    false,
                     sweg.getFailures());
         } else {
             sweg.getLogger().info("Failing job.", 
failureResult.getFailureCause());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index f647967edb4..1dd3f29778f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -42,7 +42,9 @@ class Restarting extends StateWithExecutionGraph {
 
     private final Duration backoffTime;
 
-    @Nullable private ScheduledFuture<?> goToWaitingForResourcesFuture;
+    @Nullable private ScheduledFuture<?> goToSubsequentStateFuture;
+
+    private final boolean forcedRestart;
 
     Restarting(
             Context context,
@@ -51,6 +53,7 @@ class Restarting extends StateWithExecutionGraph {
             OperatorCoordinatorHandler operatorCoordinatorHandler,
             Logger logger,
             Duration backoffTime,
+            boolean forcedRestart,
             ClassLoader userCodeClassLoader,
             List<ExceptionHistoryEntry> failureCollection) {
         super(
@@ -63,14 +66,15 @@ class Restarting extends StateWithExecutionGraph {
                 failureCollection);
         this.context = context;
         this.backoffTime = backoffTime;
+        this.forcedRestart = forcedRestart;
 
         getExecutionGraph().cancel();
     }
 
     @Override
     public void onLeave(Class<? extends State> newState) {
-        if (goToWaitingForResourcesFuture != null) {
-            goToWaitingForResourcesFuture.cancel(false);
+        if (goToSubsequentStateFuture != null) {
+            goToSubsequentStateFuture.cancel(false);
         }
 
         super.onLeave(newState);
@@ -103,18 +107,24 @@ class Restarting extends StateWithExecutionGraph {
     @Override
     void onGloballyTerminalState(JobStatus globallyTerminalState) {
         Preconditions.checkArgument(globallyTerminalState == 
JobStatus.CANCELED);
-        goToWaitingForResourcesFuture =
-                context.runIfState(
-                        this,
-                        () -> 
context.goToWaitingForResources(getExecutionGraph()),
-                        backoffTime);
+        goToSubsequentStateFuture =
+                context.runIfState(this, this::goToSubsequentState, 
backoffTime);
+    }
+
+    private void goToSubsequentState() {
+        if (forcedRestart) {
+            context.goToCreatingExecutionGraph(getExecutionGraph());
+        } else {
+            context.goToWaitingForResources(getExecutionGraph());
+        }
     }
 
     /** Context of the {@link Restarting} state. */
     interface Context
             extends StateWithExecutionGraph.Context,
                     StateTransitions.ToCancelling,
-                    StateTransitions.ToWaitingForResources {
+                    StateTransitions.ToWaitingForResources,
+                    StateTransitions.ToCreatingExecutionGraph {
 
         /**
          * Runs the given action after the specified delay if the state is the 
expected state at
@@ -137,6 +147,7 @@ class Restarting extends StateWithExecutionGraph {
         private final ExecutionGraphHandler executionGraphHandler;
         private final OperatorCoordinatorHandler operatorCoordinatorHandler;
         private final Duration backoffTime;
+        private final boolean forcedRestart;
         private final ClassLoader userCodeClassLoader;
         private final List<ExceptionHistoryEntry> failureCollection;
 
@@ -147,6 +158,7 @@ class Restarting extends StateWithExecutionGraph {
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Logger log,
                 Duration backoffTime,
+                boolean forcedRestart,
                 ClassLoader userCodeClassLoader,
                 List<ExceptionHistoryEntry> failureCollection) {
             this.context = context;
@@ -155,6 +167,7 @@ class Restarting extends StateWithExecutionGraph {
             this.executionGraphHandler = executionGraphHandler;
             this.operatorCoordinatorHandler = operatorCoordinatorHandler;
             this.backoffTime = backoffTime;
+            this.forcedRestart = forcedRestart;
             this.userCodeClassLoader = userCodeClassLoader;
             this.failureCollection = failureCollection;
         }
@@ -171,6 +184,7 @@ class Restarting extends StateWithExecutionGraph {
                     operatorCoordinatorHandler,
                     log,
                     backoffTime,
+                    forcedRestart,
                     userCodeClassLoader,
                     failureCollection);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
index f6881be4dde..7f949afeee1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
@@ -128,6 +128,9 @@ public interface StateTransitions {
          *     Restarting} state
          * @param backoffTime backoffTime to wait before transitioning to the 
{@link Restarting}
          *     state
+         * @param forcedRestart if the {@link WaitingForResources} state 
should be omitted and the
+         *     {@link CreatingExecutionGraph} state should be entered directly 
from the {@link
+         *     Restarting} state
          * @param failureCollection collection of failures that are propagated
          */
         void goToRestarting(
@@ -135,6 +138,7 @@ public interface StateTransitions {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Duration backoffTime,
+                boolean forcedRestart,
                 List<ExceptionHistoryEntry> failureCollection);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 4f81ec7346c..34183a07940 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -356,8 +356,10 @@ class ExecutingTest {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             Executing exec = new ExecutingStateBuilder().build(ctx);
             ctx.setExpectRestarting(
-                    restartingArguments ->
-                            
assertThat(restartingArguments.getBackoffTime()).isEqualTo(duration));
+                    restartingArguments -> {
+                        
assertThat(restartingArguments.getBackoffTime()).isEqualTo(duration);
+                        
assertThat(restartingArguments.isForcedRestart()).isFalse();
+                    });
             ctx.setHowToHandleFailure(f -> FailureResult.canRestart(f, 
duration));
             exec.handleGlobalFailure(
                     new RuntimeException("Recoverable error"),
@@ -439,7 +441,11 @@ class ExecutingTest {
                             
.setExecutionGraph(returnsFailedStateExecutionGraph)
                             .build(ctx);
             ctx.setHowToHandleFailure(failure -> 
FailureResult.canRestart(failure, Duration.ZERO));
-            ctx.setExpectRestarting(assertNonNull());
+            ctx.setExpectRestarting(
+                    restartingArguments -> {
+                        assertThat(restartingArguments).isNotNull();
+                        
assertThat(restartingArguments.isForcedRestart()).isFalse();
+                    });
 
             Exception exception = new RuntimeException();
             TestingAccessExecution execution =
@@ -611,6 +617,17 @@ class ExecutingTest {
         }
     }
 
+    @Test
+    public void testOmitsWaitingForResourcesStateWhenRestarting() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final Executing testInstance = new 
ExecutingStateBuilder().build(ctx);
+            ctx.setExpectRestarting(
+                    restartingArguments ->
+                            
assertThat(restartingArguments.isForcedRestart()).isTrue());
+            testInstance.transitionToSubsequentState();
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testInternalParallelismChangeBehavior(boolean 
parallelismChanged) throws Exception {
@@ -815,13 +832,15 @@ class ExecutingTest {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Duration backoffTime,
+                boolean forcedRestart,
                 List<ExceptionHistoryEntry> failureCollection) {
             restartingStateValidator.validateInput(
                     new RestartingArguments(
                             executionGraph,
                             executionGraphHandler,
                             operatorCoordinatorHandler,
-                            backoffTime));
+                            backoffTime,
+                            forcedRestart));
             hadStateTransition = true;
         }
 
@@ -937,19 +956,26 @@ class ExecutingTest {
 
     static class RestartingArguments extends CancellingArguments {
         private final Duration backoffTime;
+        private final boolean forcedRestart;
 
         public RestartingArguments(
                 ExecutionGraph executionGraph,
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
-                Duration backoffTime) {
+                Duration backoffTime,
+                boolean forcedRestart) {
             super(executionGraph, executionGraphHandler, 
operatorCoordinatorHandler);
             this.backoffTime = backoffTime;
+            this.forcedRestart = forcedRestart;
         }
 
         public Duration getBackoffTime() {
             return backoffTime;
         }
+
+        public boolean isForcedRestart() {
+            return forcedRestart;
+        }
     }
 
     static class FailingArguments extends CancellingArguments {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index 97a4bc2c1a9..77d9c8a2373 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.CompletedScheduledFuture;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
@@ -30,9 +28,13 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -57,11 +59,17 @@ class RestartingTest {
         }
     }
 
-    @Test
-    void testTransitionToWaitingForResourcesWhenCancellationComplete() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
testTransitionToSubsequentStateWhenCancellationComplete(boolean forcedRestart)
+            throws Exception {
         try (MockRestartingContext ctx = new MockRestartingContext()) {
-            Restarting restarting = createRestartingState(ctx);
-            ctx.setExpectWaitingForResources();
+            Restarting restarting = createRestartingState(ctx, forcedRestart);
+            if (forcedRestart) {
+                ctx.setExpectCreatingExecutionGraph();
+            } else {
+                ctx.setExpectWaitingForResources();
+            }
             restarting.onGloballyTerminalState(JobStatus.CANCELED);
         }
     }
@@ -114,15 +122,22 @@ class RestartingTest {
         }
     }
 
-    @Test
-    void testStateDoesNotExposeGloballyTerminalExecutionGraph() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testStateDoesNotExposeGloballyTerminalExecutionGraph(boolean 
forcedRestart)
+            throws Exception {
         try (MockRestartingContext ctx = new MockRestartingContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            Restarting restarting = createRestartingState(ctx, 
mockExecutionGraph);
+            Restarting restarting = createRestartingState(ctx, 
mockExecutionGraph, forcedRestart);
 
             // ideally we'd just delay the state transitions, but the context 
does not support that
-            ctx.setExpectWaitingForResources();
+            if (forcedRestart) {
+                ctx.setExpectCreatingExecutionGraph();
+            } else {
+                ctx.setExpectWaitingForResources();
+            }
+
             mockExecutionGraph.completeTerminationFuture(JobStatus.CANCELED);
 
             // this is just a sanity check for the test
@@ -134,8 +149,17 @@ class RestartingTest {
         }
     }
 
+    public Restarting createRestartingState(MockRestartingContext ctx, boolean 
forcedRestart) {
+        return createRestartingState(ctx, new 
StateTrackingMockExecutionGraph(), forcedRestart);
+    }
+
     public Restarting createRestartingState(
             MockRestartingContext ctx, ExecutionGraph executionGraph) {
+        return createRestartingState(ctx, executionGraph, false);
+    }
+
+    public Restarting createRestartingState(
+            MockRestartingContext ctx, ExecutionGraph executionGraph, boolean 
forcedRestart) {
         final ExecutionGraphHandler executionGraphHandler =
                 new ExecutionGraphHandler(
                         executionGraph,
@@ -152,12 +176,12 @@ class RestartingTest {
                 operatorCoordinatorHandler,
                 log,
                 Duration.ZERO,
+                forcedRestart,
                 ClassLoader.getSystemClassLoader(),
                 new ArrayList<>());
     }
 
-    public Restarting createRestartingState(MockRestartingContext ctx)
-            throws JobException, JobExecutionException {
+    public Restarting createRestartingState(MockRestartingContext ctx) {
         return createRestartingState(ctx, new 
StateTrackingMockExecutionGraph());
     }
 
@@ -170,6 +194,9 @@ class RestartingTest {
         private final StateValidator<Void> waitingForResourcesStateValidator =
                 new StateValidator<>("WaitingForResources");
 
+        private final StateValidator<ExecutionGraph> 
creatingExecutionGraphStateValidator =
+                new StateValidator<>("CreatingExecutionGraph");
+
         public void 
setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
             cancellingStateValidator.expectInput(asserter);
         }
@@ -178,6 +205,10 @@ class RestartingTest {
             waitingForResourcesStateValidator.expectInput((none) -> {});
         }
 
+        public void setExpectCreatingExecutionGraph() {
+            creatingExecutionGraphStateValidator.expectInput(assertNonNull());
+        }
+
         @Override
         public void goToCanceling(
                 ExecutionGraph executionGraph,
@@ -199,6 +230,12 @@ class RestartingTest {
             hadStateTransition = true;
         }
 
+        @Override
+        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
+            
creatingExecutionGraphStateValidator.validateInput(previousExecutionGraph);
+            hadStateTransition = true;
+        }
+
         @Override
         public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
             if (!hadStateTransition) {
@@ -212,6 +249,7 @@ class RestartingTest {
             super.close();
             cancellingStateValidator.close();
             waitingForResourcesStateValidator.close();
+            creatingExecutionGraphStateValidator.close();
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index 383c3bf57ac..33744f6c50a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -264,7 +264,11 @@ class StopWithSavepointTest {
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(failure -> 
FailureResult.canRestart(failure, Duration.ZERO));
 
-            ctx.setExpectRestarting(assertNonNull());
+            ctx.setExpectRestarting(
+                    (restartingArguments) -> {
+                        assertThat(restartingArguments).isNotNull();
+                        
assertThat(restartingArguments.isForcedRestart()).isFalse();
+                    });
 
             Exception exception = new RuntimeException();
             TestingAccessExecution execution =
@@ -576,6 +580,7 @@ class StopWithSavepointTest {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Duration backoffTime,
+                boolean forcedRestart,
                 List<ExceptionHistoryEntry> failureCollection) {
             if (hadStateTransition) {
                 throw new IllegalStateException("Only one state transition is 
allowed.");
@@ -586,7 +591,8 @@ class StopWithSavepointTest {
                             executionGraph,
                             executionGraphHandler,
                             operatorCoordinatorHandler,
-                            backoffTime));
+                            backoffTime,
+                            forcedRestart));
             hadStateTransition = true;
         }
 

Reply via email to