This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c3d56df56fe [FLINK-28091][tests] Replaces ForkJoinPool by TestExecutorExtension
c3d56df56fe is described below

commit c3d56df56fed02d92fbaef36830d0fd73bfe4845
Author: Matthias Pohl <mp...@confluent.io>
AuthorDate: Sun Dec 29 16:47:28 2024 +0100

    [FLINK-28091][tests] Replaces ForkJoinPool by TestExecutorExtension
---
 .../CheckpointResourcesCleanupRunnerTest.java        | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
index 40123ebaa06..5dbe48c120e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
@@ -44,13 +45,14 @@ import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -63,6 +65,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  */
 class CheckpointResourcesCleanupRunnerTest {
 
+    @RegisterExtension
+    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(java.util.concurrent.Executors::newCachedThreadPool);
+
     private static final Duration TIMEOUT_FOR_REQUESTS = Duration.ofMillis(0);
 
     private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception>
@@ -120,7 +126,7 @@ class CheckpointResourcesCleanupRunnerTest {
         final CheckpointResourcesCleanupRunner testInstance =
                 new TestInstanceBuilder()
                         .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
-                        .withExecutor(ForkJoinPool.commonPool())
+                        .withExecutor(EXECUTOR_EXTENSION.getExecutor())
                         .build();
         testInstance.start();
 
@@ -169,7 +175,7 @@ class CheckpointResourcesCleanupRunnerTest {
         final CheckpointResourcesCleanupRunner testInstance =
                 new TestInstanceBuilder()
                         .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
-                        .withExecutor(ForkJoinPool.commonPool())
+                        .withExecutor(EXECUTOR_EXTENSION.getExecutor())
                         .build();
         testInstance.start();
 
@@ -214,7 +220,7 @@ class CheckpointResourcesCleanupRunnerTest {
         final CheckpointResourcesCleanupRunner testInstance =
                 new TestInstanceBuilder()
                         .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
-                        .withExecutor(ForkJoinPool.commonPool())
+                        .withExecutor(EXECUTOR_EXTENSION.getExecutor())
                         .build();
         testInstance.start();
 
@@ -242,7 +248,7 @@ class CheckpointResourcesCleanupRunnerTest {
     @Test
     void testCancellationBeforeStart() throws Exception {
         final CheckpointResourcesCleanupRunner testInstance =
-                new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build();
+                new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
 
         assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS))
                 .eventuallyFailsWith(ExecutionException.class)
@@ -262,7 +268,7 @@ class CheckpointResourcesCleanupRunnerTest {
         final CheckpointResourcesCleanupRunner testInstance =
                 new TestInstanceBuilder()
                         .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
-                        .withExecutor(ForkJoinPool.commonPool())
+                        .withExecutor(EXECUTOR_EXTENSION.getExecutor())
                         .build();
         AFTER_START.accept(testInstance);
         assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS))
@@ -278,7 +284,7 @@ class CheckpointResourcesCleanupRunnerTest {
     @Test
     void testCancellationAfterClose() throws Exception {
         final CheckpointResourcesCleanupRunner testInstance =
-                new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build();
+                new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
         AFTER_CLOSE.accept(testInstance);
         assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS))
                 .eventuallyFailsWith(ExecutionException.class)

Reply via email to