[FLINK-7940] Add FutureUtils.orTimeout This commit adds a convenience function that makes it easy to add a timeout to a CompletableFuture.
This closes #4918. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c568aed1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c568aed1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c568aed1 Branch: refs/heads/master Commit: c568aed1ecb7675a2776e15f120e45ad576eeb37 Parents: 747cf82 Author: Till Rohrmann <[email protected]> Authored: Sun Oct 29 16:38:53 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Oct 31 00:08:53 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 67 ++++++++++++++++++++ .../runtime/concurrent/FutureUtilsTest.java | 18 ++++++ 2 files changed, 85 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index b982c8e..c18068b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.concurrent; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.Preconditions; import akka.dispatch.OnComplete; @@ -31,7 +32,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -445,4 +448,68 @@ public class FutureUtils { return result; } + + /** + * Times the given future out after the timeout. + * + * @param future to time out + * @param timeout after which the given future is timed out + * @param timeUnit time unit of the timeout + * @param <T> type of the given future + * @return The timeout enriched future + */ + public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) { + final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit); + + future.whenComplete((T value, Throwable throwable) -> { + if (!timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + }); + + return future; + } + + /** + * Runnable to complete the given future with a {@link TimeoutException}. + */ + private static final class Timeout implements Runnable { + + private final CompletableFuture<?> future; + + private Timeout(CompletableFuture<?> future) { + this.future = Preconditions.checkNotNull(future); + } + + @Override + public void run() { + future.completeExceptionally(new TimeoutException()); + } + } + + /** + * Delay scheduler used to timeout futures. + * + * <p>This class creates a singleton scheduler used to run the provided actions. + */ + private static final class Delayer { + static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor( + 1, + new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler")); + + /** + * Delay the given action by the given delay. 
+ * + * @param runnable to execute after the given delay + * @param delay after which to execute the runnable + * @param timeUnit time unit of the delay + * @return Future of the scheduled action + */ + private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) { + Preconditions.checkNotNull(runnable); + Preconditions.checkNotNull(timeUnit); + + return delayer.schedule(runnable, delay, timeUnit); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index b779bc9..eb0ce2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -223,4 +224,21 @@ public class FutureUtilsTest extends TestLogger { assertTrue(retryFuture.isCancelled()); verify(scheduledFutureMock).cancel(anyBoolean()); } + + /** + * Tests that a future is timed out after the specified timeout. 
+ */ + @Test + public void testOrTimeout() throws Exception { + final CompletableFuture<String> future = new CompletableFuture<>(); + final long timeout = 10L; + + FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS); + + try { + future.get(); + } catch (ExecutionException e) { + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException); + } + } }
