This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 83be264569d [FLINK-36168][runtime] Replaces goToFinished with the 
cancel method (#25305)
83be264569d is described below

commit 83be264569d3d8c66dc7b82c062e65f34e35d119
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Sep 12 17:44:32 2024 +0200

    [FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305)
    
    We have to transition to all the expected state transitions properly to 
handle all the cleanup. This also requires proper handling of the state 
transitions by the DummyState test implementations.
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 89 ++++++++++++----------
 .../adaptive/WaitingForResourcesTest.java          | 15 ++--
 2 files changed, 57 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 49a58b286ac..c09fbfadcb1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -143,6 +143,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -206,24 +207,40 @@ public class AdaptiveSchedulerTest {
     }
 
     private static void closeInExecutorService(
-            @Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor 
executor) {
+            @Nullable AdaptiveScheduler scheduler, Executor executor) {
         if (scheduler != null) {
             final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
             executor.execute(
                     () -> {
                         try {
-                            // no matter what state the scheduler is in; we 
have to go to Finished
-                            // state to please the Preconditions of the close 
call
-                            if (scheduler.getState().getClass() != 
Finished.class) {
-                                scheduler.goToFinished(
-                                        scheduler.getArchivedExecutionGraph(
-                                                JobStatus.CANCELED, null));
-                            }
+                            scheduler.cancel();
+
                             FutureUtils.forward(scheduler.closeAsync(), 
closeFuture);
                         } catch (Throwable t) {
                             closeFuture.completeExceptionally(t);
                         }
                     });
+
+            // we have to wait for the job termination outside the main thread 
because the
+            // cancellation tasks are scheduled on the main thread as well.
+            scheduler
+                    .getJobTerminationFuture()
+                    .whenCompleteAsync(
+                            (jobStatus, error) -> {
+                                assertThat(scheduler.getState().getClass())
+                                        .isEqualTo(Finished.class);
+
+                                if (error != null) {
+                                    closeFuture.completeExceptionally(error);
+                                } else {
+                                    try {
+                                        
FutureUtils.forward(scheduler.closeAsync(), closeFuture);
+                                    } catch (Throwable t) {
+                                        closeFuture.completeExceptionally(t);
+                                    }
+                                }
+                            },
+                            executor);
             assertThatFuture(closeFuture).eventuallySucceeds();
         }
     }
@@ -310,7 +327,7 @@ public class AdaptiveSchedulerTest {
         final State state = scheduler.getState();
 
         assertThat(scheduler.isState(state)).isTrue();
-        assertThat(scheduler.isState(new DummyState())).isFalse();
+        assertThat(scheduler.isState(new DummyState(scheduler))).isFalse();
     }
 
     @Test
@@ -337,7 +354,7 @@ public class AdaptiveSchedulerTest {
                         .build();
 
         AtomicBoolean ran = new AtomicBoolean(false);
-        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        scheduler.runIfState(new DummyState(scheduler), () -> ran.set(true));
         assertThat(ran.get()).isFalse();
     }
 
@@ -891,7 +908,9 @@ public class AdaptiveSchedulerTest {
 
         // transition into next state, for which the job state is still 
INITIALIZING
         runInMainThread(
-                () -> scheduler.transitionToState(new 
DummyState.Factory(JobStatus.INITIALIZING)));
+                () ->
+                        scheduler.transitionToState(
+                                new DummyState.Factory(scheduler, 
JobStatus.INITIALIZING)));
 
         assertThat(numStatusUpdates).hasValue(0);
     }
