[FLINK-8669] Add completeAll and runAfterwards(Async) to FutureUtils

FutureUtils#completeAll(Collection) takes a collection of futures and returns
a future which is completed after all of the given futures are completed. This
also includes exceptional completions. Potentially occurring exceptions are
recorded and combined into a single exception with which the resulting future
is completed.

FutureUtils#runAfterwards takes a future and runs a given action after the
completion of the given future. This also includes an exceptional completion.
In this case, a potentially occurring exception as the result of the provided
action will be combined with the future's exception.

This closes #5503.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62b6cea6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62b6cea6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62b6cea6

Branch: refs/heads/master
Commit: 62b6cea685fbf83e475d3dd5d05614fc533b410c
Parents: 5909b5b
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Feb 16 10:08:04 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 19 16:04:17 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 178 ++++++++++++++++---
 .../runtime/concurrent/FutureUtilsTest.java     | 154 ++++++++++++++++
 2 files changed, 309 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62b6cea6/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index e9310c0..181bc5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.RunnableWithException;
 
 import akka.dispatch.OnComplete;
 
@@ -241,6 +243,83 @@ public class FutureUtils {
                }
        }
 
+       /**
+        * Times the given future out after the timeout.
+        *
+        * @param future to time out
+        * @param timeout after which the given future is timed out
+        * @param timeUnit time unit of the timeout
+        * @param <T> type of the given future
+        * @return The timeout enriched future
+        */
+       public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> 
future, long timeout, TimeUnit timeUnit) {
+               if (!future.isDone()) {
+                       final ScheduledFuture<?> timeoutFuture = 
Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+                       future.whenComplete((T value, Throwable throwable) -> {
+                               if (!timeoutFuture.isDone()) {
+                                       timeoutFuture.cancel(false);
+                               }
+                       });
+               }
+
+               return future;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Future actions
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Run the given action after the completion of the given future. The 
given future can be
+        * completed normally or exceptionally. In case of an exceptional 
completion the, the
+        * action's exception will be added to the initial exception.
+        *
+        * @param future to wait for its completion
+        * @param runnable action which is triggered after the future's 
completion
+        * @return Future which is completed after the action has completed. 
This future can contain an exception,
+        * if an error occurred in the given future or action.
+        */
+       public static CompletableFuture<Void> 
runAfterwards(CompletableFuture<?> future, RunnableWithException runnable) {
+               return runAfterwardsAsync(future, runnable, 
Executors.directExecutor());
+       }
+
+       /**
+        * Run the given action after the completion of the given future. The 
given future can be
+        * completed normally or exceptionally. In case of an exceptional 
completion the, the
+        * action's exception will be added to the initial exception.
+        *
+        * @param future to wait for its completion
+        * @param runnable action which is triggered after the future's 
completion
+        * @param executor to run the given action
+        * @return Future which is completed after the action has completed. 
This future can contain an exception,
+        * if an error occurred in the given future or action.
+        */
+       public static CompletableFuture<Void> runAfterwardsAsync(
+               CompletableFuture<?> future,
+               RunnableWithException runnable,
+               Executor executor) {
+               final CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
+
+               future.whenCompleteAsync(
+                       (Object ignored, Throwable throwable) -> {
+                               try {
+                                       runnable.run();
+                               } catch (Throwable e) {
+                                       throwable = 
ExceptionUtils.firstOrSuppressed(e, throwable);
+                               }
+
+                               if (throwable != null) {
+                                       
resultFuture.completeExceptionally(throwable);
+                               } else {
+                                       resultFuture.complete(null);
+                               }
+                       },
+                       executor);
+
+               return resultFuture;
+       }
+
        // 
------------------------------------------------------------------------
        //  composing futures
        // 
------------------------------------------------------------------------
@@ -414,6 +493,82 @@ public class FutureUtils {
                }
        }
 
