Izeren commented on code in PR #27719:
URL: https://github.com/apache/flink/pull/27719#discussion_r2883885259


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -105,20 +111,24 @@ void testCancelAllPendingRequestWhileCanceling() throws 
Exception {
             JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
-                                    graph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                    graph, mainThreadExecutor, 
EXECUTOR_EXTENSION.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     
createExecutionSlotAllocatorFactory(slotPool))
                             .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
-            offerSlots(slotPool, NUM_TASKS);
+            mainThreadExecutor.execute(

Review Comment:
   @RocMarshal, I tried running the patch below and it passed with `@RepeatedTest(300)`.
   
   This is a dirty patch, of course. I would move the slot-related helpers into the slot utils, and 
I would use `@BeforeEach`/`@AfterEach` to handle slot pool creation and cleanup. WDYT?
   
   
   ```suggestion
   Index: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
   --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
     (revision b1d5bda2e386387f19d32ca71f8902f692650323)
   +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
     (date 1772631541535)
   @@ -20,29 +20,34 @@
    
    import org.apache.flink.api.common.JobStatus;
    import org.apache.flink.core.testutils.FlinkAssertions;
   +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    import org.apache.flink.runtime.clusterframework.types.ResourceID;
    import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
    import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
    import org.apache.flink.runtime.execution.ExecutionState;
    import 
org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
   +import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
    import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.jobmaster.JobMasterId;
    import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
   +import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
    import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
    import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
    import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
    import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
   -import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
    import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
    import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
    import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
    import org.apache.flink.runtime.scheduler.SchedulerBase;
    import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
   +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
   +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
   +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    import org.apache.flink.runtime.testtasks.NoOpInvokable;
    import org.apache.flink.streaming.util.RestartStrategyUtils;
    import org.apache.flink.testutils.TestingUtils;
   @@ -54,10 +59,13 @@
    import org.junit.jupiter.api.extension.RegisterExtension;
    
    import java.io.IOException;
   -import java.util.Collections;
   +import java.util.Collection;
    import java.util.Iterator;
   +import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.function.Consumer;
   +import java.util.stream.Collectors;
   +import java.util.stream.IntStream;
    
    import static org.assertj.core.api.Assertions.assertThat;
    
   @@ -101,8 +109,8 @@
    
        @Test
        void testCancelAllPendingRequestWhileCanceling() throws Exception {
   -        try (DeclarativeSlotPoolBridge slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   -
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                final int numTasksExceedSlotPool = 50;
                // create a graph with task count larger than slot pool
                JobVertex sender =
   @@ -116,7 +124,7 @@
                                        
createExecutionSlotAllocatorFactory(slotPool))
                                .build();
    
   -            mainThreadExecutor.execute(
   +            runInMainThread(
                        () -> {
                            ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
                            startScheduling(scheduler);
   @@ -134,8 +142,8 @@
    
        @Test
        void testCancelAllPendingRequestWhileFailing() throws Exception {
   -        try (DeclarativeSlotPoolBridge slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   -
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                final int numTasksExceedSlotPool = 50;
                // create a graph with task count larger than slot pool
                JobVertex sender =
   @@ -148,7 +156,7 @@
                                .setExecutionSlotAllocatorFactory(
                                        
createExecutionSlotAllocatorFactory(slotPool))
                                .build();
   -            mainThreadExecutor.execute(
   +            runInMainThread(
                        () -> {
                            ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
    
   @@ -168,7 +176,8 @@
        @Test
        void testCancelWhileRestarting() throws Exception {
            // We want to manually control the restart and delay
   -        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                SchedulerBase scheduler =
                        new DefaultSchedulerBuilder(
                                        createJobGraph(),
   @@ -181,7 +190,7 @@
                                .setDelayExecutor(taskRestartExecutor)
                                .build();
    
   -            mainThreadExecutor.execute(
   +            runInMainThread(
                        () -> {
                            ExecutionGraph executionGraph = 
scheduler.getExecutionGraph();
    
   @@ -209,14 +218,43 @@
            }
        }
    
   -    private ResourceID offerSlots(SlotPool slotPool, int numSlots) {
   -        return SlotPoolUtils.offerSlots(
   -                slotPool, mainThreadExecutor, Collections.nCopies(numSlots, 
ResourceProfile.ANY));
   +    /**
   +     * Offers slots directly on the slot pool. This method must be called 
from the main thread
   +     * (inside {@link #runInMainThread(Runnable)}), so it calls slot pool 
methods directly instead
   +     * of going through {@code SlotPoolUtils.offerSlots} which would 
deadlock due to re-entrant
   +     * {@code CompletableFuture.runAsync(..., mainThreadExecutor).join()}.
   +     */
   +    private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
   +        final TaskManagerLocation location = new LocalTaskManagerLocation();
   +        slotPool.registerTaskManager(location.getResourceID());
   +        final Collection<SlotOffer> offers =
   +                IntStream.range(0, numSlots)
   +                        .mapToObj(i -> new SlotOffer(new AllocationID(), i, 
ResourceProfile.ANY))
   +                        .collect(Collectors.toList());
   +        final Collection<SlotOffer> accepted =
   +                slotPool.offerSlots(location, new 
SimpleAckingTaskManagerGateway(), offers);
   +        assertThat(accepted).hasSameSizeAs(offers);
   +        return location.getResourceID();
   +    }
   +
   +    private static void runInMainThread(final Runnable runnable) {
   +        CompletableFuture.runAsync(runnable, 
JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor())
   +                .join();
   +    }
   +
   +    /**
   +     * Returns an {@link AutoCloseable} that closes the given slot pool on 
the main thread. This
   +     * allows using try-with-resources while respecting the slot pool's 
main-thread assertion in
   +     * {@link DeclarativeSlotPoolBridge#close()}.
   +     */
   +    private static AutoCloseable 
closeOnMainThread(DeclarativeSlotPoolBridge slotPool) {
   +        return () -> runInMainThread(slotPool::close);
        }
    
        @Test
        void testCancelWhileFailing() throws Exception {
   -        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                SchedulerBase scheduler =
                        new DefaultSchedulerBuilder(
                                        createJobGraph(),
   @@ -227,7 +265,7 @@
                                .setRestartBackoffTimeStrategy(
                                        new 
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                                .build();
   -            mainThreadExecutor.execute(
   +            runInMainThread(
                        () -> {
                            ExecutionGraph graph = 
scheduler.getExecutionGraph();
    
   @@ -257,7 +295,8 @@
    
        @Test
        void testFailWhileCanceling() throws Exception {
   -        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                SchedulerBase scheduler =
                        new DefaultSchedulerBuilder(
                                        createJobGraph(),
   @@ -268,7 +307,7 @@
                                .setRestartBackoffTimeStrategy(
                                        new 
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                                .build();
   -            mainThreadExecutor.execute(
   +            runInMainThread(
                        () -> {
                            ExecutionGraph graph = 
scheduler.getExecutionGraph();
    
   @@ -311,7 +350,8 @@
                    ExecutionGraphTestUtils.createJobVertex("Task2", 1, 
NoOpInvokable.class);
            JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, 
receiver);
    
   -        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                SchedulerBase scheduler =
                        new DefaultSchedulerBuilder(
                                        jobGraph, mainThreadExecutor, 
EXECUTOR_EXTENSION.getExecutor())
   @@ -322,10 +362,16 @@
                                .setDelayExecutor(taskRestartExecutor)
                                .build();
    
   -            mainThreadExecutor.execute(
   +            final ExecutionGraph eg = scheduler.getExecutionGraph();
   +            // Hold the original finished execution reference across 
runInMainThread calls
   +            final Execution[] savedExecution = new Execution[1];
   +
   +            // Phase 1: Start, deploy, fail one task, and trigger restart.
   +            // The restart callback (restartTasks) is queued on 
mainThreadExecutor via
   +            // cancelFuture.thenRunAsync(..., mainThreadExecutor) and will 
execute after this
   +            // runInMainThread call returns.
   +            runInMainThread(
                        () -> {
   -                        ExecutionGraph eg = scheduler.getExecutionGraph();
   -
                            startScheduling(scheduler);
    
                            offerSlots(slotPool, 2);
   @@ -343,7 +389,16 @@
                            failedExecution.fail(new Exception("Test 
Exception"));
                            failedExecution.completeCancelling();
    
   +                        savedExecution[0] = finishedExecution;
   +
                            taskRestartExecutor.triggerScheduledTasks();
   +                    });
   +
   +            // Phase 2: The restart callback has now executed (it was 
queued ahead of this
   +            // lambda). Verify the graph restarted and the old finished 
execution is unaffected.
   +            runInMainThread(
   +                    () -> {
   +                        Execution finishedExecution = savedExecution[0];
    
                            
assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
    
   @@ -361,8 +416,8 @@
                                
vertex.getCurrentExecutionAttempt().markFinished();
                            }
    
   -                        // the state of the finished execution should have 
not changed since it is
   -                        // terminal
   +                        // the state of the finished execution should not have 
changed since it
   +                        // is terminal
                            
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
    
                            
assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
   @@ -377,7 +432,8 @@
         */
        @Test
        void testFailExecutionAfterCancel() throws Exception {
   -        try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
   +        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
   +        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
                SchedulerBase scheduler =
                        new DefaultSchedulerBuilder(
                                        createJobGraphToCancel(),
   @@ -389,7 +445,7 @@
                                        new 
TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                                .setDelayExecutor(taskRestartExecutor)
                                .build();
   -            mainThreadExecutor.execute(
   +            runInMainThread(
                        () -> {
                            ExecutionGraph eg = scheduler.getExecutionGraph();
    
   @@ -440,6 +496,12 @@
                    physicalSlotProvider);
        }
    
   +    private DeclarativeSlotPoolBridge createSlotPool() {
   +        return new DeclarativeSlotPoolBridgeBuilder()
   +                .setMainThreadExecutor(mainThreadExecutor)
   +                .build();
   +    }
   +
        private static void setupSlotPool(SlotPool slotPool) throws Exception {
            final String jobManagerAddress = "foobar";
            final ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to