This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 216d3923380ea91383337678cda82be233aa9bb6 Author: Matthias Pohl <[email protected]> AuthorDate: Fri Jan 31 13:04:32 2025 +0100 [FLINK-34227][runtime] Fixes IO thread leaking into owner of the scheduler instance (i.e. the JobMaster) - Adds test for checking whether the scheduler closing leaks an IO thread via the CheckpointsCleaner to the *SchedulerTests - Makes CheckpointsCleaner available in AdaptiveBatchSchedulerFactory.createScheduler --- .../apache/flink/util/concurrent/FutureUtils.java | 34 ++++++++++- .../flink/runtime/scheduler/SchedulerBase.java | 5 +- .../scheduler/adaptive/AdaptiveScheduler.java | 7 ++- .../runtime/scheduler/DefaultSchedulerTest.java | 70 ++++++++++++++++++++++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 14 +++++ .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 23 +++++++ 6 files changed, 146 insertions(+), 7 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java index f20baa9f2cc..91e338f42c7 100644 --- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java @@ -587,13 +587,41 @@ public class FutureUtils { */ public static CompletableFuture<Void> composeAfterwards( CompletableFuture<?> future, Supplier<CompletableFuture<?>> composedAction) { + return composeAfterwardsInternal(future, composedAction, CompletableFuture::whenComplete); + } + + /** + * Run the given asynchronous action after the completion of the given future. The given future + * can be completed normally or exceptionally. In case of an exceptional completion, the + * asynchronous action's exception will be added to the initial exception. + * + * @param future to wait for its completion + * @param composedAction asynchronous action which is triggered after the future's completion + * @return Future which is completed on the passed {@link Executor} after the asynchronous + * action has completed. This future can contain an exception if an error occurred in the + * given future or asynchronous action. + */ + public static CompletableFuture<Void> composeAfterwardsAsync( + CompletableFuture<?> future, + Supplier<CompletableFuture<?>> composedAction, + Executor executor) { + return composeAfterwardsInternal( + future, + composedAction, + (composedActionFuture, resultFutureCompletion) -> + composedActionFuture.whenCompleteAsync(resultFutureCompletion, executor)); + } + + private static CompletableFuture<Void> composeAfterwardsInternal( + CompletableFuture<?> future, + Supplier<CompletableFuture<?>> composedAction, + BiConsumer<CompletableFuture<?>, BiConsumer<Object, Throwable>> forwardAction) { final CompletableFuture<Void> resultFuture = new CompletableFuture<>(); future.whenComplete( (Object outerIgnored, Throwable outerThrowable) -> { - final CompletableFuture<?> composedActionFuture = composedAction.get(); - - composedActionFuture.whenComplete( + forwardAction.accept( + composedAction.get(), (Object innerIgnored, Throwable innerThrowable) -> { if (innerThrowable != null) { resultFuture.completeExceptionally( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 3c9241c6b41..31654223af9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -665,12 +665,13 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling final FlinkException cause = new FlinkException("Scheduler is being stopped."); final CompletableFuture<Void> checkpointServicesShutdownFuture = - FutureUtils.composeAfterwards( + FutureUtils.composeAfterwardsAsync( executionGraph .getTerminationFuture() .thenAcceptAsync( this::shutDownCheckpointServices, getMainThreadExecutor()), - checkpointsCleaner::closeAsync); + checkpointsCleaner::closeAsync, + getMainThreadExecutor()); FutureUtils.assertNoException(checkpointServicesShutdownFuture); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 238c594fd55..ff8f307ec68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -571,12 +571,15 @@ public class AdaptiveScheduler backgroundTask.abort(); // wait for the background task to finish and then close services - return FutureUtils.composeAfterwards( + return FutureUtils.composeAfterwardsAsync( FutureUtils.runAfterwardsAsync( backgroundTask.getTerminationFuture(), () -> stopCheckpointServicesSafely(jobTerminationFuture.get()), getMainThreadExecutor()), - checkpointsCleaner::closeAsync); + // closing the CheckpointsCleaner can complete in the ioExecutor when cleaning up a + // PendingCheckpoint + checkpointsCleaner::closeAsync, + getMainThreadExecutor()); } private void stopCheckpointServicesSafely(JobStatus terminalState) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index d109bb635d7..e9a3cc9cac5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -101,8 +101,10 @@ import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables; @@ -1786,6 +1788,19 @@ public class DefaultSchedulerTest { } } + @Test + void testCloseAsyncReturnsMainThreadFuture() throws Exception { + runCloseAsyncCompletesInMainThreadTest( + scheduledExecutorService, + (mainThreadExecutor, checkpointsCleaner) -> + createSchedulerBuilder( + singleJobVertexJobGraph(1), + mainThreadExecutor, + Collections.emptyList()) + .setCheckpointCleaner(checkpointsCleaner) + .build()); + } + @Test void testJobStatusHookWithJobFailed() throws Exception { commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED); @@ -1931,6 +1946,61 @@ public class DefaultSchedulerTest { schedulerClosed.get(); } + // visible to expose test logic to other Scheduler test classes + public static void runCloseAsyncCompletesInMainThreadTest( + ScheduledExecutorService singleThreadExecutorService, + BiFunctionWithException< + ComponentMainThreadExecutor, CheckpointsCleaner, SchedulerNG, Exception> + schedulerFactory) + throws Exception { + final OneShotLatch cleanerCloseLatch = new OneShotLatch(); + final CompletableFuture<Void> cleanerCloseFuture = new CompletableFuture<>(); + final CheckpointsCleaner checkpointsCleaner = + new CheckpointsCleaner() { + @Override + public CompletableFuture<Void> closeAsync() { + cleanerCloseLatch.trigger(); + return cleanerCloseFuture; + } + }; + + final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + singleThreadExecutorService); + final SchedulerNG scheduler = + schedulerFactory.apply(mainThreadExecutor, checkpointsCleaner); + + mainThreadExecutor.execute(scheduler::startScheduling); + + final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + mainThreadExecutor.execute( + () -> { + // we shouldn't block the closeAsync call here because it's triggering + // additional tasks on the main thread internally + FutureUtils.forward( + scheduler + .closeAsync() + .thenRun( + () -> { + mainThreadExecutor.assertRunningInMainThread(); + }), + closeFuture); + }); + + // wait for the CheckpointsCleaner#close call to not complete the future prematurely + cleanerCloseLatch.await(); + + // there is a race condition between returning the future and completing it which is due to + // the fact that we are triggering the latch before returning the future. That gives a small + // chance that the future completion is executed too early causing the future composition to + // end up in the main thread which is what we prevent in this test + Thread.sleep(50); + // completing this future in the test code simulates completing the + // CheckpointCleaner#closeAsync outside the main thread + cleanerCloseFuture.complete(null); + closeFuture.join(); + } + private static long initiateFailure( DefaultScheduler scheduler, ExecutionAttemptID executionAttemptId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 979d16a29a0..7e3476ddf78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -141,6 +141,7 @@ import java.util.stream.Collectors; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; @@ -945,6 +946,19 @@ public class AdaptiveSchedulerTest { assertThat(checkpointIdCounterShutdownFuture.get()).isEqualTo(JobStatus.FAILED); } + @Test + void testCloseAsyncReturnsMainThreadFuture() throws Exception { + DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest( + TEST_EXECUTOR_RESOURCE.getExecutor(), + (mainThreadExecutor, checkpointsCleaner) -> + new AdaptiveSchedulerBuilder( + singleNoOpJobGraph(), + mainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setCheckpointCleaner(checkpointsCleaner) + .build()); + } + @Test void testTransitionToStateCallsOnLeave() throws Exception { final AdaptiveScheduler scheduler = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index 5c18de43f54..b1a4cafce96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -68,11 +68,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import java.util.stream.LongStream; import static org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider; +import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFinishedTaskExecutionState; import static org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider; @@ -379,6 +381,22 @@ class AdaptiveBatchSchedulerTest { assertThat(mergedSourceParallelismFuture.join()).isEqualTo(4); } + @Test + void testCloseAsyncReturnsMainThreadFuture() throws Exception { + final ScheduledExecutorService scheduledExecutorServiceForMainThread = + Executors.newSingleThreadScheduledExecutor(); + try { + runCloseAsyncCompletesInMainThreadTest( + scheduledExecutorServiceForMainThread, + (mainThread, checkpointsCleaner) -> + createSchedulerBuilder(createJobGraph(), mainThread) + .setCheckpointCleaner(checkpointsCleaner) + .buildAdaptiveBatchJobScheduler()); + } finally { + scheduledExecutorServiceForMainThread.shutdownNow(); + } + } + void testUserConfiguredMaxParallelism( int globalMinParallelism, int globalMaxParallelism, @@ -567,6 +585,11 @@ class AdaptiveBatchSchedulerTest { } private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) { + return createSchedulerBuilder(jobGraph, mainThreadExecutor); + } + + private DefaultSchedulerBuilder createSchedulerBuilder( + JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) { return new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setDelayExecutor(taskRestartExecutor);
