This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fb0d04d225 [FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest
3fb0d04d225 is described below

commit 3fb0d04d225f06f494d05210046cab6f7681b45b
Author: Yuepeng Pan <[email protected]>
AuthorDate: Sun Mar 1 23:30:14 2026 +0800

    [FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest
    
    Co-Authored-By: Aleksandr Iushmanov <[email protected]>
---
 .../executiongraph/ExecutionGraphRestartTest.java  | 458 +++++++++++----------
 .../runtime/jobmaster/slotpool/SlotPoolUtils.java  | 122 ++++--
 2 files changed, 325 insertions(+), 255 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index f87bcef4d63..426ea53be28 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
+import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
 import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
@@ -40,7 +41,6 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
-import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -49,6 +49,7 @@ import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
@@ -67,17 +69,36 @@ class ExecutionGraphRestartTest {
     private static final int NUM_TASKS = 31;
 
     @RegisterExtension
-    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorExtension();
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
+            TestingUtils.jmAsyncThreadExecutorExtension();
 
-    private static final ComponentMainThreadExecutor mainThreadExecutor =
-            ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
JM_MAIN_THREAD_EXECUTOR_EXTENSION =
+            TestingUtils.jmMainThreadExecutorExtension();
+
+    private ComponentMainThreadExecutor mainThreadExecutor;
 
     private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
 
+    private DeclarativeSlotPoolBridge slotPool;
+
     @BeforeEach
     void setUp() {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+        mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor());
+        slotPool =
+                new DeclarativeSlotPoolBridgeBuilder()
+                        .setMainThreadExecutor(mainThreadExecutor)
+                        .build();
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (slotPool != null) {
+            runInMainThread(slotPool::close);
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -95,179 +116,165 @@ class ExecutionGraphRestartTest {
 
     @Test
     void testCancelAllPendingRequestWhileCanceling() throws Exception {
-        try (DeclarativeSlotPoolBridge slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
-            final int numTasksExceedSlotPool = 50;
-            // create a graph with task count larger than slot pool
-            JobVertex sender =
-                    ExecutionGraphTestUtils.createJobVertex(
-                            "Task", NUM_TASKS + numTasksExceedSlotPool, 
NoOpInvokable.class);
-            JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    graph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-
-            startScheduling(scheduler);
-            offerSlots(slotPool, NUM_TASKS);
-
-            
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
-
-            scheduler.cancel();
-            
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
-            assertThat(slotPool.getNumPendingRequests()).isZero();
-        }
+        final int numTasksExceedSlotPool = 50;
+        // create a graph with task count larger than slot pool
+        JobVertex sender =
+                ExecutionGraphTestUtils.createJobVertex(
+                        "Task", NUM_TASKS + numTasksExceedSlotPool, 
NoOpInvokable.class);
+        JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
+        SchedulerBase scheduler = createSchedulerBuilder(graph).build();
+
+        runInMainThread(
+                () -> {
+                    ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
+                    startScheduling(scheduler);
+                    SlotPoolUtils.offerSlotsFromMainThread(
+                            slotPool, Collections.nCopies(NUM_TASKS, 
ResourceProfile.ANY));
+
+                    
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
+
+                    scheduler.cancel();
+                    
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
+                    assertThat(slotPool.getNumPendingRequests()).isZero();
+                });
     }
 
     @Test
     void testCancelAllPendingRequestWhileFailing() throws Exception {
-        try (DeclarativeSlotPoolBridge slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
-            final int numTasksExceedSlotPool = 50;
-            // create a graph with task count larger than slot pool
-            JobVertex sender =
-                    ExecutionGraphTestUtils.createJobVertex(
-                            "Task", NUM_TASKS + numTasksExceedSlotPool, 
NoOpInvokable.class);
-            JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    graph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-
-            startScheduling(scheduler);
-            offerSlots(slotPool, NUM_TASKS);
-
-            
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
-
-            scheduler.handleGlobalFailure(new Exception("test"));
-            assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
-            assertThat(slotPool.getNumPendingRequests()).isZero();
-        }
+        final int numTasksExceedSlotPool = 50;
+        // create a graph with task count larger than slot pool
+        JobVertex sender =
+                ExecutionGraphTestUtils.createJobVertex(
+                        "Task", NUM_TASKS + numTasksExceedSlotPool, 
NoOpInvokable.class);
+        JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
+        SchedulerBase scheduler = createSchedulerBuilder(graph).build();
+        runInMainThread(
+                () -> {
+                    ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
+
+                    startScheduling(scheduler);
+                    SlotPoolUtils.offerSlotsFromMainThread(
+                            slotPool, Collections.nCopies(NUM_TASKS, 
ResourceProfile.ANY));
+
+                    
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
+
+                    scheduler.handleGlobalFailure(new Exception("test"));
+                    
assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
+                    assertThat(slotPool.getNumPendingRequests()).isZero();
+                });
     }
 
     @Test
     void testCancelWhileRestarting() throws Exception {
         // We want to manually control the restart and delay
-        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    createJobGraph(),
-                                    mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .setRestartBackoffTimeStrategy(
-                                    new TestRestartBackoffTimeStrategy(true, 
Long.MAX_VALUE))
-                            .setDelayExecutor(taskRestartExecutor)
-                            .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-
-            startScheduling(scheduler);
-
-            final ResourceID taskManagerResourceId = offerSlots(slotPool, 
NUM_TASKS);
-
-            // Release the TaskManager and wait for the job to restart
-            slotPool.releaseTaskManager(taskManagerResourceId, new 
Exception("Test Exception"));
-            
assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
-
-            // Canceling needs to abort the restart
-            scheduler.cancel();
-
-            
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-
-            taskRestartExecutor.triggerScheduledTasks();
-
-            
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-            for (ExecutionVertex vertex : 
executionGraph.getAllExecutionVertices()) {
-                
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
-            }
-        }
+        SchedulerBase scheduler =
+                createSchedulerBuilder(createJobGraph())
+                        .setRestartBackoffTimeStrategy(
+                                new TestRestartBackoffTimeStrategy(true, 
Long.MAX_VALUE))
+                        .setDelayExecutor(taskRestartExecutor)
+                        .build();
+
+        runInMainThread(
+                () -> {
+                    ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
+
+                    startScheduling(scheduler);
+
+                    final ResourceID taskManagerResourceId =
+                            SlotPoolUtils.offerSlotsFromMainThread(
+                                    slotPool, Collections.nCopies(NUM_TASKS, 
ResourceProfile.ANY));
+
+                    // Release the TaskManager and wait for the job to restart
+                    slotPool.releaseTaskManager(
+                            taskManagerResourceId, new Exception("Test 
Exception"));
+                    
assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+
+                    // Canceling needs to abort the restart
+                    scheduler.cancel();
+
+                    
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+
+                    taskRestartExecutor.triggerScheduledTasks();
+
+                    
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+                    for (ExecutionVertex vertex : 
executionGraph.getAllExecutionVertices()) {
+                        
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+                    }
+                });
     }
 
-    private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
-        return SlotPoolUtils.offerSlots(
-                slotPool, mainThreadExecutor, Collections.nCopies(numSlots, 
ResourceProfile.ANY));
+    private static void runInMainThread(final Runnable runnable) {
+        CompletableFuture.runAsync(runnable, 
JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor())
+                .join();
     }
 
     @Test
     void testCancelWhileFailing() throws Exception {
-        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    createJobGraph(),
-                                    mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .setRestartBackoffTimeStrategy(
-                                    new TestRestartBackoffTimeStrategy(false, 
Long.MAX_VALUE))
-                            .build();
-            ExecutionGraph graph = scheduler.getExecutionGraph();
+        SchedulerBase scheduler =
+                createSchedulerBuilder(createJobGraph())
+                        .setRestartBackoffTimeStrategy(
+                                new TestRestartBackoffTimeStrategy(false, 
Long.MAX_VALUE))
+                        .build();
+        runInMainThread(
+                () -> {
+                    ExecutionGraph graph = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
+                    startScheduling(scheduler);
 
-            offerSlots(slotPool, NUM_TASKS);
+                    SlotPoolUtils.offerSlotsFromMainThread(
+                            slotPool, Collections.nCopies(NUM_TASKS, 
ResourceProfile.ANY));
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
+                    assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
 
-            switchAllTasksToRunning(graph);
+                    switchAllTasksToRunning(graph);
 
-            scheduler.handleGlobalFailure(new Exception("test"));
+                    scheduler.handleGlobalFailure(new Exception("test"));
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
+                    assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
 
-            scheduler.cancel();
+                    scheduler.cancel();
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
+                    
assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
 
-            // let all tasks finish cancelling
-            completeCanceling(graph);
+                    // let all tasks finish cancelling
+                    completeCanceling(graph);
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.CANCELED);
-        }
+                    assertThat(graph.getState()).isEqualTo(JobStatus.CANCELED);
+                });
     }
 
     @Test
     void testFailWhileCanceling() throws Exception {
-        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    createJobGraph(),
-                                    mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .setRestartBackoffTimeStrategy(
-                                    new TestRestartBackoffTimeStrategy(false, 
Long.MAX_VALUE))
-                            .build();
-            ExecutionGraph graph = scheduler.getExecutionGraph();
+        SchedulerBase scheduler =
+                createSchedulerBuilder(createJobGraph())
+                        .setRestartBackoffTimeStrategy(
+                                new TestRestartBackoffTimeStrategy(false, 
Long.MAX_VALUE))
+                        .build();
+        runInMainThread(
+                () -> {
+                    ExecutionGraph graph = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
+                    startScheduling(scheduler);
 
-            offerSlots(slotPool, NUM_TASKS);
+                    SlotPoolUtils.offerSlotsFromMainThread(
+                            slotPool, Collections.nCopies(NUM_TASKS, 
ResourceProfile.ANY));
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
-            switchAllTasksToRunning(graph);
+                    assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
+                    switchAllTasksToRunning(graph);
 
-            scheduler.cancel();
+                    scheduler.cancel();
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
+                    
assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
 
-            scheduler.handleGlobalFailure(new Exception("test"));
+                    scheduler.handleGlobalFailure(new Exception("test"));
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
+                    assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
 
-            // let all tasks finish cancelling
-            completeCanceling(graph);
+                    // let all tasks finish cancelling
+                    completeCanceling(graph);
 
-            assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
-        }
+                    assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
+                });
     }
 
     private void switchAllTasksToRunning(ExecutionGraph graph) {
@@ -286,55 +293,74 @@ class ExecutionGraphRestartTest {
                 ExecutionGraphTestUtils.createJobVertex("Task2", 1, 
NoOpInvokable.class);
         JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, 
receiver);
 
-        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .setRestartBackoffTimeStrategy(
-                                    new TestRestartBackoffTimeStrategy(true, 
Long.MAX_VALUE))
-                            .setDelayExecutor(taskRestartExecutor)
-                            .build();
-            ExecutionGraph eg = scheduler.getExecutionGraph();
+        SchedulerBase scheduler =
+                createSchedulerBuilder(jobGraph)
+                        .setRestartBackoffTimeStrategy(
+                                new TestRestartBackoffTimeStrategy(true, 
Long.MAX_VALUE))
+                        .setDelayExecutor(taskRestartExecutor)
+                        .build();
 
-            startScheduling(scheduler);
+        final ExecutionGraph eg = scheduler.getExecutionGraph();
+        // Hold the original finished execution reference across 
runInMainThread calls
+        final Execution[] savedExecution = new Execution[1];
 
-            offerSlots(slotPool, 2);
+        // Phase 1: Start, deploy, fail one task, and trigger restart.
+        // The restart callback (restartTasks) is queued on mainThreadExecutor 
via
+        // cancelFuture.thenRunAsync(..., mainThreadExecutor) and will execute 
after this
+        // runInMainThread call returns.
+        runInMainThread(
+                () -> {
+                    startScheduling(scheduler);
 
-            Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
+                    SlotPoolUtils.offerSlotsFromMainThread(
+                            slotPool, Collections.nCopies(2, 
ResourceProfile.ANY));
 
-            Execution finishedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
-            Execution failedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
+                    Iterator<ExecutionVertex> executionVertices =
+                            eg.getAllExecutionVertices().iterator();
 
-            finishedExecution.markFinished();
+                    Execution finishedExecution =
+                            
executionVertices.next().getCurrentExecutionAttempt();
+                    Execution failedExecution =
+                            
executionVertices.next().getCurrentExecutionAttempt();
 
-            failedExecution.fail(new Exception("Test Exception"));
-            failedExecution.completeCancelling();
+                    finishedExecution.markFinished();
 
-            taskRestartExecutor.triggerScheduledTasks();
+                    failedExecution.fail(new Exception("Test Exception"));
+                    failedExecution.completeCancelling();
 
-            assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
+                    savedExecution[0] = finishedExecution;
 
-            // At this point all resources have been assigned
-            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-                assertThat(vertex.getCurrentAssignedResource()).isNotNull();
-                vertex.getCurrentExecutionAttempt().switchToInitializing();
-                vertex.getCurrentExecutionAttempt().switchToRunning();
-            }
+                    taskRestartExecutor.triggerScheduledTasks();
+                });
 
-            // fail old finished execution, this should not affect the 
execution
-            finishedExecution.fail(new Exception("This should have no 
effect"));
+        // Phase 2: The restart callback has now executed (it was queued ahead 
of this
+        // lambda). Verify the graph restarted and the old finished execution 
is unaffected.
+        runInMainThread(
+                () -> {
+                    Execution finishedExecution = savedExecution[0];
 
-            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-                vertex.getCurrentExecutionAttempt().markFinished();
-            }
+                    assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
 
-            // the state of the finished execution should have not changed 
since it is terminal
-            
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
+                    // At this point all resources have been assigned
+                    for (ExecutionVertex vertex : 
eg.getAllExecutionVertices()) {
+                        
assertThat(vertex.getCurrentAssignedResource()).isNotNull();
+                        
vertex.getCurrentExecutionAttempt().switchToInitializing();
+                        vertex.getCurrentExecutionAttempt().switchToRunning();
+                    }
 
-            assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
-        }
+                    // fail old finished execution, this should not affect the 
execution
+                    finishedExecution.fail(new Exception("This should have no 
effect"));
+
+                    for (ExecutionVertex vertex : 
eg.getAllExecutionVertices()) {
+                        vertex.getCurrentExecutionAttempt().markFinished();
+                    }
+
+                    // the state of the finished execution should have not 
changed since it
+                    // is terminal
+                    
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
+
+                    assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
+                });
     }
 
     /**
@@ -344,41 +370,41 @@ class ExecutionGraphRestartTest {
      */
     @Test
     void testFailExecutionAfterCancel() throws Exception {
-        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-            SchedulerBase scheduler =
-                    new DefaultSchedulerBuilder(
-                                    createJobGraphToCancel(),
-                                    mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
-                            .setExecutionSlotAllocatorFactory(
-                                    
createExecutionSlotAllocatorFactory(slotPool))
-                            .setRestartBackoffTimeStrategy(
-                                    new TestRestartBackoffTimeStrategy(false, 
Long.MAX_VALUE))
-                            .setDelayExecutor(taskRestartExecutor)
-                            .build();
-            ExecutionGraph eg = scheduler.getExecutionGraph();
-
-            startScheduling(scheduler);
-
-            offerSlots(slotPool, 1);
-
-            // Fail right after cancel (for example with concurrent slot 
release)
-            scheduler.cancel();
-
-            for (ExecutionVertex v : eg.getAllExecutionVertices()) {
-                v.getCurrentExecutionAttempt().fail(new Exception("Test 
Exception"));
-            }
-
-            FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
-                    .eventuallySucceeds()
-                    .isEqualTo(JobStatus.CANCELED);
-
-            Execution execution =
-                    
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
-            execution.completeCancelling();
-            assertThat(eg.getState()).isEqualTo(JobStatus.CANCELED);
-        }
+        SchedulerBase scheduler =
+                createSchedulerBuilder(createJobGraphToCancel())
+                        .setRestartBackoffTimeStrategy(
+                                new TestRestartBackoffTimeStrategy(false, 
Long.MAX_VALUE))
+                        .setDelayExecutor(taskRestartExecutor)
+                        .build();
+        runInMainThread(
+                () -> {
+                    ExecutionGraph eg = scheduler.getExecutionGraph();
+
+                    startScheduling(scheduler);
+
+                    SlotPoolUtils.offerSlotsFromMainThread(
+                            slotPool, 
Collections.singletonList(ResourceProfile.ANY));
+
+                    // Fail right after cancel (for example with concurrent 
slot release)
+                    scheduler.cancel();
+
+                    for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+                        v.getCurrentExecutionAttempt().fail(new 
Exception("Test Exception"));
+                    }
+
+                    FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
+                            .eventuallySucceeds()
+                            .isEqualTo(JobStatus.CANCELED);
+
+                    Execution execution =
+                            eg.getAllExecutionVertices()
+                                    .iterator()
+                                    .next()
+                                    .getCurrentExecutionAttempt();
+
+                    execution.completeCancelling();
+                    assertThat(eg.getState()).isEqualTo(JobStatus.CANCELED);
+                });
     }
 
     // ------------------------------------------------------------------------
@@ -391,14 +417,16 @@ class ExecutionGraphRestartTest {
         
assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.RUNNING);
     }
 
-    private static ExecutionSlotAllocatorFactory 
createExecutionSlotAllocatorFactory(
-            SlotPool slotPool) throws Exception {
+    private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) 
throws Exception {
         setupSlotPool(slotPool);
         PhysicalSlotProvider physicalSlotProvider =
                 new PhysicalSlotProviderImpl(
                         
LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
-        return 
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
-                physicalSlotProvider);
+        return new DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, 
EXECUTOR_EXTENSION.getExecutor())
+                .setExecutionSlotAllocatorFactory(
+                        
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
+                                physicalSlotProvider));
     }
 
     private static void setupSlotPool(SlotPool slotPool) throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
