This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bfd13de1520 [FLINK-38406][runtime] Fix
DefaultSchedulerCheckpointCoordinatorTest failures (#27061)
bfd13de1520 is described below
commit bfd13de1520f842c03b73939ecd651dd3005ebdc
Author: Mate Czagany <[email protected]>
AuthorDate: Mon Jan 12 11:13:49 2026 +0100
[FLINK-38406][runtime] Fix DefaultSchedulerCheckpointCoordinatorTest
failures (#27061)
* [FLINK-38406][runtime] Fix DefaultSchedulerCheckpointCoordinatorTest
failures
* [FLINK-38406][runtime] Run ExecutionGraph operations from main thread
---
.../DefaultSchedulerCheckpointCoordinatorTest.java | 56 +++++++++++++++-------
1 file changed, 38 insertions(+), 18 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
index 363fe516b8e..51562ef19ab 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
-import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -56,6 +56,13 @@ class DefaultSchedulerCheckpointCoordinatorTest {
private static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();
+ @RegisterExtension
+ static final TestingComponentMainThreadExecutor.Extension
MAIN_EXECUTOR_RESOURCE =
+ new TestingComponentMainThreadExecutor.Extension();
+
+ private final TestingComponentMainThreadExecutor mainThreadExecutor =
+ MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
/** Tests that the checkpoint coordinator is shut down if the execution
graph is failed. */
@Test
void
testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph()
@@ -77,9 +84,14 @@ class DefaultSchedulerCheckpointCoordinatorTest {
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();
- graph.failJob(new Exception("Test Exception"),
System.currentTimeMillis());
-
- scheduler.closeAsync().get();
+ mainThreadExecutor
+ .execute(
+ () -> {
+ graph.failJob(
+ new Exception("Test Exception"),
System.currentTimeMillis());
+ return scheduler.closeAsync();
+ })
+ .get();
assertThat(checkpointCoordinator.isShutdown()).isTrue();
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FAILED);
@@ -107,9 +119,13 @@ class DefaultSchedulerCheckpointCoordinatorTest {
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();
- graph.suspend(new Exception("Test Exception"));
-
- scheduler.closeAsync().get();
+ mainThreadExecutor
+ .execute(
+ () -> {
+ graph.suspend(new Exception("Test Exception"));
+ return scheduler.closeAsync();
+ })
+ .get();
assertThat(checkpointCoordinator.isShutdown()).isTrue();
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.SUSPENDED);
@@ -137,18 +153,22 @@ class DefaultSchedulerCheckpointCoordinatorTest {
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();
- scheduler.startScheduling();
-
- for (ExecutionVertex executionVertex :
graph.getAllExecutionVertices()) {
- final Execution currentExecutionAttempt =
executionVertex.getCurrentExecutionAttempt();
- scheduler.updateTaskExecutionState(
- new TaskExecutionState(
- currentExecutionAttempt.getAttemptId(),
ExecutionState.FINISHED));
- }
+ mainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+ for (ExecutionVertex executionVertex :
graph.getAllExecutionVertices()) {
+ final Execution currentExecutionAttempt =
+ executionVertex.getCurrentExecutionAttempt();
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ currentExecutionAttempt.getAttemptId(),
+ ExecutionState.FINISHED));
+ }
+ });
assertThat(graph.getTerminationFuture()).isCompletedWithValue(JobStatus.FINISHED);
- scheduler.closeAsync().get();
+ mainThreadExecutor.execute(scheduler::closeAsync).get();
assertThat(checkpointCoordinator.isShutdown()).isTrue();
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FINISHED);
@@ -176,7 +196,7 @@ class DefaultSchedulerCheckpointCoordinatorTest {
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();
- scheduler.closeAsync().get();
+ mainThreadExecutor.execute(scheduler::closeAsync).get();
assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
assertThat(checkpointCoordinator.isShutdown()).isTrue();
@@ -208,7 +228,7 @@ class DefaultSchedulerCheckpointCoordinatorTest {
return new DefaultSchedulerBuilder(
jobGraph,
-
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ mainThreadExecutor.getMainThreadExecutor(),
EXECUTOR_EXTENSION.getExecutor())
.setCheckpointRecoveryFactory(new
TestingCheckpointRecoveryFactory(store, counter))
.setRpcTimeout(timeout)