Repository: flink Updated Branches: refs/heads/master 417c5a4b4 -> 98710ead5
[FLINK-4751] [futures] Add thenCombineAsync function to Flink's futures The thenCombineAsync method combines two futures and applies a BiFunction to the results of both futures. The BiFunction is only applied after both futures have completed. Add ThrowableWrapperException to properly handle throwables. This closes #2600. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98710ead Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98710ead Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98710ead Branch: refs/heads/master Commit: 98710ead59305fc067cb8fdfab2c47d3bdc2e3fc Parents: 417c5a4 Author: Till Rohrmann <[email protected]> Authored: Wed Oct 5 18:54:38 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 6 13:34:32 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/concurrent/Future.java | 20 ++++- .../runtime/concurrent/impl/FlinkFuture.java | 83 ++++++++++++++++++-- .../runtime/concurrent/FlinkFutureTest.java | 82 ++++++++++++++++++- 3 files changed, 174 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/98710ead/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java index b32bcd4..409c978 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java @@ -138,12 +138,13 @@ public interface Future<T> { * @param <R> type of the returned future's value * @return future representing the flattened return value of the apply function */ - <R> 
Future<R> thenComposeAsync(ApplyFunction<? super T, Future<? extends R>> composeFunction, Executor executor); + <R> Future<R> thenComposeAsync(ApplyFunction<? super T, ? extends Future<R>> composeFunction, Executor executor); /** * Applies the given handle function to the result of the future. The result can either be the * future's value or the exception with which the future has been completed. The two cases are - * mutually exclusive. The result of the handle function is the returned future's value. + * mutually exclusive. This means that either the left or right argument of the handle function + * are non null. The result of the handle function is the returned future's value. * <p> * The handle function is executed asynchronously by the given executor. * @@ -153,4 +154,19 @@ public interface Future<T> { * @return future representing the handle function's return value */ <R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor); + + /** + * Applies the given function to the result of this and the other future after both futures + * have completed. The result of the bi-function is the result of the returned future. + * <p> + * The bi-function is executed asynchronously by the given executor. + * + * @param other future whose result is the right input to the bi-function + * @param biFunction applied to the result of this and that future + * @param executor used to execute the bi-function asynchronously + * @param <U> type of that future's return value + * @param <R> type of the bi-function's return value + * @return future representing the bi-function's return value + */ + <U, R> Future<R> thenCombineAsync(Future<U> other, BiFunction<? super T, ? super U, ? 
extends R> biFunction, Executor executor); } http://git-wip-us.apache.org/repos/asf/flink/blob/98710ead/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java index 3f2c5e4..277f4fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.util.Preconditions; import scala.Option; +import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.Duration; @@ -81,6 +82,8 @@ public class FlinkFuture<T> implements Future<T> { return Await.result(scalaFuture, Duration.Inf()); } catch (InterruptedException e) { throw e; + } catch (FlinkFuture.ThrowableWrapperException e) { + throw new ExecutionException(e.getCause()); } catch (Exception e) { throw new ExecutionException(e); } @@ -171,11 +174,13 @@ public class FlinkFuture<T> implements Future<T> { } @Override - public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, Future<? extends R>> applyFunction, final Executor executor) { + public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, ? 
extends Future<R>> applyFunction, Executor executor) { Preconditions.checkNotNull(scalaFuture); Preconditions.checkNotNull(applyFunction); Preconditions.checkNotNull(executor); + final ExecutionContext executionContext = createExecutionContext(executor); + scala.concurrent.Future<R> flatMappedFuture = scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() { @Override public scala.concurrent.Future<R> apply(T value) { @@ -190,12 +195,21 @@ public class FlinkFuture<T> implements Future<T> { return Futures.future(new Callable<R>() { @Override public R call() throws Exception { - return future.get(); + try { + return future.get(); + } catch (ExecutionException e) { + // unwrap the execution exception if it's not a throwable + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } else { + throw new FlinkFuture.ThrowableWrapperException(e.getCause()); + } + } } - }, createExecutionContext(executor)); + }, executionContext); } } - }, createExecutionContext(executor)); + }, executionContext); return new FlinkFuture<>(flatMappedFuture); } @@ -206,6 +220,8 @@ public class FlinkFuture<T> implements Future<T> { Preconditions.checkNotNull(biFunction); Preconditions.checkNotNull(executor); + final ExecutionContext executionContext = createExecutionContext(executor); + scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new Mapper<T, R>() { @Override public R checkedApply(T value) throws Exception { @@ -215,7 +231,7 @@ public class FlinkFuture<T> implements Future<T> { throw new FlinkFuture.WrapperException(t); } } - }, createExecutionContext(executor)); + }, executionContext); scala.concurrent.Future<R> recoveredFuture = mappedFuture.recover(new Recover<R>() { @Override @@ -226,12 +242,52 @@ public class FlinkFuture<T> implements Future<T> { return biFunction.apply(null, failure); } } - }, createExecutionContext(executor)); - + }, executionContext); return new FlinkFuture<>(recoveredFuture); } + @Override + public <U, R> Future<R> 
thenCombineAsync(final Future<U> other, final BiFunction<? super T, ? super U, ? extends R> biFunction, final Executor executor) { + Preconditions.checkNotNull(other); + Preconditions.checkNotNull(biFunction); + Preconditions.checkNotNull(executor); + + final ExecutionContext executionContext = createExecutionContext(executor); + + final scala.concurrent.Future<U> thatScalaFuture; + + if (other instanceof FlinkFuture) { + thatScalaFuture = ((FlinkFuture<U>) other).scalaFuture; + } else { + thatScalaFuture = Futures.future(new Callable<U>() { + @Override + public U call() throws Exception { + try { + return other.get(); + } catch (ExecutionException e) { + // unwrap the execution exception if the cause is an Exception + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } else { + // it's an error or a throwable which we have to wrap for the moment + throw new FlinkFuture.ThrowableWrapperException(e.getCause()); + } + } + } + }, executionContext); + } + + scala.concurrent.Future<R> result = scalaFuture.zip(thatScalaFuture).map(new Mapper<Tuple2<T, U>, R>() { + @Override + public R apply(Tuple2<T, U> tuple2) { + return biFunction.apply(tuple2._1, tuple2._2); + } + }, executionContext); + + return new FlinkFuture<>(result); + } + //----------------------------------------------------------------------------------- // Static factory methods //----------------------------------------------------------------------------------- @@ -269,4 +325,17 @@ public class FlinkFuture<T> implements Future<T> { super(cause); } } + + /** + * Wrapper for {@link Throwable} which is used to emit the proper exception when calling + * {@link Future#get}. 
+ */ + private static class ThrowableWrapperException extends Exception { + + private static final long serialVersionUID = 3855668690181179801L; + + ThrowableWrapperException(Throwable throwable) { + super(Preconditions.checkNotNull(throwable)); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/98710ead/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java index bd5af66..905f5b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java @@ -154,9 +154,9 @@ public class FlinkFutureTest extends TestLogger { final int expectedValue = 42; - Future<Integer> composedFuture = initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<? extends Integer>>() { + Future<Integer> composedFuture = initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<Integer>>() { @Override - public Future<? 
extends Integer> apply(Integer value) { + public Future<Integer> apply(Integer value) { return FlinkFuture.supplyAsync(new Callable<Integer>() { @Override public Integer call() throws Exception { @@ -174,6 +174,84 @@ public class FlinkFutureTest extends TestLogger { } @Test + public void testCombine() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>(); + CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>(); + + final int expectedLeftValue = 42; + final String expectedRightValue = "foobar"; + + + Future<String> resultFuture = leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, String>() { + @Override + public String apply(Integer integer, String s) { + return s + integer; + } + }, executor); + + leftFuture.complete(expectedLeftValue); + rightFuture.complete(expectedRightValue); + + String result = resultFuture.get(); + + assertEquals(expectedRightValue + expectedLeftValue, result); + } + + @Test + public void testCombineLeftFailure() throws InterruptedException { + CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>(); + CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>(); + + final String expectedRightValue = "foobar"; + final TestException testException = new TestException("barfoo"); + + + Future<String> resultFuture = leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, String>() { + @Override + public String apply(Integer integer, String s) { + return s + integer; + } + }, executor); + + leftFuture.completeExceptionally(testException); + rightFuture.complete(expectedRightValue); + + try { + resultFuture.get(); + fail("We should have caught an ExecutionException."); + } catch (ExecutionException e) { + assertEquals(testException, e.getCause()); + } + } + + @Test + public void testCombineRightFailure() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>(); + CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>(); + + final int expectedLeftValue = 42; + final TestException testException = new TestException("barfoo"); + + + Future<String> resultFuture = leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, String>() { + @Override + public String apply(Integer integer, String s) { + return s + integer; + } + }, executor); + + leftFuture.complete(expectedLeftValue); + rightFuture.completeExceptionally(testException); + + try { + resultFuture.get(); + fail("We should have caught an ExecutionException."); + } catch (ExecutionException e) { + assertEquals(testException, e.getCause()); + } + } + + @Test public void testGetNow() throws ExecutionException { CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