index 33dcb81cd38..bea5dbdae16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
@@ -74,23 +74,24 @@ public class SlotPoolUtils {
             SlotPool slotPool,
             ComponentMainThreadExecutor mainThreadExecutor,
             List<ResourceProfile> resourceProfiles) {
-        return offerSlots(
-                slotPool,
-                mainThreadExecutor,
-                resourceProfiles,
-                new SimpleAckingTaskManagerGateway());
+        return CompletableFuture.supplyAsync(
+                        () -> offerSlotsFromMainThread(slotPool, 
resourceProfiles),
+                        mainThreadExecutor)
+                .join();
     }
 
     public static ResourceID tryOfferSlots(
             SlotPool slotPool,
             ComponentMainThreadExecutor mainThreadExecutor,
             List<ResourceProfile> resourceProfiles) {
-        return offerSlots(
-                slotPool,
-                mainThreadExecutor,
-                resourceProfiles,
-                new SimpleAckingTaskManagerGateway(),
-                false);
+        return CompletableFuture.supplyAsync(
+                        () ->
+                                tryOfferSlotsFromMainThread(
+                                        slotPool,
+                                        resourceProfiles,
+                                        new SimpleAckingTaskManagerGateway()),
+                        mainThreadExecutor)
+                .join();
     }
 
     public static ResourceID offerSlots(
@@ -98,40 +99,81 @@ public class SlotPoolUtils {
             ComponentMainThreadExecutor mainThreadExecutor,
             List<ResourceProfile> resourceProfiles,
             TaskManagerGateway taskManagerGateway) {
-        return offerSlots(slotPool, mainThreadExecutor, resourceProfiles, taskManagerGateway, true);
+        return CompletableFuture.supplyAsync(
+                        () ->
+                                offerSlotsFromMainThread(
+                                        slotPool, resourceProfiles, taskManagerGateway),
+                        mainThreadExecutor)
+                .join();
+    }
+
+    /**
+     * Offers slots directly on the slot pool. Must be called from the main thread (e.g. inside a
+     * {@code runInMainThread} block). All offered slots are expected to be accepted.
+     *
+     * @param slotPool the slot pool to offer slots to
+     * @param resourceProfiles the resource profiles of the slots to offer
+     * @return the resource ID of the registered task manager
+     */
+    public static ResourceID offerSlotsFromMainThread(
+            SlotPool slotPool, List<ResourceProfile> resourceProfiles) {
+        return offerSlotsFromMainThread(
+                slotPool, resourceProfiles, new SimpleAckingTaskManagerGateway());
     }
 
-    private static ResourceID offerSlots(
+    /**
+     * Offers slots directly on the slot pool. Must be called from the main thread (e.g. inside a
+     * {@code runInMainThread} block). All offered slots are expected to be accepted.
+     *
+     * @param slotPool the slot pool to offer slots to
+     * @param resourceProfiles the resource profiles of the slots to offer
+     * @param taskManagerGateway the task manager gateway to use
+     * @return the resource ID of the registered task manager
+     */
+    public static ResourceID offerSlotsFromMainThread(
             SlotPool slotPool,
-            ComponentMainThreadExecutor mainThreadExecutor,
             List<ResourceProfile> resourceProfiles,
-            TaskManagerGateway taskManagerGateway,
-            boolean assertAllSlotsAreAccepted) {
+            TaskManagerGateway taskManagerGateway) {
         final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-        CompletableFuture.runAsync(
-                        () -> {
-                            slotPool.registerTaskManager(taskManagerLocation.getResourceID());
-
-                            final Collection<SlotOffer> slotOffers =
-                                    IntStream.range(0, resourceProfiles.size())
-                                            .mapToObj(
-                                                    i ->
-                                                            new SlotOffer(
-                                                                    new AllocationID(),
-                                                                    i,
-                                                                    resourceProfiles.get(i)))
-                                            .collect(Collectors.toList());
-
-                            final Collection<SlotOffer> acceptedOffers =
-                                    slotPool.offerSlots(
-                                            taskManagerLocation, taskManagerGateway, slotOffers);
-
-                            if (assertAllSlotsAreAccepted) {
-                                assertThat(acceptedOffers).isEqualTo(slotOffers);
-                            }
-                        },
-                        mainThreadExecutor)
-                .join();
+        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+        final Collection<SlotOffer> slotOffers =
+                IntStream.range(0, resourceProfiles.size())
+                        .mapToObj(
+                                i -> new SlotOffer(new AllocationID(), i, resourceProfiles.get(i)))
+                        .collect(Collectors.toList());
+
+        final Collection<SlotOffer> acceptedOffers =
+                slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+        assertThat(acceptedOffers).isEqualTo(slotOffers);
+
+        return taskManagerLocation.getResourceID();
+    }
+
+    /**
+     * Offers slots directly on the slot pool without asserting that all slots are accepted. Must be
+     * called from the main thread (e.g. inside a {@code runInMainThread} block).
+     *
+     * @param slotPool the slot pool to offer slots to
+     * @param resourceProfiles the resource profiles of the slots to offer
+     * @param taskManagerGateway the task manager gateway to use
+     * @return the resource ID of the registered task manager
+     */
+    public static ResourceID tryOfferSlotsFromMainThread(
+            SlotPool slotPool,
+            List<ResourceProfile> resourceProfiles,
+            TaskManagerGateway taskManagerGateway) {
+        final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+        final Collection<SlotOffer> slotOffers =
+                IntStream.range(0, resourceProfiles.size())
+                        .mapToObj(
+                                i -> new SlotOffer(new AllocationID(), i, resourceProfiles.get(i)))
+                        .collect(Collectors.toList());
+
+        slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
 
         return taskManagerLocation.getResourceID();
     }


Reply via email to