This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3fb0d04d225 [FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest
3fb0d04d225 is described below
commit 3fb0d04d225f06f494d05210046cab6f7681b45b
Author: Yuepeng Pan <[email protected]>
AuthorDate: Sun Mar 1 23:30:14 2026 +0800
[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest
Co-Authored-By: Aleksandr Iushmanov <[email protected]>
---
.../executiongraph/ExecutionGraphRestartTest.java | 458 +++++++++++----------
.../runtime/jobmaster/slotpool/SlotPoolUtils.java | 122 ++++--
2 files changed, 325 insertions(+), 255 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index f87bcef4d63..426ea53be28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
+import
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
-import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -49,6 +49,7 @@ import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
@@ -67,17 +69,36 @@ class ExecutionGraphRestartTest {
private static final int NUM_TASKS = 31;
@RegisterExtension
- static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorExtension();
+ static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_EXTENSION =
+ TestingUtils.jmAsyncThreadExecutorExtension();
- private static final ComponentMainThreadExecutor mainThreadExecutor =
- ComponentMainThreadExecutorServiceAdapter.forMainThread();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService>
JM_MAIN_THREAD_EXECUTOR_EXTENSION =
+ TestingUtils.jmMainThreadExecutorExtension();
+
+ private ComponentMainThreadExecutor mainThreadExecutor;
private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
+ private DeclarativeSlotPoolBridge slotPool;
+
@BeforeEach
void setUp() {
taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+ mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor());
+ slotPool =
+ new DeclarativeSlotPoolBridgeBuilder()
+ .setMainThreadExecutor(mainThreadExecutor)
+ .build();
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (slotPool != null) {
+ runInMainThread(slotPool::close);
+ }
}
// ------------------------------------------------------------------------
@@ -95,179 +116,165 @@ class ExecutionGraphRestartTest {
@Test
void testCancelAllPendingRequestWhileCanceling() throws Exception {
- try (DeclarativeSlotPoolBridge slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
- final int numTasksExceedSlotPool = 50;
- // create a graph with task count larger than slot pool
- JobVertex sender =
- ExecutionGraphTestUtils.createJobVertex(
- "Task", NUM_TASKS + numTasksExceedSlotPool,
NoOpInvokable.class);
- JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- graph, mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-
- startScheduling(scheduler);
- offerSlots(slotPool, NUM_TASKS);
-
-
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
-
- scheduler.cancel();
-
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
- assertThat(slotPool.getNumPendingRequests()).isZero();
- }
+ final int numTasksExceedSlotPool = 50;
+ // create a graph with task count larger than slot pool
+ JobVertex sender =
+ ExecutionGraphTestUtils.createJobVertex(
+ "Task", NUM_TASKS + numTasksExceedSlotPool,
NoOpInvokable.class);
+ JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
+ SchedulerBase scheduler = createSchedulerBuilder(graph).build();
+
+ runInMainThread(
+ () -> {
+ ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
+ startScheduling(scheduler);
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool, Collections.nCopies(NUM_TASKS,
ResourceProfile.ANY));
+
+
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
+
+ scheduler.cancel();
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELLING);
+ assertThat(slotPool.getNumPendingRequests()).isZero();
+ });
}
@Test
void testCancelAllPendingRequestWhileFailing() throws Exception {
- try (DeclarativeSlotPoolBridge slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
- final int numTasksExceedSlotPool = 50;
- // create a graph with task count larger than slot pool
- JobVertex sender =
- ExecutionGraphTestUtils.createJobVertex(
- "Task", NUM_TASKS + numTasksExceedSlotPool,
NoOpInvokable.class);
- JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- graph, mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-
- startScheduling(scheduler);
- offerSlots(slotPool, NUM_TASKS);
-
-
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
-
- scheduler.handleGlobalFailure(new Exception("test"));
- assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
- assertThat(slotPool.getNumPendingRequests()).isZero();
- }
+ final int numTasksExceedSlotPool = 50;
+ // create a graph with task count larger than slot pool
+ JobVertex sender =
+ ExecutionGraphTestUtils.createJobVertex(
+ "Task", NUM_TASKS + numTasksExceedSlotPool,
NoOpInvokable.class);
+ JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
+ SchedulerBase scheduler = createSchedulerBuilder(graph).build();
+ runInMainThread(
+ () -> {
+ ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
+
+ startScheduling(scheduler);
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool, Collections.nCopies(NUM_TASKS,
ResourceProfile.ANY));
+
+
assertThat(slotPool.getNumPendingRequests()).isEqualTo(numTasksExceedSlotPool);
+
+ scheduler.handleGlobalFailure(new Exception("test"));
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.FAILING);
+ assertThat(slotPool.getNumPendingRequests()).isZero();
+ });
}
@Test
void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- createJobGraph(),
- mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .setRestartBackoffTimeStrategy(
- new TestRestartBackoffTimeStrategy(true,
Long.MAX_VALUE))
- .setDelayExecutor(taskRestartExecutor)
- .build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
-
- startScheduling(scheduler);
-
- final ResourceID taskManagerResourceId = offerSlots(slotPool,
NUM_TASKS);
-
- // Release the TaskManager and wait for the job to restart
- slotPool.releaseTaskManager(taskManagerResourceId, new
Exception("Test Exception"));
-
assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
-
- // Canceling needs to abort the restart
- scheduler.cancel();
-
-
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-
- taskRestartExecutor.triggerScheduledTasks();
-
-
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
- for (ExecutionVertex vertex :
executionGraph.getAllExecutionVertices()) {
-
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
- }
- }
+ SchedulerBase scheduler =
+ createSchedulerBuilder(createJobGraph())
+ .setRestartBackoffTimeStrategy(
+ new TestRestartBackoffTimeStrategy(true,
Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
+ .build();
+
+ runInMainThread(
+ () -> {
+ ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
+
+ startScheduling(scheduler);
+
+ final ResourceID taskManagerResourceId =
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool, Collections.nCopies(NUM_TASKS,
ResourceProfile.ANY));
+
+ // Release the TaskManager and wait for the job to restart
+ slotPool.releaseTaskManager(
+ taskManagerResourceId, new Exception("Test
Exception"));
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+
+ // Canceling needs to abort the restart
+ scheduler.cancel();
+
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+
+ taskRestartExecutor.triggerScheduledTasks();
+
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+ for (ExecutionVertex vertex :
executionGraph.getAllExecutionVertices()) {
+
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+ }
+ });
}
- private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
- return SlotPoolUtils.offerSlots(
- slotPool, mainThreadExecutor, Collections.nCopies(numSlots,
ResourceProfile.ANY));
+ private static void runInMainThread(final Runnable runnable) {
+ CompletableFuture.runAsync(runnable,
JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor())
+ .join();
}
@Test
void testCancelWhileFailing() throws Exception {
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- createJobGraph(),
- mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .setRestartBackoffTimeStrategy(
- new TestRestartBackoffTimeStrategy(false,
Long.MAX_VALUE))
- .build();
- ExecutionGraph graph = scheduler.getExecutionGraph();
+ SchedulerBase scheduler =
+ createSchedulerBuilder(createJobGraph())
+ .setRestartBackoffTimeStrategy(
+ new TestRestartBackoffTimeStrategy(false,
Long.MAX_VALUE))
+ .build();
+ runInMainThread(
+ () -> {
+ ExecutionGraph graph = scheduler.getExecutionGraph();
- startScheduling(scheduler);
+ startScheduling(scheduler);
- offerSlots(slotPool, NUM_TASKS);
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool, Collections.nCopies(NUM_TASKS,
ResourceProfile.ANY));
- assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
+ assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
- switchAllTasksToRunning(graph);
+ switchAllTasksToRunning(graph);
- scheduler.handleGlobalFailure(new Exception("test"));
+ scheduler.handleGlobalFailure(new Exception("test"));
- assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
+ assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
- scheduler.cancel();
+ scheduler.cancel();
- assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
+
assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
- // let all tasks finish cancelling
- completeCanceling(graph);
+ // let all tasks finish cancelling
+ completeCanceling(graph);
- assertThat(graph.getState()).isEqualTo(JobStatus.CANCELED);
- }
+ assertThat(graph.getState()).isEqualTo(JobStatus.CANCELED);
+ });
}
@Test
void testFailWhileCanceling() throws Exception {
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- createJobGraph(),
- mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .setRestartBackoffTimeStrategy(
- new TestRestartBackoffTimeStrategy(false,
Long.MAX_VALUE))
- .build();
- ExecutionGraph graph = scheduler.getExecutionGraph();
+ SchedulerBase scheduler =
+ createSchedulerBuilder(createJobGraph())
+ .setRestartBackoffTimeStrategy(
+ new TestRestartBackoffTimeStrategy(false,
Long.MAX_VALUE))
+ .build();
+ runInMainThread(
+ () -> {
+ ExecutionGraph graph = scheduler.getExecutionGraph();
- startScheduling(scheduler);
+ startScheduling(scheduler);
- offerSlots(slotPool, NUM_TASKS);
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool, Collections.nCopies(NUM_TASKS,
ResourceProfile.ANY));
- assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
- switchAllTasksToRunning(graph);
+ assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);
+ switchAllTasksToRunning(graph);
- scheduler.cancel();
+ scheduler.cancel();
- assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
+
assertThat(graph.getState()).isEqualTo(JobStatus.CANCELLING);
- scheduler.handleGlobalFailure(new Exception("test"));
+ scheduler.handleGlobalFailure(new Exception("test"));
- assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
+ assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
- // let all tasks finish cancelling
- completeCanceling(graph);
+ // let all tasks finish cancelling
+ completeCanceling(graph);
- assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
- }
+ assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
+ });
}
private void switchAllTasksToRunning(ExecutionGraph graph) {
@@ -286,55 +293,74 @@ class ExecutionGraphRestartTest {
ExecutionGraphTestUtils.createJobVertex("Task2", 1,
NoOpInvokable.class);
JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender,
receiver);
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- jobGraph, mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .setRestartBackoffTimeStrategy(
- new TestRestartBackoffTimeStrategy(true,
Long.MAX_VALUE))
- .setDelayExecutor(taskRestartExecutor)
- .build();
- ExecutionGraph eg = scheduler.getExecutionGraph();
+ SchedulerBase scheduler =
+ createSchedulerBuilder(jobGraph)
+ .setRestartBackoffTimeStrategy(
+ new TestRestartBackoffTimeStrategy(true,
Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
+ .build();
- startScheduling(scheduler);
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
+ // Hold the original finished execution reference across
runInMainThread calls
+ final Execution[] savedExecution = new Execution[1];
- offerSlots(slotPool, 2);
+ // Phase 1: Start, deploy, fail one task, and trigger restart.
+ // The restart callback (restartTasks) is queued on mainThreadExecutor
via
+ // cancelFuture.thenRunAsync(..., mainThreadExecutor) and will execute
after this
+ // runInMainThread call returns.
+ runInMainThread(
+ () -> {
+ startScheduling(scheduler);
- Iterator<ExecutionVertex> executionVertices =
eg.getAllExecutionVertices().iterator();
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool, Collections.nCopies(2,
ResourceProfile.ANY));
- Execution finishedExecution =
executionVertices.next().getCurrentExecutionAttempt();
- Execution failedExecution =
executionVertices.next().getCurrentExecutionAttempt();
+ Iterator<ExecutionVertex> executionVertices =
+ eg.getAllExecutionVertices().iterator();
- finishedExecution.markFinished();
+ Execution finishedExecution =
+
executionVertices.next().getCurrentExecutionAttempt();
+ Execution failedExecution =
+
executionVertices.next().getCurrentExecutionAttempt();
- failedExecution.fail(new Exception("Test Exception"));
- failedExecution.completeCancelling();
+ finishedExecution.markFinished();
- taskRestartExecutor.triggerScheduledTasks();
+ failedExecution.fail(new Exception("Test Exception"));
+ failedExecution.completeCancelling();
- assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
+ savedExecution[0] = finishedExecution;
- // At this point all resources have been assigned
- for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
- assertThat(vertex.getCurrentAssignedResource()).isNotNull();
- vertex.getCurrentExecutionAttempt().switchToInitializing();
- vertex.getCurrentExecutionAttempt().switchToRunning();
- }
+ taskRestartExecutor.triggerScheduledTasks();
+ });
- // fail old finished execution, this should not affect the
execution
- finishedExecution.fail(new Exception("This should have no
effect"));
+ // Phase 2: The restart callback has now executed (it was queued ahead
of this
+ // lambda). Verify the graph restarted and the old finished execution
is unaffected.
+ runInMainThread(
+ () -> {
+ Execution finishedExecution = savedExecution[0];
- for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
- vertex.getCurrentExecutionAttempt().markFinished();
- }
+ assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
- // the state of the finished execution should have not changed
since it is terminal
-
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
+ // At this point all resources have been assigned
+ for (ExecutionVertex vertex :
eg.getAllExecutionVertices()) {
+
assertThat(vertex.getCurrentAssignedResource()).isNotNull();
+
vertex.getCurrentExecutionAttempt().switchToInitializing();
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
- }
+ // fail old finished execution, this should not affect the
execution
+ finishedExecution.fail(new Exception("This should have no
effect"));
+
+ for (ExecutionVertex vertex :
eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().markFinished();
+ }
+
+ // the state of the finished execution should have not
changed since it
+ // is terminal
+
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
+
+ assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
+ });
}
/**
@@ -344,41 +370,41 @@ class ExecutionGraphRestartTest {
*/
@Test
void testFailExecutionAfterCancel() throws Exception {
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
- SchedulerBase scheduler =
- new DefaultSchedulerBuilder(
- createJobGraphToCancel(),
- mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
- .setExecutionSlotAllocatorFactory(
-
createExecutionSlotAllocatorFactory(slotPool))
- .setRestartBackoffTimeStrategy(
- new TestRestartBackoffTimeStrategy(false,
Long.MAX_VALUE))
- .setDelayExecutor(taskRestartExecutor)
- .build();
- ExecutionGraph eg = scheduler.getExecutionGraph();
-
- startScheduling(scheduler);
-
- offerSlots(slotPool, 1);
-
- // Fail right after cancel (for example with concurrent slot
release)
- scheduler.cancel();
-
- for (ExecutionVertex v : eg.getAllExecutionVertices()) {
- v.getCurrentExecutionAttempt().fail(new Exception("Test
Exception"));
- }
-
- FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
- .eventuallySucceeds()
- .isEqualTo(JobStatus.CANCELED);
-
- Execution execution =
-
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
- execution.completeCancelling();
- assertThat(eg.getState()).isEqualTo(JobStatus.CANCELED);
- }
+ SchedulerBase scheduler =
+ createSchedulerBuilder(createJobGraphToCancel())
+ .setRestartBackoffTimeStrategy(
+ new TestRestartBackoffTimeStrategy(false,
Long.MAX_VALUE))
+ .setDelayExecutor(taskRestartExecutor)
+ .build();
+ runInMainThread(
+ () -> {
+ ExecutionGraph eg = scheduler.getExecutionGraph();
+
+ startScheduling(scheduler);
+
+ SlotPoolUtils.offerSlotsFromMainThread(
+ slotPool,
Collections.singletonList(ResourceProfile.ANY));
+
+ // Fail right after cancel (for example with concurrent
slot release)
+ scheduler.cancel();
+
+ for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+ v.getCurrentExecutionAttempt().fail(new
Exception("Test Exception"));
+ }
+
+ FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
+ .eventuallySucceeds()
+ .isEqualTo(JobStatus.CANCELED);
+
+ Execution execution =
+ eg.getAllExecutionVertices()
+ .iterator()
+ .next()
+ .getCurrentExecutionAttempt();
+
+ execution.completeCancelling();
+ assertThat(eg.getState()).isEqualTo(JobStatus.CANCELED);
+ });
}
// ------------------------------------------------------------------------
@@ -391,14 +417,16 @@ class ExecutionGraphRestartTest {
assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.RUNNING);
}
- private static ExecutionSlotAllocatorFactory
createExecutionSlotAllocatorFactory(
- SlotPool slotPool) throws Exception {
+ private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph)
throws Exception {
setupSlotPool(slotPool);
PhysicalSlotProvider physicalSlotProvider =
new PhysicalSlotProviderImpl(
LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
- return
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
- physicalSlotProvider);
+ return new DefaultSchedulerBuilder(
+ jobGraph, mainThreadExecutor,
EXECUTOR_EXTENSION.getExecutor())
+ .setExecutionSlotAllocatorFactory(
+
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
+ physicalSlotProvider));
}
private static void setupSlotPool(SlotPool slotPool) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
index 33dcb81cd38..bea5dbdae16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
@@ -74,23 +74,24 @@ public class SlotPoolUtils {
SlotPool slotPool,
ComponentMainThreadExecutor mainThreadExecutor,
List<ResourceProfile> resourceProfiles) {
- return offerSlots(
- slotPool,
- mainThreadExecutor,
- resourceProfiles,
- new SimpleAckingTaskManagerGateway());
+ return CompletableFuture.supplyAsync(
+ () -> offerSlotsFromMainThread(slotPool,
resourceProfiles),
+ mainThreadExecutor)
+ .join();
}
public static ResourceID tryOfferSlots(
SlotPool slotPool,
ComponentMainThreadExecutor mainThreadExecutor,
List<ResourceProfile> resourceProfiles) {
- return offerSlots(
- slotPool,
- mainThreadExecutor,
- resourceProfiles,
- new SimpleAckingTaskManagerGateway(),
- false);
+ return CompletableFuture.supplyAsync(
+ () ->
+ tryOfferSlotsFromMainThread(
+ slotPool,
+ resourceProfiles,
+ new SimpleAckingTaskManagerGateway()),
+ mainThreadExecutor)
+ .join();
}
public static ResourceID offerSlots(
@@ -98,40 +99,81 @@ public class SlotPoolUtils {
ComponentMainThreadExecutor mainThreadExecutor,
List<ResourceProfile> resourceProfiles,
TaskManagerGateway taskManagerGateway) {
- return offerSlots(slotPool, mainThreadExecutor, resourceProfiles,
taskManagerGateway, true);
+ return CompletableFuture.supplyAsync(
+ () ->
+ offerSlotsFromMainThread(
+ slotPool, resourceProfiles,
taskManagerGateway),
+ mainThreadExecutor)
+ .join();
+ }
+
+ /**
+ * Offers slots directly on the slot pool. Must be called from the main
thread (e.g. inside a
+ * {@code runInMainThread} block). All offered slots are expected to be
accepted.
+ *
+ * @param slotPool the slot pool to offer slots to
+ * @param resourceProfiles the resource profiles of the slots to offer
+ * @return the resource ID of the registered task manager
+ */
+ public static ResourceID offerSlotsFromMainThread(
+ SlotPool slotPool, List<ResourceProfile> resourceProfiles) {
+ return offerSlotsFromMainThread(
+ slotPool, resourceProfiles, new
SimpleAckingTaskManagerGateway());
}
- private static ResourceID offerSlots(
+ /**
+ * Offers slots directly on the slot pool. Must be called from the main
thread (e.g. inside a
+ * {@code runInMainThread} block). All offered slots are expected to be
accepted.
+ *
+ * @param slotPool the slot pool to offer slots to
+ * @param resourceProfiles the resource profiles of the slots to offer
+ * @param taskManagerGateway the task manager gateway to use
+ * @return the resource ID of the registered task manager
+ */
+ public static ResourceID offerSlotsFromMainThread(
SlotPool slotPool,
- ComponentMainThreadExecutor mainThreadExecutor,
List<ResourceProfile> resourceProfiles,
- TaskManagerGateway taskManagerGateway,
- boolean assertAllSlotsAreAccepted) {
+ TaskManagerGateway taskManagerGateway) {
final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
- CompletableFuture.runAsync(
- () -> {
-
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
-
- final Collection<SlotOffer> slotOffers =
- IntStream.range(0, resourceProfiles.size())
- .mapToObj(
- i ->
- new SlotOffer(
- new
AllocationID(),
- i,
-
resourceProfiles.get(i)))
- .collect(Collectors.toList());
-
- final Collection<SlotOffer> acceptedOffers =
- slotPool.offerSlots(
- taskManagerLocation,
taskManagerGateway, slotOffers);
-
- if (assertAllSlotsAreAccepted) {
-
assertThat(acceptedOffers).isEqualTo(slotOffers);
- }
- },
- mainThreadExecutor)
- .join();
+ slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+ final Collection<SlotOffer> slotOffers =
+ IntStream.range(0, resourceProfiles.size())
+ .mapToObj(
+ i -> new SlotOffer(new AllocationID(), i,
resourceProfiles.get(i)))
+ .collect(Collectors.toList());
+
+ final Collection<SlotOffer> acceptedOffers =
+ slotPool.offerSlots(taskManagerLocation, taskManagerGateway,
slotOffers);
+
+ assertThat(acceptedOffers).isEqualTo(slotOffers);
+
+ return taskManagerLocation.getResourceID();
+ }
+
+ /**
+ * Offers slots directly on the slot pool without asserting that all slots
are accepted. Must be
+ * called from the main thread (e.g. inside a {@code runInMainThread}
block).
+ *
+ * @param slotPool the slot pool to offer slots to
+ * @param resourceProfiles the resource profiles of the slots to offer
+ * @param taskManagerGateway the task manager gateway to use
+ * @return the resource ID of the registered task manager
+ */
+ public static ResourceID tryOfferSlotsFromMainThread(
+ SlotPool slotPool,
+ List<ResourceProfile> resourceProfiles,
+ TaskManagerGateway taskManagerGateway) {
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+ slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+ final Collection<SlotOffer> slotOffers =
+ IntStream.range(0, resourceProfiles.size())
+ .mapToObj(
+ i -> new SlotOffer(new AllocationID(), i,
resourceProfiles.get(i)))
+ .collect(Collectors.toList());
+
+ slotPool.offerSlots(taskManagerLocation, taskManagerGateway,
slotOffers);
return taskManagerLocation.getResourceID();
}