This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c3d56df56fe [FLINK-28091][tests] Replaces ForkJoinPool by TestExecutorExtension c3d56df56fe is described below commit c3d56df56fed02d92fbaef36830d0fd73bfe4845 Author: Matthias Pohl <mp...@confluent.io> AuthorDate: Sun Dec 29 16:47:28 2024 +0100 [FLINK-28091][tests] Replaces ForkJoinPool by TestExecutorExtension --- .../CheckpointResourcesCleanupRunnerTest.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java index 40123ebaa06..5dbe48c120e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; @@ -44,13 +45,14 @@ import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; import 
java.util.function.Function; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; @@ -63,6 +65,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; */ class CheckpointResourcesCleanupRunnerTest { + @RegisterExtension + private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>(java.util.concurrent.Executors::newCachedThreadPool); + private static final Duration TIMEOUT_FOR_REQUESTS = Duration.ofMillis(0); private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> @@ -120,7 +126,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -169,7 +175,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -214,7 +220,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -242,7 +248,7 @@ class CheckpointResourcesCleanupRunnerTest { @Test void testCancellationBeforeStart() throws Exception { final CheckpointResourcesCleanupRunner testInstance = - new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build(); + new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build(); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) 
.eventuallyFailsWith(ExecutionException.class) @@ -262,7 +268,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); AFTER_START.accept(testInstance); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) @@ -278,7 +284,7 @@ class CheckpointResourcesCleanupRunnerTest { @Test void testCancellationAfterClose() throws Exception { final CheckpointResourcesCleanupRunner testInstance = - new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build(); + new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build(); AFTER_CLOSE.accept(testInstance); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) .eventuallyFailsWith(ExecutionException.class)