+       /**
+        * Creates a {@link ConjunctFuture} which is only completed after all 
given futures have completed.
+        * Unlike {@link FutureUtils#waitForAll(Collection)}, the resulting 
future won't be completed directly
+        * if one of the given futures is completed exceptionally. Instead, all 
occurring exception will be
+        * collected and combined to a single exception. If at least on 
exception occurs, then the resulting
+        * future will be completed exceptionally.
+        *
+        * @param futuresToComplete futures to complete
+        * @return Future which is completed after all given futures have been 
completed.
+        */
+       public static ConjunctFuture<Void> completeAll(Collection<? extends 
CompletableFuture<?>> futuresToComplete) {
+               return new CompletionConjunctFuture(futuresToComplete);
+       }
+
+       /**
+        * {@link ConjunctFuture} implementation which is completed after all 
the given futures have been
+        * completed. Exceptional completions of the input futures will be 
recorded but it won't trigger the
+        * early completion of this future.
+        */
+       private static final class CompletionConjunctFuture extends 
ConjunctFuture<Void> {
+
+               private final Object lock = new Object();
+
+               private final int numFuturesTotal;
+
+               private int futuresCompleted;
+
+               private Throwable globalThrowable;
+
+               private CompletionConjunctFuture(Collection<? extends 
CompletableFuture<?>> futuresToComplete) {
+                       numFuturesTotal = futuresToComplete.size();
+
+                       futuresCompleted = 0;
+
+                       globalThrowable = null;
+
+                       if (futuresToComplete.isEmpty()) {
+                               complete(null);
+                       } else {
+                               for (CompletableFuture<?> completableFuture : 
futuresToComplete) {
+                                       
completableFuture.whenComplete(this::completeFuture);
+                               }
+                       }
+               }
+
+               private void completeFuture(Object ignored, Throwable 
throwable) {
+                       synchronized (lock) {
+                               futuresCompleted++;
+
+                               if (throwable != null) {
+                                       globalThrowable = 
ExceptionUtils.firstOrSuppressed(throwable, globalThrowable);
+                               }
+
+                               if (futuresCompleted == numFuturesTotal) {
+                                       if (globalThrowable != null) {
+                                               
completeExceptionally(globalThrowable);
+                                       } else {
+                                               complete(null);
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public int getNumFuturesTotal() {
+                       return numFuturesTotal;
+               }
+
+               @Override
+               public int getNumFuturesCompleted() {
+                       synchronized (lock) {
+                               return futuresCompleted;
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Helper methods
        // 
------------------------------------------------------------------------
@@ -481,29 +636,6 @@ public class FutureUtils {
        }
 
        /**
-        * Times the given future out after the timeout.
-        *
-        * @param future to time out
-        * @param timeout after which the given future is timed out
-        * @param timeUnit time unit of the timeout
-        * @param <T> type of the given future
-        * @return The timeout enriched future
-        */
-       public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> 
future, long timeout, TimeUnit timeUnit) {
-               if (!future.isDone()) {
-                       final ScheduledFuture<?> timeoutFuture = 
Delayer.delay(new Timeout(future), timeout, timeUnit);
-
-                       future.whenComplete((T value, Throwable throwable) -> {
-                               if (!timeoutFuture.isDone()) {
-                                       timeoutFuture.cancel(false);
-                               }
-                       });
-               }
-
-               return future;
-       }
-
-       /**
         * Runnable to complete the given future with a {@link 
TimeoutException}.
         */
        private static final class Timeout implements Runnable {

http://git-wip-us.apache.org/repos/asf/flink/blob/62b6cea6/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index cbc8a9a..7ad1638 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -41,10 +43,15 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.emptyArray;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the utility methods in {@link FutureUtils}.
@@ -270,4 +277,151 @@ public class FutureUtilsTest extends TestLogger {
                        retryExecutor.shutdownNow();
                }
        }
+
+       @Test
+       public void testRunAfterwards() throws Exception {
+               final CompletableFuture<Void> inputFuture = new 
CompletableFuture<>();
+               final OneShotLatch runnableLatch = new OneShotLatch();
+
+               final CompletableFuture<Void> runFuture = 
FutureUtils.runAfterwards(
+                       inputFuture,
+                       runnableLatch::trigger);
+
+               assertThat(runnableLatch.isTriggered(), is(false));
+               assertThat(runFuture.isDone(), is(false));
+
+               inputFuture.complete(null);
+
+               assertThat(runnableLatch.isTriggered(), is(true));
+               assertThat(runFuture.isDone(), is(true));
+
+               // check that this future is not exceptionally completed
+               runFuture.get();
+       }
+
+       @Test
+       public void testRunAfterwardsExceptional() throws Exception {
+               final CompletableFuture<Void> inputFuture = new 
CompletableFuture<>();
+               final OneShotLatch runnableLatch = new OneShotLatch();
+               final FlinkException testException = new FlinkException("Test 
exception");
+
+               final CompletableFuture<Void> runFuture = 
FutureUtils.runAfterwards(
+                       inputFuture,
+                       runnableLatch::trigger);
+
+               assertThat(runnableLatch.isTriggered(), is(false));
+               assertThat(runFuture.isDone(), is(false));
+
+               inputFuture.completeExceptionally(testException);
+
+               assertThat(runnableLatch.isTriggered(), is(true));
+               assertThat(runFuture.isDone(), is(true));
+
+               try {
+                       runFuture.get();
+                       fail("Expected an exceptional completion");
+               } catch (ExecutionException ee) {
+                       assertThat(ExceptionUtils.stripExecutionException(ee), 
is(testException));
+               }
+       }
+
+       @Test
+       public void testCompleteAll() throws Exception {
+               final CompletableFuture<String> inputFuture1 = new 
CompletableFuture<>();
+               final CompletableFuture<Integer> inputFuture2 = new 
CompletableFuture<>();
+
+               final List<CompletableFuture<?>> futuresToComplete = 
Arrays.asList(inputFuture1, inputFuture2);
+               final FutureUtils.ConjunctFuture<Void> completeFuture = 
FutureUtils.completeAll(futuresToComplete);
+
+               assertThat(completeFuture.isDone(), is(false));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(0));
+               assertThat(completeFuture.getNumFuturesTotal(), 
is(futuresToComplete.size()));
+
+               inputFuture2.complete(42);
+
+               assertThat(completeFuture.isDone(), is(false));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(1));
+
+               inputFuture1.complete("foobar");
+
+               assertThat(completeFuture.isDone(), is(true));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(2));
+
+               completeFuture.get();
+       }
+
+       @Test
+       public void testCompleteAllPartialExceptional() throws Exception {
+               final CompletableFuture<String> inputFuture1 = new 
CompletableFuture<>();
+               final CompletableFuture<Integer> inputFuture2 = new 
CompletableFuture<>();
+
+               final List<CompletableFuture<?>> futuresToComplete = 
Arrays.asList(inputFuture1, inputFuture2);
+               final FutureUtils.ConjunctFuture<Void> completeFuture = 
FutureUtils.completeAll(futuresToComplete);
+
+               assertThat(completeFuture.isDone(), is(false));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(0));
+               assertThat(completeFuture.getNumFuturesTotal(), 
is(futuresToComplete.size()));
+
+               final FlinkException testException1 = new FlinkException("Test 
exception 1");
+               inputFuture2.completeExceptionally(testException1);
+
+               assertThat(completeFuture.isDone(), is(false));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(1));
+
+               inputFuture1.complete("foobar");
+
+               assertThat(completeFuture.isDone(), is(true));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(2));
+
+               try {
+                       completeFuture.get();
+                       fail("Expected an exceptional completion");
+               } catch (ExecutionException ee) {
+                       assertThat(ExceptionUtils.stripExecutionException(ee), 
is(testException1));
+               }
+       }
+
+       @Test
+       public void testCompleteAllExceptional() throws Exception {
+               final CompletableFuture<String> inputFuture1 = new 
CompletableFuture<>();
+               final CompletableFuture<Integer> inputFuture2 = new 
CompletableFuture<>();
+
+               final List<CompletableFuture<?>> futuresToComplete = 
Arrays.asList(inputFuture1, inputFuture2);
+               final FutureUtils.ConjunctFuture<Void> completeFuture = 
FutureUtils.completeAll(futuresToComplete);
+
+               assertThat(completeFuture.isDone(), is(false));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(0));
+               assertThat(completeFuture.getNumFuturesTotal(), 
is(futuresToComplete.size()));
+
+               final FlinkException testException1 = new FlinkException("Test 
exception 1");
+               inputFuture1.completeExceptionally(testException1);
+
+               assertThat(completeFuture.isDone(), is(false));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(1));
+
+               final FlinkException testException2 = new FlinkException("Test 
exception 2");
+               inputFuture2.completeExceptionally(testException2);
+
+               assertThat(completeFuture.isDone(), is(true));
+               assertThat(completeFuture.getNumFuturesCompleted(), is(2));
+
+               try {
+                       completeFuture.get();
+                       fail("Expected an exceptional completion");
+               } catch (ExecutionException ee) {
+                       final Throwable actual = 
ExceptionUtils.stripExecutionException(ee);
+
+                       final Throwable[] suppressed = actual.getSuppressed();
+                       final FlinkException suppressedException;
+
+                       if (actual.equals(testException1)) {
+                                suppressedException = testException2;
+                       } else {
+                               suppressedException = testException1;
+                       }
+
+                       assertThat(suppressed, is(not(emptyArray())));
+                       assertThat(suppressed, 
arrayContaining(suppressedException));
+               }
+       }
 }

Reply via email to