This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new b44740b76cd [FLINK-28091][tests] Replaces ForkJoinPool by TestExecutorExtension b44740b76cd is described below commit b44740b76cdc975f2535e776828673f982547403 Author: Matthias Pohl <mp...@confluent.io> AuthorDate: Sun Dec 29 16:47:28 2024 +0100 [FLINK-28091][tests] Replaces ForkJoinPool by TestExecutorExtension --- .../CheckpointResourcesCleanupRunnerTest.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java index c9887ff633b..b344f7ba4e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; @@ -45,12 +46,13 @@ import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; @@ -63,6 +65,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; */ class CheckpointResourcesCleanupRunnerTest { + @RegisterExtension + private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>(java.util.concurrent.Executors::newCachedThreadPool); + private static final Time TIMEOUT_FOR_REQUESTS = Time.milliseconds(0); private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> @@ -120,7 +126,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -169,7 +175,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -214,7 +220,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -242,7 +248,7 @@ class CheckpointResourcesCleanupRunnerTest { @Test void testCancellationBeforeStart() throws Exception { final CheckpointResourcesCleanupRunner testInstance = - new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build(); + new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build(); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) .eventuallyFailsWith(ExecutionException.class) @@ -262,7 +268,7 @@ class CheckpointResourcesCleanupRunnerTest { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); AFTER_START.accept(testInstance); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) @@ -278,7 +284,7 @@ class CheckpointResourcesCleanupRunnerTest { @Test void testCancellationAfterClose() throws Exception { final CheckpointResourcesCleanupRunner testInstance = - new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build(); + new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build(); AFTER_CLOSE.accept(testInstance); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) .eventuallyFailsWith(ExecutionException.class)