This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 83be264569d [FLINK-36168][runtime] Replaces goToFinished with the
cancel method (#25305)
83be264569d is described below
commit 83be264569d3d8c66dc7b82c062e65f34e35d119
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Sep 12 17:44:32 2024 +0200
[FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305)
We have to transition to all the expected state transitions properly to
handle all the cleanup. This also requires proper handling of the state
transitions by the DummyState test implementations.
---
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 89 ++++++++++++----------
.../adaptive/WaitingForResourcesTest.java | 15 ++--
2 files changed, 57 insertions(+), 47 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 49a58b286ac..c09fbfadcb1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -143,6 +143,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -206,24 +207,40 @@ public class AdaptiveSchedulerTest {
}
private static void closeInExecutorService(
- @Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor
executor) {
+ @Nullable AdaptiveScheduler scheduler, Executor executor) {
if (scheduler != null) {
final CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
executor.execute(
() -> {
try {
- // no matter what state the scheduler is in; we
have to go to Finished
- // state to please the Preconditions of the close
call
- if (scheduler.getState().getClass() !=
Finished.class) {
- scheduler.goToFinished(
- scheduler.getArchivedExecutionGraph(
- JobStatus.CANCELED, null));
- }
+ scheduler.cancel();
+
FutureUtils.forward(scheduler.closeAsync(),
closeFuture);
} catch (Throwable t) {
closeFuture.completeExceptionally(t);
}
});
+
+ // we have to wait for the job termination outside the main thread
because the
+ // cancellation tasks are scheduled on the main thread as well.
+ scheduler
+ .getJobTerminationFuture()
+ .whenCompleteAsync(
+ (jobStatus, error) -> {
+ assertThat(scheduler.getState().getClass())
+ .isEqualTo(Finished.class);
+
+ if (error != null) {
+ closeFuture.completeExceptionally(error);
+ } else {
+ try {
+
FutureUtils.forward(scheduler.closeAsync(), closeFuture);
+ } catch (Throwable t) {
+ closeFuture.completeExceptionally(t);
+ }
+ }
+ },
+ executor);
assertThatFuture(closeFuture).eventuallySucceeds();
}
}
@@ -310,7 +327,7 @@ public class AdaptiveSchedulerTest {
final State state = scheduler.getState();
assertThat(scheduler.isState(state)).isTrue();
- assertThat(scheduler.isState(new DummyState())).isFalse();
+ assertThat(scheduler.isState(new DummyState(scheduler))).isFalse();
}
@Test
@@ -337,7 +354,7 @@ public class AdaptiveSchedulerTest {
.build();
AtomicBoolean ran = new AtomicBoolean(false);
- scheduler.runIfState(new DummyState(), () -> ran.set(true));
+ scheduler.runIfState(new DummyState(scheduler), () -> ran.set(true));
assertThat(ran.get()).isFalse();
}
@@ -891,7 +908,9 @@ public class AdaptiveSchedulerTest {
// transition into next state, for which the job state is still
INITIALIZING
runInMainThread(
- () -> scheduler.transitionToState(new
DummyState.Factory(JobStatus.INITIALIZING)));
+ () ->
+ scheduler.transitionToState(
+ new DummyState.Factory(scheduler,
JobStatus.INITIALIZING)));
assertThat(numStatusUpdates).hasValue(0);
}
@@ -1021,13 +1040,14 @@ public class AdaptiveSchedulerTest {
EXECUTOR_RESOURCE.getExecutor())
.build();
- final LifecycleMethodCapturingState firstState = new
LifecycleMethodCapturingState();
+ final LifecycleMethodCapturingState firstState =
+ new LifecycleMethodCapturingState(scheduler);
runInMainThread(() -> scheduler.transitionToState(new
StateInstanceFactory(firstState)));
firstState.reset();
- runInMainThread(() -> scheduler.transitionToState(new
DummyState.Factory()));
+ runInMainThread(() -> scheduler.transitionToState(new
DummyState.Factory(scheduler)));
assertThat(firstState.onLeaveCalled).isTrue();
assertThat(firstState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
@@ -2508,6 +2528,10 @@ public class AdaptiveSchedulerTest {
boolean onLeaveCalled = false;
@Nullable Class<? extends State> onLeaveNewStateArgument = null;
+ public LifecycleMethodCapturingState(Context context) {
+ super(context);
+ }
+
void reset() {
onLeaveCalled = false;
onLeaveNewStateArgument = null;
@@ -2607,52 +2631,35 @@ public class AdaptiveSchedulerTest {
}
}
- static class DummyState implements State {
+ static class DummyState extends StateWithoutExecutionGraph {
private final JobStatus jobStatus;
- public DummyState() {
- this(JobStatus.RUNNING);
+ public DummyState(StateWithoutExecutionGraph.Context context) {
+ this(context, JobStatus.RUNNING);
}
- public DummyState(JobStatus jobStatus) {
+ public DummyState(StateWithoutExecutionGraph.Context context,
JobStatus jobStatus) {
+ super(context, AdaptiveSchedulerTest.LOG);
this.jobStatus = jobStatus;
}
- @Override
- public void cancel() {}
-
- @Override
- public void suspend(Throwable cause) {}
-
@Override
public JobStatus getJobStatus() {
return jobStatus;
}
- @Override
- public ArchivedExecutionGraph getJob() {
- return null;
- }
-
- @Override
- public void handleGlobalFailure(
- Throwable cause, CompletableFuture<Map<String, String>>
failureLabels) {}
-
- @Override
- public Logger getLogger() {
- return null;
- }
-
private static class Factory implements StateFactory<DummyState> {
+ private final StateWithoutExecutionGraph.Context context;
private final JobStatus jobStatus;
- public Factory() {
- this(JobStatus.RUNNING);
+ public Factory(StateWithoutExecutionGraph.Context context) {
+ this(context, JobStatus.RUNNING);
}
- public Factory(JobStatus jobStatus) {
+ public Factory(StateWithoutExecutionGraph.Context context,
JobStatus jobStatus) {
+ this.context = context;
this.jobStatus = jobStatus;
}
@@ -2663,7 +2670,7 @@ public class AdaptiveSchedulerTest {
@Override
public DummyState getState() {
- return new DummyState(jobStatus);
+ return new DummyState(context, jobStatus);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index a7b40cc50bf..b9d80e05cba 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -220,13 +220,15 @@ class WaitingForResourcesTest {
};
ctx.runIfState(
- new AdaptiveSchedulerTest.DummyState(),
+ new AdaptiveSchedulerTest.DummyState(ctx),
runLastBecauseOfHighDelay,
Duration.ofMillis(999));
ctx.runIfState(
- new AdaptiveSchedulerTest.DummyState(),
runFirstBecauseOfLowDelay, Duration.ZERO);
+ new AdaptiveSchedulerTest.DummyState(ctx),
+ runFirstBecauseOfLowDelay,
+ Duration.ZERO);
ctx.runIfState(
- new AdaptiveSchedulerTest.DummyState(),
+ new AdaptiveSchedulerTest.DummyState(ctx),
runSecondBecauseOfScheduleOrder,
Duration.ZERO);
@@ -244,7 +246,7 @@ class WaitingForResourcesTest {
executed.set(true);
};
- ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeOnce,
Duration.ZERO);
+ ctx.runIfState(new AdaptiveSchedulerTest.DummyState(ctx), executeOnce,
Duration.ZERO);
// execute at least twice
ctx.runScheduledTasks();
@@ -256,14 +258,15 @@ class WaitingForResourcesTest {
void testInternalRunScheduledTasks_upperBoundRespected() {
Runnable executeNever = () -> fail("Not expected");
- ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeNever,
Duration.ofMillis(10));
+ ctx.runIfState(
+ new AdaptiveSchedulerTest.DummyState(ctx), executeNever,
Duration.ofMillis(10));
ctx.runScheduledTasks(4);
}
@Test
void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
- final State state = new AdaptiveSchedulerTest.DummyState();
+ final State state = new AdaptiveSchedulerTest.DummyState(ctx);
AtomicBoolean executed = new AtomicBoolean(false);
ctx.runIfState(