[FLINK-7940] Add FutureUtils.orTimeout

This commit adds a convenience function that makes it easy to add a timeout to
a CompletableFuture.

This closes #4918.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c568aed1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c568aed1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c568aed1

Branch: refs/heads/master
Commit: c568aed1ecb7675a2776e15f120e45ad576eeb37
Parents: 747cf82
Author: Till Rohrmann <[email protected]>
Authored: Sun Oct 29 16:38:53 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 31 00:08:53 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 67 ++++++++++++++++++++
 .../runtime/concurrent/FutureUtilsTest.java     | 18 ++++++
 2 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index b982c8e..c18068b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.Preconditions;
 
 import akka.dispatch.OnComplete;
@@ -31,7 +32,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -445,4 +448,68 @@ public class FutureUtils {
 
                return result;
        }
+
+       /**
+        * Times the given future out after the timeout.
+        *
+        * @param future to time out
+        * @param timeout after which the given future is timed out
+        * @param timeUnit time unit of the timeout
+        * @param <T> type of the given future
+        * @return The timeout enriched future
+        */
+       public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+               final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+               future.whenComplete((T value, Throwable throwable) -> {
+                       if (!timeoutFuture.isDone()) {
+                               timeoutFuture.cancel(false);
+                       }
+               });
+
+               return future;
+       }
+
+       /**
+        * Runnable to complete the given future with a {@link TimeoutException}.
+        */
+       private static final class Timeout implements Runnable {
+
+               private final CompletableFuture<?> future;
+
+               private Timeout(CompletableFuture<?> future) {
+                       this.future = Preconditions.checkNotNull(future);
+               }
+
+               @Override
+               public void run() {
+                       future.completeExceptionally(new TimeoutException());
+               }
+       }
+
+       /**
+        * Delay scheduler used to timeout futures.
+        *
+        * <p>This class creates a singleton scheduler used to run the provided actions.
+        */
+       private static final class Delayer {
+               static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+                       1,
+                       new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));
+
+               /**
+                * Delay the given action by the given delay.
+                *
+                * @param runnable to execute after the given delay
+                * @param delay after which to execute the runnable
+                * @param timeUnit time unit of the delay
+                * @return Future of the scheduled action
+                */
+               private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
+                       Preconditions.checkNotNull(runnable);
+                       Preconditions.checkNotNull(timeUnit);
+
+                       return delayer.schedule(runnable, delay, timeUnit);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index b779bc9..eb0ce2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -223,4 +224,21 @@ public class FutureUtilsTest extends TestLogger {
                assertTrue(retryFuture.isCancelled());
                verify(scheduledFutureMock).cancel(anyBoolean());
        }
+
+       /**
+        * Tests that a future is timed out after the specified timeout.
+        */
+       @Test
+       public void testOrTimeout() throws Exception {
+               final CompletableFuture<String> future = new CompletableFuture<>();
+               final long timeout = 10L;
+
+               FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);
+
+               try {
+                       future.get();
+               } catch (ExecutionException e) {
+                       assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
+               }
+       }
 }

Reply via email to