Izeren commented on code in PR #27719:
URL: https://github.com/apache/flink/pull/27719#discussion_r2883885259
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -105,20 +111,24 @@ void testCancelAllPendingRequestWhileCanceling() throws
Exception {
JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
- graph, mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
+ graph, mainThreadExecutor,
EXECUTOR_EXTENSION.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- startScheduling(scheduler);
- offerSlots(slotPool, NUM_TASKS);
+ mainThreadExecutor.execute(
Review Comment:
@RocMarshal, I tried running this and it passed with @RepeatedTest(300).
This is a dirty patch, of course. I would move the slot-related helpers to the slot utils, and
I would use @BeforeEach/@AfterEach for slot pool creation/cleanup. WDYT?
```
Index:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
(revision b1d5bda2e386387f19d32ca71f8902f692650323)
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
(date 1772631541535)
@@ -20,29 +20,34 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import
org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
+import
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.testutils.TestingUtils;
@@ -54,10 +59,13 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.IOException;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -101,8 +109,8 @@
@Test
void testCancelAllPendingRequestWhileCanceling() throws Exception {
- try (DeclarativeSlotPoolBridge slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
final int numTasksExceedSlotPool = 50;
// create a graph with task count larger than slot pool
JobVertex sender =
@@ -116,7 +124,7 @@
createExecutionSlotAllocatorFactory(slotPool))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
startScheduling(scheduler);
@@ -134,8 +142,8 @@
@Test
void testCancelAllPendingRequestWhileFailing() throws Exception {
- try (DeclarativeSlotPoolBridge slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
final int numTasksExceedSlotPool = 50;
// create a graph with task count larger than slot pool
JobVertex sender =
@@ -148,7 +156,7 @@
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
@@ -168,7 +176,8 @@
@Test
void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraph(),
@@ -181,7 +190,7 @@
.setDelayExecutor(taskRestartExecutor)
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph executionGraph =
scheduler.getExecutionGraph();
@@ -209,14 +218,43 @@
}
}
- private ResourceID offerSlots(SlotPool slotPool, int numSlots) {
- return SlotPoolUtils.offerSlots(
- slotPool, mainThreadExecutor, Collections.nCopies(numSlots,
ResourceProfile.ANY));
+ /**
+ * Offers slots directly on the slot pool. This method must be called
from the main thread
+ * (inside {@link #runInMainThread(Runnable)}), so it calls slot pool
methods directly instead
+ * of going through {@code SlotPoolUtils.offerSlots} which would
deadlock due to re-entrant
+ * {@code CompletableFuture.runAsync(..., mainThreadExecutor).join()}.
+ */
+ private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
+ final TaskManagerLocation location = new LocalTaskManagerLocation();
+ slotPool.registerTaskManager(location.getResourceID());
+ final Collection<SlotOffer> offers =
+ IntStream.range(0, numSlots)
+ .mapToObj(i -> new SlotOffer(new AllocationID(), i,
ResourceProfile.ANY))
+ .collect(Collectors.toList());
+ final Collection<SlotOffer> accepted =
+ slotPool.offerSlots(location, new
SimpleAckingTaskManagerGateway(), offers);
+ assertThat(accepted).hasSameSizeAs(offers);
+ return location.getResourceID();
+ }
+
+ private static void runInMainThread(final Runnable runnable) {
+ CompletableFuture.runAsync(runnable,
JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor())
+ .join();
+ }
+
+ /**
+ * Returns an {@link AutoCloseable} that closes the given slot pool on
the main thread. This
+ * allows using try-with-resources while respecting the slot pool's
main-thread assertion in
+ * {@link DeclarativeSlotPoolBridge#close()}.
+ */
+ private static AutoCloseable
closeOnMainThread(DeclarativeSlotPoolBridge slotPool) {
+ return () -> runInMainThread(slotPool::close);
}
@Test
void testCancelWhileFailing() throws Exception {
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraph(),
@@ -227,7 +265,7 @@
.setRestartBackoffTimeStrategy(
new
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph graph =
scheduler.getExecutionGraph();
@@ -257,7 +295,8 @@
@Test
void testFailWhileCanceling() throws Exception {
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraph(),
@@ -268,7 +307,7 @@
.setRestartBackoffTimeStrategy(
new
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph graph =
scheduler.getExecutionGraph();
@@ -311,7 +350,8 @@
ExecutionGraphTestUtils.createJobVertex("Task2", 1,
NoOpInvokable.class);
JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender,
receiver);
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor,
EXECUTOR_EXTENSION.getExecutor())
@@ -322,10 +362,16 @@
.setDelayExecutor(taskRestartExecutor)
.build();
- mainThreadExecutor.execute(
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
+ // Hold the original finished execution reference across
runInMainThread calls
+ final Execution[] savedExecution = new Execution[1];
+
+ // Phase 1: Start, deploy, fail one task, and trigger restart.
+ // The restart callback (restartTasks) is queued on
mainThreadExecutor via
+ // cancelFuture.thenRunAsync(..., mainThreadExecutor) and will
execute after this
+ // runInMainThread call returns.
+ runInMainThread(
() -> {
- ExecutionGraph eg = scheduler.getExecutionGraph();
-
startScheduling(scheduler);
offerSlots(slotPool, 2);
@@ -343,7 +389,16 @@
failedExecution.fail(new Exception("Test
Exception"));
failedExecution.completeCancelling();
+ savedExecution[0] = finishedExecution;
+
taskRestartExecutor.triggerScheduledTasks();
+ });
+
+ // Phase 2: The restart callback has now executed (it was
queued ahead of this
+ // lambda). Verify the graph restarted and the old finished
execution is unaffected.
+ runInMainThread(
+ () -> {
+ Execution finishedExecution = savedExecution[0];
assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
@@ -361,8 +416,8 @@
vertex.getCurrentExecutionAttempt().markFinished();
}
- // the state of the finished execution should have
not changed since it is
- // terminal
+ // the state of the finished execution should have
not changed since it
+ // is terminal
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
@@ -377,7 +432,8 @@
*/
@Test
void testFailExecutionAfterCancel() throws Exception {
- try (SlotPool slotPool =
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraphToCancel(),
@@ -389,7 +445,7 @@
new
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.setDelayExecutor(taskRestartExecutor)
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph eg = scheduler.getExecutionGraph();
@@ -440,6 +496,12 @@
physicalSlotProvider);
}
+ private DeclarativeSlotPoolBridge createSlotPool() {
+ return new DeclarativeSlotPoolBridgeBuilder()
+ .setMainThreadExecutor(mainThreadExecutor)
+ .build();
+ }
+
private static void setupSlotPool(SlotPool slotPool) throws Exception {
final String jobManagerAddress = "foobar";
final ResourceManagerGateway resourceManagerGateway = new
TestingResourceManagerGateway();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]