[hotfix] Do not schedule timeout when future is already  completed

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66f61348
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66f61348
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66f61348

Branch: refs/heads/master
Commit: 66f61348be16dd1a3638e063936a43f45cb8e9db
Parents: 1ffd77a
Author: gyao <[email protected]>
Authored: Mon Jan 22 11:42:13 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:50:24 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/concurrent/FutureUtils.java  | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66f61348/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index c18068b..7195957 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -459,13 +459,15 @@ public class FutureUtils {
         * @return The timeout enriched future
         */
        public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> 
future, long timeout, TimeUnit timeUnit) {
-               final ScheduledFuture<?> timeoutFuture = Delayer.delay(new 
Timeout(future), timeout, timeUnit);
+               if (!future.isDone()) {
+                       final ScheduledFuture<?> timeoutFuture = 
Delayer.delay(new Timeout(future), timeout, timeUnit);
 
-               future.whenComplete((T value, Throwable throwable) -> {
-                       if (!timeoutFuture.isDone()) {
-                               timeoutFuture.cancel(false);
-                       }
-               });
+                       future.whenComplete((T value, Throwable throwable) -> {
+                               if (!timeoutFuture.isDone()) {
+                                       timeoutFuture.cancel(false);
+                               }
+                       });
+               }
 
                return future;
        }

Reply via email to