This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6750d66bd6a6116d3fc13f90f7a7e19ac232235
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Mar 17 11:39:04 2021 +0100

    [hotfix] Add FutureUtils.switchExecutor utility
    
    The FutureUtils.switchExecutor utility allows to switch the executor for a 
given
    CompletableFuture independent of whether it is completed normally or 
exceptionally.
---
 .../flink/runtime/concurrent/FutureUtils.java      | 22 +++++++
 .../flink/runtime/concurrent/FutureUtilsTest.java  | 67 +++++++++++++++++++++-
 2 files changed, 86 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 629c144..4334eda 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -1344,4 +1344,26 @@ public class FutureUtils {
             }
         };
     }
+
+    /**
+     * Switches the execution context of the given source future. This works 
for normally and
+     * exceptionally completed futures.
+     *
+     * @param source source to switch the execution context for
+     * @param executor executor representing the new execution context
+     * @param <T> type of the source
+     * @return future which is executed by the given executor
+     */
+    public static <T> CompletableFuture<T> switchExecutor(
+            CompletableFuture<? extends T> source, Executor executor) {
+        return source.handleAsync(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        throw new CompletionException(throwable);
+                    } else {
+                        return t;
+                    }
+                },
+                executor);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 63f221f..5d5389c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -23,12 +23,14 @@ import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -39,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -67,6 +70,10 @@ import static org.junit.Assert.fail;
 /** Tests for the utility methods in {@link FutureUtils}. */
 public class FutureUtilsTest extends TestLogger {
 
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> 
TEST_EXECUTOR_RESOURCE =
+            new 
TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
+
     /** Tests that we can retry an operation. */
     @Test
     public void testRetrySuccess() throws Exception {
@@ -409,7 +416,7 @@ public class FutureUtilsTest extends TestLogger {
 
     @Test
     public void testRetryWithDelayAndPredicate() throws Exception {
-        final ScheduledExecutorService retryExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        final ScheduledExecutorService retryExecutor = 
TEST_EXECUTOR_RESOURCE.getExecutor();
         final String retryableExceptionMessage = "first exception";
         class TestStringSupplier implements 
Supplier<CompletableFuture<String>> {
             private final AtomicInteger counter = new AtomicInteger();
@@ -440,8 +447,6 @@ public class FutureUtilsTest extends TestLogger {
                     .get();
         } catch (final ExecutionException e) {
             assertThat(e.getMessage(), containsString("should propagate"));
-        } finally {
-            retryExecutor.shutdownNow();
         }
     }
 
@@ -953,6 +958,62 @@ public class FutureUtilsTest extends TestLogger {
         assertNull(FutureUtils.getWithoutException(completableFuture));
     }
 
+    @Test
+    public void testSwitchExecutorForNormallyCompletedFuture() {
+        final CompletableFuture<String> source = new CompletableFuture<>();
+
+        final ExecutorService singleThreadExecutor = 
TEST_EXECUTOR_RESOURCE.getExecutor();
+
+        final CompletableFuture<String> resultFuture =
+                FutureUtils.switchExecutor(source, singleThreadExecutor);
+
+        final String expectedThreadName =
+                FutureUtils.supplyAsync(
+                                () -> Thread.currentThread().getName(), 
singleThreadExecutor)
+                        .join();
+        final String expectedValue = "foobar";
+
+        final CompletableFuture<Void> assertionFuture =
+                resultFuture.handle(
+                        (s, throwable) -> {
+                            assertThat(s, is(expectedValue));
+                            assertThat(Thread.currentThread().getName(), 
is(expectedThreadName));
+
+                            return null;
+                        });
+        source.complete(expectedValue);
+
+        assertionFuture.join();
+    }
+
+    @Test
+    public void testSwitchExecutorForExceptionallyCompletedFuture() {
+        final CompletableFuture<String> source = new CompletableFuture<>();
+
+        final ExecutorService singleThreadExecutor = 
TEST_EXECUTOR_RESOURCE.getExecutor();
+
+        final CompletableFuture<String> resultFuture =
+                FutureUtils.switchExecutor(source, singleThreadExecutor);
+
+        final String expectedThreadName =
+                FutureUtils.supplyAsync(
+                                () -> Thread.currentThread().getName(), 
singleThreadExecutor)
+                        .join();
+        final Exception expectedException = new Exception("foobar");
+
+        final CompletableFuture<Void> assertionFuture =
+                resultFuture.handle(
+                        (s, throwable) -> {
+                            assertThat(throwable, 
FlinkMatchers.containsCause(expectedException));
+                            assertThat(Thread.currentThread().getName(), 
is(expectedThreadName));
+
+                            return null;
+                        });
+        source.completeExceptionally(expectedException);
+
+        assertionFuture.join();
+    }
+
     private static Throwable getThrowable(CompletableFuture<?> 
completableFuture) {
         try {
             completableFuture.join();

Reply via email to