This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d6750d66bd6a6116d3fc13f90f7a7e19ac232235 Author: Till Rohrmann <[email protected]> AuthorDate: Wed Mar 17 11:39:04 2021 +0100 [hotfix] Add FutureUtils.switchExecutor utility The FutureUtils.switchExecutor utility allows switching the executor for a given CompletableFuture, independent of whether it is completed normally or exceptionally. --- .../flink/runtime/concurrent/FutureUtils.java | 22 +++++++ .../flink/runtime/concurrent/FutureUtilsTest.java | 67 +++++++++++++++++++++- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 629c144..4334eda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -1344,4 +1344,26 @@ public class FutureUtils { } }; } + + /** + * Switches the execution context of the given source future. This works for normally and + * exceptionally completed futures. + * + * @param source source to switch the execution context for + * @param executor executor representing the new execution context + * @param <T> type of the source + * @return future which is executed by the given executor + */ + public static <T> CompletableFuture<T> switchExecutor( + CompletableFuture<? 
extends T> source, Executor executor) { + return source.handleAsync( + (t, throwable) -> { + if (throwable != null) { + throw new CompletionException(throwable); + } else { + return t; + } + }, + executor); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 63f221f..5d5389c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -23,12 +23,14 @@ import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLogger; import org.junit.Assert; +import org.junit.ClassRule; import org.junit.Test; import java.time.Duration; @@ -39,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -67,6 +70,10 @@ import static org.junit.Assert.fail; /** Tests for the utility methods in {@link FutureUtils}. */ public class FutureUtilsTest extends TestLogger { + @ClassRule + public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = + new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor); + /** Tests that we can retry an operation. 
*/ @Test public void testRetrySuccess() throws Exception { @@ -409,7 +416,7 @@ public class FutureUtilsTest extends TestLogger { @Test public void testRetryWithDelayAndPredicate() throws Exception { - final ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService retryExecutor = TEST_EXECUTOR_RESOURCE.getExecutor(); final String retryableExceptionMessage = "first exception"; class TestStringSupplier implements Supplier<CompletableFuture<String>> { private final AtomicInteger counter = new AtomicInteger(); @@ -440,8 +447,6 @@ public class FutureUtilsTest extends TestLogger { .get(); } catch (final ExecutionException e) { assertThat(e.getMessage(), containsString("should propagate")); - } finally { - retryExecutor.shutdownNow(); } } @@ -953,6 +958,62 @@ public class FutureUtilsTest extends TestLogger { assertNull(FutureUtils.getWithoutException(completableFuture)); } + @Test + public void testSwitchExecutorForNormallyCompletedFuture() { + final CompletableFuture<String> source = new CompletableFuture<>(); + + final ExecutorService singleThreadExecutor = TEST_EXECUTOR_RESOURCE.getExecutor(); + + final CompletableFuture<String> resultFuture = + FutureUtils.switchExecutor(source, singleThreadExecutor); + + final String expectedThreadName = + FutureUtils.supplyAsync( + () -> Thread.currentThread().getName(), singleThreadExecutor) + .join(); + final String expectedValue = "foobar"; + + final CompletableFuture<Void> assertionFuture = + resultFuture.handle( + (s, throwable) -> { + assertThat(s, is(expectedValue)); + assertThat(Thread.currentThread().getName(), is(expectedThreadName)); + + return null; + }); + source.complete(expectedValue); + + assertionFuture.join(); + } + + @Test + public void testSwitchExecutorForExceptionallyCompletedFuture() { + final CompletableFuture<String> source = new CompletableFuture<>(); + + final ExecutorService singleThreadExecutor = TEST_EXECUTOR_RESOURCE.getExecutor(); + + final 
CompletableFuture<String> resultFuture = + FutureUtils.switchExecutor(source, singleThreadExecutor); + + final String expectedThreadName = + FutureUtils.supplyAsync( + () -> Thread.currentThread().getName(), singleThreadExecutor) + .join(); + final Exception expectedException = new Exception("foobar"); + + final CompletableFuture<Void> assertionFuture = + resultFuture.handle( + (s, throwable) -> { + assertThat(throwable, FlinkMatchers.containsCause(expectedException)); + assertThat(Thread.currentThread().getName(), is(expectedThreadName)); + + return null; + }); + source.completeExceptionally(expectedException); + + assertionFuture.join(); + } + private static Throwable getThrowable(CompletableFuture<?> completableFuture) { try { completableFuture.join();