@@ -1021,13 +1040,14 @@ public class AdaptiveSchedulerTest {
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
-        final LifecycleMethodCapturingState firstState = new 
LifecycleMethodCapturingState();
+        final LifecycleMethodCapturingState firstState =
+                new LifecycleMethodCapturingState(scheduler);
 
         runInMainThread(() -> scheduler.transitionToState(new 
StateInstanceFactory(firstState)));
 
         firstState.reset();
 
-        runInMainThread(() -> scheduler.transitionToState(new 
DummyState.Factory()));
+        runInMainThread(() -> scheduler.transitionToState(new 
DummyState.Factory(scheduler)));
 
         assertThat(firstState.onLeaveCalled).isTrue();
         
assertThat(firstState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
@@ -2508,6 +2528,10 @@ public class AdaptiveSchedulerTest {
         boolean onLeaveCalled = false;
         @Nullable Class<? extends State> onLeaveNewStateArgument = null;
 
+        public LifecycleMethodCapturingState(Context context) {
+            super(context);
+        }
+
         void reset() {
             onLeaveCalled = false;
             onLeaveNewStateArgument = null;
@@ -2607,52 +2631,35 @@ public class AdaptiveSchedulerTest {
         }
     }
 
-    static class DummyState implements State {
+    static class DummyState extends StateWithoutExecutionGraph {
 
         private final JobStatus jobStatus;
 
-        public DummyState() {
-            this(JobStatus.RUNNING);
+        public DummyState(StateWithoutExecutionGraph.Context context) {
+            this(context, JobStatus.RUNNING);
         }
 
-        public DummyState(JobStatus jobStatus) {
+        public DummyState(StateWithoutExecutionGraph.Context context, 
JobStatus jobStatus) {
+            super(context, AdaptiveSchedulerTest.LOG);
             this.jobStatus = jobStatus;
         }
 
-        @Override
-        public void cancel() {}
-
-        @Override
-        public void suspend(Throwable cause) {}
-
         @Override
         public JobStatus getJobStatus() {
             return jobStatus;
         }
 
-        @Override
-        public ArchivedExecutionGraph getJob() {
-            return null;
-        }
-
-        @Override
-        public void handleGlobalFailure(
-                Throwable cause, CompletableFuture<Map<String, String>> 
failureLabels) {}
-
-        @Override
-        public Logger getLogger() {
-            return null;
-        }
-
         private static class Factory implements StateFactory<DummyState> {
 
+            private final StateWithoutExecutionGraph.Context context;
             private final JobStatus jobStatus;
 
-            public Factory() {
-                this(JobStatus.RUNNING);
+            public Factory(StateWithoutExecutionGraph.Context context) {
+                this(context, JobStatus.RUNNING);
             }
 
-            public Factory(JobStatus jobStatus) {
+            public Factory(StateWithoutExecutionGraph.Context context, 
JobStatus jobStatus) {
+                this.context = context;
                 this.jobStatus = jobStatus;
             }
 
@@ -2663,7 +2670,7 @@ public class AdaptiveSchedulerTest {
 
             @Override
             public DummyState getState() {
-                return new DummyState(jobStatus);
+                return new DummyState(context, jobStatus);
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index a7b40cc50bf..b9d80e05cba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -220,13 +220,15 @@ class WaitingForResourcesTest {
                 };
 
         ctx.runIfState(
-                new AdaptiveSchedulerTest.DummyState(),
+                new AdaptiveSchedulerTest.DummyState(ctx),
                 runLastBecauseOfHighDelay,
                 Duration.ofMillis(999));
         ctx.runIfState(
-                new AdaptiveSchedulerTest.DummyState(), 
runFirstBecauseOfLowDelay, Duration.ZERO);
+                new AdaptiveSchedulerTest.DummyState(ctx),
+                runFirstBecauseOfLowDelay,
+                Duration.ZERO);
         ctx.runIfState(
-                new AdaptiveSchedulerTest.DummyState(),
+                new AdaptiveSchedulerTest.DummyState(ctx),
                 runSecondBecauseOfScheduleOrder,
                 Duration.ZERO);
 
@@ -244,7 +246,7 @@ class WaitingForResourcesTest {
                     executed.set(true);
                 };
 
-        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeOnce, 
Duration.ZERO);
+        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(ctx), executeOnce, 
Duration.ZERO);
 
         // execute at least twice
         ctx.runScheduledTasks();
@@ -256,14 +258,15 @@ class WaitingForResourcesTest {
     void testInternalRunScheduledTasks_upperBoundRespected() {
         Runnable executeNever = () -> fail("Not expected");
 
-        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeNever, 
Duration.ofMillis(10));
+        ctx.runIfState(
+                new AdaptiveSchedulerTest.DummyState(ctx), executeNever, 
Duration.ofMillis(10));
 
         ctx.runScheduledTasks(4);
     }
 
     @Test
     void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
-        final State state = new AdaptiveSchedulerTest.DummyState();
+        final State state = new AdaptiveSchedulerTest.DummyState(ctx);
 
         AtomicBoolean executed = new AtomicBoolean(false);
         ctx.runIfState(

Reply via email to