This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 216d3923380ea91383337678cda82be233aa9bb6
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Jan 31 13:04:32 2025 +0100

    [FLINK-34227][runtime] Fixes IO thread leaking into the owner of the scheduler instance (i.e. the JobMaster)
    
    - Adds tests to the *SchedulerTest classes that check whether closing the scheduler leaks an IO thread via the CheckpointsCleaner
    - Makes the CheckpointsCleaner available in AdaptiveBatchSchedulerFactory.createScheduler
---
 .../apache/flink/util/concurrent/FutureUtils.java  | 34 ++++++++++-
 .../flink/runtime/scheduler/SchedulerBase.java     |  5 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  7 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 70 ++++++++++++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 14 +++++
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  | 23 +++++++
 6 files changed, 146 insertions(+), 7 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
index f20baa9f2cc..91e338f42c7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
@@ -587,13 +587,41 @@ public class FutureUtils {
      */
     public static CompletableFuture<Void> composeAfterwards(
             CompletableFuture<?> future, Supplier<CompletableFuture<?>> 
composedAction) {
+        return composeAfterwardsInternal(future, composedAction, 
CompletableFuture::whenComplete);
+    }
+
+    /**
+     * Run the given asynchronous action after the completion of the given 
future. The given future
+     * can be completed normally or exceptionally. In case of an exceptional 
completion, the
+     * asynchronous action's exception will be added to the initial exception.
+     *
+     * @param future to wait for its completion
+     * @param composedAction asynchronous action which is triggered after the 
future's completion
+     * @return Future which is completed on the passed {@link Executor} after 
the asynchronous
+     *     action has completed. This future can contain an exception if an 
error occurred in the
+     *     given future or asynchronous action.
+     */
+    public static CompletableFuture<Void> composeAfterwardsAsync(
+            CompletableFuture<?> future,
+            Supplier<CompletableFuture<?>> composedAction,
+            Executor executor) {
+        return composeAfterwardsInternal(
+                future,
+                composedAction,
+                (composedActionFuture, resultFutureCompletion) ->
+                        
composedActionFuture.whenCompleteAsync(resultFutureCompletion, executor));
+    }
+
+    private static CompletableFuture<Void> composeAfterwardsInternal(
+            CompletableFuture<?> future,
+            Supplier<CompletableFuture<?>> composedAction,
+            BiConsumer<CompletableFuture<?>, BiConsumer<Object, Throwable>> 
forwardAction) {
         final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
 
         future.whenComplete(
                 (Object outerIgnored, Throwable outerThrowable) -> {
-                    final CompletableFuture<?> composedActionFuture = 
composedAction.get();
-
-                    composedActionFuture.whenComplete(
+                    forwardAction.accept(
+                            composedAction.get(),
                             (Object innerIgnored, Throwable innerThrowable) -> 
{
                                 if (innerThrowable != null) {
                                     resultFuture.completeExceptionally(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 3c9241c6b41..31654223af9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -665,12 +665,13 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         final FlinkException cause = new FlinkException("Scheduler is being 
stopped.");
 
         final CompletableFuture<Void> checkpointServicesShutdownFuture =
-                FutureUtils.composeAfterwards(
+                FutureUtils.composeAfterwardsAsync(
                         executionGraph
                                 .getTerminationFuture()
                                 .thenAcceptAsync(
                                         this::shutDownCheckpointServices, 
getMainThreadExecutor()),
-                        checkpointsCleaner::closeAsync);
+                        checkpointsCleaner::closeAsync,
+                        getMainThreadExecutor());
 
         FutureUtils.assertNoException(checkpointServicesShutdownFuture);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 238c594fd55..ff8f307ec68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -571,12 +571,15 @@ public class AdaptiveScheduler
 
         backgroundTask.abort();
         // wait for the background task to finish and then close services
-        return FutureUtils.composeAfterwards(
+        return FutureUtils.composeAfterwardsAsync(
                 FutureUtils.runAfterwardsAsync(
                         backgroundTask.getTerminationFuture(),
                         () -> 
stopCheckpointServicesSafely(jobTerminationFuture.get()),
                         getMainThreadExecutor()),
-                checkpointsCleaner::closeAsync);
+                // closing the CheckpointsCleaner can complete in the 
ioExecutor when cleaning up a
+                // PendingCheckpoint
+                checkpointsCleaner::closeAsync,
+                getMainThreadExecutor());
     }
 
     private void stopCheckpointServicesSafely(JobStatus terminalState) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index d109bb635d7..e9a3cc9cac5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -101,8 +101,10 @@ import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.function.BiFunctionWithException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
 
@@ -1786,6 +1788,19 @@ public class DefaultSchedulerTest {
         }
     }
 
+    @Test // closeAsync() of the scheduler under test must complete on the main thread
+    void testCloseAsyncReturnsMainThreadFuture() throws Exception {
+        runCloseAsyncCompletesInMainThreadTest(
+                scheduledExecutorService, // NOTE(review): assumed single-threaded — confirm
+                (mainThreadExecutor, checkpointsCleaner) ->
+                        createSchedulerBuilder(
+                                        singleJobVertexJobGraph(1),
+                                        mainThreadExecutor,
+                                        Collections.emptyList())
+                                .setCheckpointCleaner(checkpointsCleaner)
+                                .build());
+    }
+
     @Test
     void testJobStatusHookWithJobFailed() throws Exception {
         commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED);
@@ -1931,6 +1946,61 @@ public class DefaultSchedulerTest {
         schedulerClosed.get();
     }
 
+    // public so that other scheduler test classes can reuse this closeAsync() thread check
+    public static void runCloseAsyncCompletesInMainThreadTest(
+            ScheduledExecutorService singleThreadExecutorService,
+            BiFunctionWithException<
+                            ComponentMainThreadExecutor, CheckpointsCleaner, 
SchedulerNG, Exception>
+                    schedulerFactory)
+            throws Exception {
+        final OneShotLatch cleanerCloseLatch = new OneShotLatch();
+        final CompletableFuture<Void> cleanerCloseFuture = new 
CompletableFuture<>();
+        final CheckpointsCleaner checkpointsCleaner =
+                new CheckpointsCleaner() {
+                    @Override
+                    public CompletableFuture<Void> closeAsync() {
+                        cleanerCloseLatch.trigger();
+                        return cleanerCloseFuture;
+                    }
+                };
+
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+        final SchedulerNG scheduler =
+                schedulerFactory.apply(mainThreadExecutor, checkpointsCleaner);
+
+        mainThreadExecutor.execute(scheduler::startScheduling);
+
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        mainThreadExecutor.execute(
+                () -> {
+                    // don't block on closeAsync() here: it schedules follow-up tasks on this
+                    // main thread, which could not run while this thread is blocked
+                    FutureUtils.forward(
+                            scheduler
+                                    .closeAsync()
+                                    .thenRun(
+                                            () -> {
+                                                
mainThreadExecutor.assertRunningInMainThread();
+                                            }),
+                            closeFuture);
+                });
+
+        // wait until CheckpointsCleaner#closeAsync was called before completing its future
+        cleanerCloseLatch.await();
+
+        // the latch fires before the cleaner's future is returned, so the scheduler may not have
+        // composed on it yet. Completing it that early could run the composition on the main
+        // thread and hide the IO-thread leak this test checks for; the sleep narrows that race
+        // (NOTE(review): sleep-based, so not fully deterministic under load)
+        Thread.sleep(50);
+        // completing the cleaner's future from the test thread simulates
+        // CheckpointsCleaner#closeAsync finishing outside the main thread
+        cleanerCloseFuture.complete(null);
+        closeFuture.join();
+    }
+
     private static long initiateFailure(
             DefaultScheduler scheduler,
             ExecutionAttemptID executionAttemptId,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 979d16a29a0..7e3476ddf78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -141,6 +141,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
@@ -945,6 +946,19 @@ public class AdaptiveSchedulerTest {
         
assertThat(checkpointIdCounterShutdownFuture.get()).isEqualTo(JobStatus.FAILED);
     }
 
+    @Test // closeAsync() of the adaptive scheduler must complete on the main thread
+    void testCloseAsyncReturnsMainThreadFuture() throws Exception {
+        DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest(
+                TEST_EXECUTOR_RESOURCE.getExecutor(), // NOTE(review): assumed single-threaded
+                (mainThreadExecutor, checkpointsCleaner) ->
+                        new AdaptiveSchedulerBuilder(
+                                        singleNoOpJobGraph(),
+                                        mainThreadExecutor,
+                                        EXECUTOR_RESOURCE.getExecutor())
+                                .setCheckpointCleaner(checkpointsCleaner)
+                                .build());
+    }
+
     @Test
     void testTransitionToStateCallsOnLeave() throws Exception {
         final AdaptiveScheduler scheduler =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index 5c18de43f54..b1a4cafce96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -68,11 +68,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
 import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
+import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFinishedTaskExecutionState;
 import static 
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider;
@@ -379,6 +381,22 @@ class AdaptiveBatchSchedulerTest {
         assertThat(mergedSourceParallelismFuture.join()).isEqualTo(4);
     }
 
+    @Test // closeAsync() of the adaptive batch scheduler must complete on the main thread
+    void testCloseAsyncReturnsMainThreadFuture() throws Exception {
+        final ScheduledExecutorService scheduledExecutorServiceForMainThread =
+                Executors.newSingleThreadScheduledExecutor(); // acts as the main thread
+        try {
+            runCloseAsyncCompletesInMainThreadTest(
+                    scheduledExecutorServiceForMainThread,
+                    (mainThread, checkpointsCleaner) ->
+                            createSchedulerBuilder(createJobGraph(), 
mainThread)
+                                    .setCheckpointCleaner(checkpointsCleaner)
+                                    .buildAdaptiveBatchJobScheduler());
+        } finally {
+            scheduledExecutorServiceForMainThread.shutdownNow(); // owned by this test
+        }
+    }
+
     void testUserConfiguredMaxParallelism(
             int globalMinParallelism,
             int globalMaxParallelism,
@@ -567,6 +585,11 @@ class AdaptiveBatchSchedulerTest {
     }
 
     private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) {
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor);
+    }
+
+    private DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) 
{
         return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                 .setDelayExecutor(taskRestartExecutor);

Reply via email to