This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 53ac701e22d27fe537bd9fb790ac67e1033f4842
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Mar 14 13:28:53 2021 +0100

    [hotfix] Let delayed AdaptiveScheduler.runIfState method return ScheduledFuture
    
    By letting the AdaptiveScheduler.runIfState method return a ScheduledFuture it is now
    possible to cancel scheduled actions.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |   5 +-
 .../scheduler/adaptive/CreatingExecutionGraph.java |   4 +-
 .../runtime/scheduler/adaptive/Restarting.java     |   4 +-
 .../scheduler/adaptive/WaitingForResources.java    |   4 +-
 .../adaptive/CreatingExecutionGraphTest.java       |   6 +-
 .../runtime/scheduler/adaptive/RestartingTest.java |   5 +-
 .../adaptive/WaitingForResourcesTest.java          |  66 +++++-------
 .../core/testutils/CompletedScheduledFuture.java   |  79 ++++++++++++++
 .../ManuallyTriggeredScheduledExecutorService.java |  85 ---------------
 .../apache/flink/core/testutils/ScheduledTask.java | 115 +++++++++++++++++++++
 10 files changed, 241 insertions(+), 132 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index f03e162..be7b068 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -127,6 +127,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -962,8 +963,8 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public void runIfState(State expectedState, Runnable action, Duration 
delay) {
-        componentMainThreadExecutor.schedule(
+    public ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay) {
+        return componentMainThreadExecutor.schedule(
                 () -> runIfState(expectedState, action), delay.toMillis(), 
TimeUnit.MILLISECONDS);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index b6f8df4..5e21ba6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
 
 /**
  * State which waits for the creation of the {@link ExecutionGraph}. If the 
creation fails, then the
@@ -149,8 +150,9 @@ public class CreatingExecutionGraph implements State {
          *     the action
          * @param action action to run if the expected state equals the actual 
state
          * @param delay delay after which to run the action
+         * @return a ScheduledFuture representing pending completion of the 
task
          */
-        void runIfState(State expectedState, Runnable action, Duration delay);
+        ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay);
 
         /**
          * Try to assign slots to the created {@link ExecutionGraph}. If it is 
possible, then this
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 5a49c48..307dfa02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.time.Duration;
+import java.util.concurrent.ScheduledFuture;
 
 /** State which describes a job which is currently being restarted. */
 class Restarting extends StateWithExecutionGraph {
@@ -105,8 +106,9 @@ class Restarting extends StateWithExecutionGraph {
          *     the delay
          * @param action action to run if the state equals the expected state
          * @param delay delay after which the action should be executed
+         * @return a ScheduledFuture representing pending completion of the 
task
          */
-        void runIfState(State expectedState, Runnable action, Duration delay);
+        ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay);
     }
 
     static class Factory implements StateFactory<Restarting> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index 848d729..9489ddd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.concurrent.ScheduledFuture;
 
 /**
  * State which describes that the scheduler is waiting for resources in order 
to execute the job.
@@ -142,8 +143,9 @@ class WaitingForResources implements State, 
ResourceConsumer {
          *     the action
          * @param action action to run if the expected state equals the actual 
state
          * @param delay delay after which to run the action
+         * @return a ScheduledFuture representing pending completion of the 
task
          */
-        void runIfState(State expectedState, Runnable action, Duration delay);
+        ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay);
     }
 
     static class Factory implements StateFactory<WaitingForResources> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index 3d467fb..a352734 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.FlinkException;
@@ -31,6 +32,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -187,10 +189,12 @@ public class CreatingExecutionGraphTest extends 
TestLogger {
         }
 
         @Override
