Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147658765
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success)
throws Throwable {
return result;
}
- //
------------------------------------------------------------------------
- // Future Completed with an exception.
- //
------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T>
future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new
Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the
exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link
TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable
throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
+
+ private final CompletableFuture<?> future;
+
+ Timeout(CompletableFuture<?> future) {
+ this.future = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void run() {
+ future.completeExceptionally(new TimeoutException());
--- End diff --
Not entirely sure, since people might use this to disambiguate different
timeouts from each other. I rather not offer this possibility.
---