-        public void runIfState(State expectedState, Runnable action, Duration 
delay) {
+        public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
             if (!hadStateTransitionHappened) {
                 action.run();
             }
+
+            return CompletedScheduledFuture.create(null);
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index dd53263..2358709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -29,6 +30,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.time.Duration;
+import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
@@ -172,10 +174,11 @@ public class RestartingTest extends TestLogger {
         }
 
         @Override
-        public void runIfState(State expectedState, Runnable action, Duration 
delay) {
+        public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
             if (!hadStateTransition) {
                 action.run();
             }
+            return CompletedScheduledFuture.create(null);
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index 8586551..c70dc2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.ScheduledTask;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -33,12 +34,12 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -57,12 +58,7 @@ public class WaitingForResourcesTest extends TestLogger {
 
             new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
             // run delayed actions
-            for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
-                scheduledRunnable.runAction();
-                if (ctx.hasStateTransition()) {
-                    break;
-                }
-            }
+            ctx.runScheduledTasks();
         }
     }
 
@@ -100,12 +96,7 @@ public class WaitingForResourcesTest extends TestLogger {
             ctx.setExpectCreatingExecutionGraph();
 
             // immediately execute all scheduled runnables
-            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
-            for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
-                if (scheduledRunnable.getExpectedState() == wfr) {
-                    scheduledRunnable.runAction();
-                }
-            }
+            ctx.runScheduledTasks();
         }
     }
 
@@ -172,13 +163,9 @@ public class WaitingForResourcesTest extends TestLogger {
                 new StateValidator<>("finished");
 
         private Supplier<Boolean> hasEnoughResourcesSupplier = () -> false;
-        private final List<ScheduledRunnable> scheduledRunnables = new 
ArrayList<>();
+        private final List<ScheduledTask<Void>> scheduledTasks = new 
ArrayList<>();
         private boolean hasStateTransition = false;
 
-        public List<ScheduledRunnable> getScheduledRunnables() {
-            return scheduledRunnables;
-        }
-
         public void setHasEnoughResources(Supplier<Boolean> sup) {
             hasEnoughResourcesSupplier = sup;
         }
@@ -191,6 +178,12 @@ public class WaitingForResourcesTest extends TestLogger {
             creatingExecutionGraphStateValidator.expectInput(none -> {});
         }
 
+        void runScheduledTasks() {
+            for (ScheduledTask<Void> scheduledTask : scheduledTasks) {
+                scheduledTask.execute();
+            }
+        }
+
         @Override
         public void close() throws Exception {
             creatingExecutionGraphStateValidator.close();
@@ -212,8 +205,21 @@ public class WaitingForResourcesTest extends TestLogger {
         }
 
         @Override
-        public void runIfState(State expectedState, Runnable action, Duration 
delay) {
-            scheduledRunnables.add(new ScheduledRunnable(expectedState, 
action, delay));
+        public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
+            final ScheduledTask<Void> scheduledTask =
+                    new ScheduledTask<>(
+                            () -> {
+                                if (!hasStateTransition) {
+                                    action.run();
+                                }
+
+                                return null;
+                            },
+                            delay.toMillis());
+
+            scheduledTasks.add(scheduledTask);
+
+            return scheduledTask;
         }
 
         @Override
@@ -233,26 +239,6 @@ public class WaitingForResourcesTest extends TestLogger {
         }
     }
 
-    private static final class ScheduledRunnable {
-        private final Runnable action;
-        private final State expectedState;
-        private final Duration delay;
-
-        private ScheduledRunnable(State expectedState, Runnable action, 
Duration delay) {
-            this.expectedState = expectedState;
-            this.action = action;
-            this.delay = delay;
-        }
-
-        public void runAction() {
-            action.run();
-        }
-
-        public State getExpectedState() {
-            return expectedState;
-        }
-    }
-
     static <T> Consumer<T> assertNonNull() {
         return (item) -> assertThat(item, notNullValue());
     }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CompletedScheduledFuture.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CompletedScheduledFuture.java
new file mode 100644
index 0000000..6667612
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CompletedScheduledFuture.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Completed {@link ScheduledFuture} implementation.
+ *
+ * @param <T> type of the {@link ScheduledFuture}
+ */
+public final class CompletedScheduledFuture<T> implements ScheduledFuture<T> {
+
+    private final T value;
+
+    private CompletedScheduledFuture(T value) {
+        this.value = value;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return 0;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return true;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        return value;
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return value;
+    }
+
+    public static <T> CompletedScheduledFuture<T> create(T value) {
+        return new CompletedScheduledFuture<>(value);
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 1752dc6..7683804 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -27,17 +27,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 /**
@@ -317,84 +312,4 @@ public class ManuallyTriggeredScheduledExecutorService 
implements ScheduledExecu
 
         return scheduledTask;
     }
-
-    private static final class ScheduledTask<T> implements ScheduledFuture<T> {
-
-        private final Callable<T> callable;
-
-        private final long delay;
-
-        private final long period;
-
-        private final CompletableFuture<T> result;
-
-        private ScheduledTask(Callable<T> callable, long delay) {
-            this(callable, delay, 0);
-        }
-
-        private ScheduledTask(Callable<T> callable, long delay, long period) {
-            this.callable = Objects.requireNonNull(callable);
-            this.result = new CompletableFuture<>();
-            this.delay = delay;
-            this.period = period;
-        }
-
-        private boolean isPeriodic() {
-            return period > 0;
-        }
-
-        public void execute() {
-            if (!result.isDone()) {
-                if (!isPeriodic()) {
-                    try {
-                        result.complete(callable.call());
-                    } catch (Exception e) {
-                        result.completeExceptionally(e);
-                    }
-                } else {
-                    try {
-                        callable.call();
-                    } catch (Exception e) {
-                        result.completeExceptionally(e);
-                    }
-                }
-            }
-        }
-
-        @Override
-        public long getDelay(TimeUnit unit) {
-            return unit.convert(delay, TimeUnit.MILLISECONDS);
-        }
-
-        @Override
-        public int compareTo(Delayed o) {
-            return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
-        }
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            return result.cancel(mayInterruptIfRunning);
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return result.isCancelled();
-        }
-
-        @Override
-        public boolean isDone() {
-            return result.isDone();
-        }
-
-        @Override
-        public T get() throws InterruptedException, ExecutionException {
-            return result.get();
-        }
-
-        @Override
-        public T get(long timeout, @Nonnull TimeUnit unit)
-                throws InterruptedException, ExecutionException, 
TimeoutException {
-            return result.get(timeout, unit);
-        }
-    }
 }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java
new file mode 100644
index 0000000..899b6e1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * ScheduledTask represents a task which is executed at a later point in time.
+ *
+ * @param <T> type of the result
+ */
+public final class ScheduledTask<T> implements ScheduledFuture<T> {
+
+    private final Callable<T> callable;
+
+    private final long delay;
+
+    private final long period;
+
+    private final CompletableFuture<T> result;
+
+    public ScheduledTask(Callable<T> callable, long delay) {
+        this(callable, delay, 0);
+    }
+
+    public ScheduledTask(Callable<T> callable, long delay, long period) {
+        this.callable = Objects.requireNonNull(callable);
+        this.result = new CompletableFuture<>();
+        this.delay = delay;
+        this.period = period;
+    }
+
+    private boolean isPeriodic() {
+        return period > 0;
+    }
+
+    public void execute() {
+        if (!result.isDone()) {
+            if (!isPeriodic()) {
+                try {
+                    result.complete(callable.call());
+                } catch (Exception e) {
+                    result.completeExceptionally(e);
+                }
+            } else {
+                try {
+                    callable.call();
+                } catch (Exception e) {
+                    result.completeExceptionally(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert(delay, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return result.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return result.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+        return result.isDone();
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        return result.get();
+    }
+
+    @Override
+    public T get(long timeout, @Nonnull TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return result.get(timeout, unit);
+    }
+}

Reply via email to