Repository: flink
Updated Branches:
  refs/heads/master 417c5a4b4 -> 98710ead5


[FLINK-4751] [futures] Add thenCombineAsync function to Flink's futures

The thenCombineAsync method combines two futures by applying a BiFunction to
the results of both futures. The BiFunction is applied only after both futures
have completed.

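Add ThrowableWrapperException to wrap Throwables that are not Exceptions, so
that Future#get can unwrap them and report the original Throwable as the cause
of the ExecutionException.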

This closes #2600.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98710ead
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98710ead
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98710ead

Branch: refs/heads/master
Commit: 98710ead59305fc067cb8fdfab2c47d3bdc2e3fc
Parents: 417c5a4
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 5 18:54:38 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Oct 6 13:34:32 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/concurrent/Future.java | 20 ++++-
 .../runtime/concurrent/impl/FlinkFuture.java    | 83 ++++++++++++++++++--
 .../runtime/concurrent/FlinkFutureTest.java     | 82 ++++++++++++++++++-
 3 files changed, 174 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98710ead/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
index b32bcd4..409c978 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
@@ -138,12 +138,13 @@ public interface Future<T> {
         * @param <R> type of the returned future's value
         * @return future representing the flattened return value of the apply 
function
         */
-       <R> Future<R> thenComposeAsync(ApplyFunction<? super T, Future<? 
extends R>> composeFunction, Executor executor);
+       <R> Future<R> thenComposeAsync(ApplyFunction<? super T, ? extends 
Future<R>> composeFunction, Executor executor);
 
        /**
         * Applies the given handle function to the result of the future. The 
result can either be the
         * future's value or the exception with which the future has been 
completed. The two cases are
-        * mutually exclusive. The result of the handle function is the 
returned future's value.
+        * mutually exclusive. This means that either the left or right 
argument of the handle function
+        * are non null. The result of the handle function is the returned 
future's value.
         * <p>
         * The handle function is executed asynchronously by the given executor.
         *
@@ -153,4 +154,19 @@ public interface Future<T> {
         * @return future representing the handle function's return value
         */
        <R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> 
biFunction, Executor executor);
+
+       /**
+        * Applies the given function to the result of this and the other 
future after both futures
+        * have completed. The result of the bi-function is the result of the 
returned future.
+        * <p>
+        * The bi-function is executed asynchronously by the given executor.
+        *
+        * @param other future whose result is the right input to the 
bi-function
+        * @param biFunction applied to the result of this and that future
+        * @param executor used to execute the bi-function asynchronously
+        * @param <U> type of that future's return value
+        * @param <R> type of the bi-function's return value
+        * @return future representing the bi-function's return value
+        */
+       <U, R> Future<R> thenCombineAsync(Future<U> other, BiFunction<? super 
T, ? super U, ? extends R> biFunction, Executor executor);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98710ead/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 3f2c5e4..277f4fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
 import scala.Option;
+import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.Duration;
@@ -81,6 +82,8 @@ public class FlinkFuture<T> implements Future<T> {
                        return Await.result(scalaFuture, Duration.Inf());
                } catch (InterruptedException e) {
                        throw e;
+               } catch (FlinkFuture.ThrowableWrapperException e) {
+                       throw new ExecutionException(e.getCause());
                } catch (Exception e) {
                        throw new ExecutionException(e);
                }
@@ -171,11 +174,13 @@ public class FlinkFuture<T> implements Future<T> {
        }
 
        @Override
-       public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, 
Future<? extends R>> applyFunction, final Executor executor) {
+       public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, ? 
extends Future<R>> applyFunction, Executor executor) {
                Preconditions.checkNotNull(scalaFuture);
                Preconditions.checkNotNull(applyFunction);
                Preconditions.checkNotNull(executor);
 
+               final ExecutionContext executionContext = 
createExecutionContext(executor);
+
                scala.concurrent.Future<R> flatMappedFuture = 
scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() {
                        @Override
                        public scala.concurrent.Future<R> apply(T value) {
@@ -190,12 +195,21 @@ public class FlinkFuture<T> implements Future<T> {
                                        return Futures.future(new Callable<R>() 
{
                                                @Override
                                                public R call() throws 
Exception {
-                                                       return future.get();
+                                                       try {
+                                                               return 
future.get();
+                                                       } catch 
(ExecutionException e) {
+                                                               // unwrap the 
execution exception if it's not a throwable
+                                                               if 
(e.getCause() instanceof Exception) {
+                                                                       throw 
(Exception) e.getCause();
+                                                               } else {
+                                                                       throw 
new FlinkFuture.ThrowableWrapperException(e.getCause());
+                                                               }
+                                                       }
                                                }
-                                       }, createExecutionContext(executor));
+                                       }, executionContext);
                                }
                        }
-               }, createExecutionContext(executor));
+               }, executionContext);
 
                return new FlinkFuture<>(flatMappedFuture);
        }
@@ -206,6 +220,8 @@ public class FlinkFuture<T> implements Future<T> {
                Preconditions.checkNotNull(biFunction);
                Preconditions.checkNotNull(executor);
 
+               final ExecutionContext executionContext = 
createExecutionContext(executor);
+
                scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new 
Mapper<T, R>() {
                        @Override
                        public R checkedApply(T value) throws Exception {
@@ -215,7 +231,7 @@ public class FlinkFuture<T> implements Future<T> {
                                        throw new 
FlinkFuture.WrapperException(t);
                                }
                        }
-               }, createExecutionContext(executor));
+               }, executionContext);
 
                scala.concurrent.Future<R> recoveredFuture = 
mappedFuture.recover(new Recover<R>() {
                        @Override
@@ -226,12 +242,52 @@ public class FlinkFuture<T> implements Future<T> {
                                        return biFunction.apply(null, failure);
                                }
                        }
-               }, createExecutionContext(executor));
-
+               }, executionContext);
 
                return new FlinkFuture<>(recoveredFuture);
        }
 
+       @Override
+       public <U, R> Future<R> thenCombineAsync(final Future<U> other, final 
BiFunction<? super T, ? super U, ? extends R> biFunction, final Executor 
executor) {
+               Preconditions.checkNotNull(other);
+               Preconditions.checkNotNull(biFunction);
+               Preconditions.checkNotNull(executor);
+
+               final ExecutionContext executionContext = 
createExecutionContext(executor);
+
+               final scala.concurrent.Future<U> thatScalaFuture;
+
+               if (other instanceof FlinkFuture) {
+                       thatScalaFuture = ((FlinkFuture<U>) other).scalaFuture;
+               } else {
+                       thatScalaFuture = Futures.future(new Callable<U>() {
+                               @Override
+                               public U call() throws Exception {
+                                       try {
+                                               return other.get();
+                                       } catch (ExecutionException e) {
+                                               // unwrap the execution 
exception if the cause is an Exception
+                                               if (e.getCause() instanceof 
Exception) {
+                                                       throw (Exception) 
e.getCause();
+                                               } else {
+                                                       // it's an error or a 
throwable which we have to wrap for the moment
+                                                       throw new 
FlinkFuture.ThrowableWrapperException(e.getCause());
+                                               }
+                                       }
+                               }
+                       }, executionContext);
+               }
+
+               scala.concurrent.Future<R>  result = 
scalaFuture.zip(thatScalaFuture).map(new Mapper<Tuple2<T, U>, R>() {
+                       @Override
+                       public R apply(Tuple2<T, U> tuple2) {
+                               return biFunction.apply(tuple2._1, tuple2._2);
+                       }
+               }, executionContext);
+
+               return new FlinkFuture<>(result);
+       }
+
        
//-----------------------------------------------------------------------------------
        // Static factory methods
        
//-----------------------------------------------------------------------------------
@@ -269,4 +325,17 @@ public class FlinkFuture<T> implements Future<T> {
                        super(cause);
                }
        }
+
+       /**
+        * Wrapper for {@link Throwable} which is used to emit the proper 
exception when calling
+        * {@link Future#get}.
+        */
+       private static class ThrowableWrapperException extends Exception {
+
+               private static final long serialVersionUID = 
3855668690181179801L;
+
+               ThrowableWrapperException(Throwable throwable) {
+                       super(Preconditions.checkNotNull(throwable));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98710ead/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index bd5af66..905f5b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -154,9 +154,9 @@ public class FlinkFutureTest extends TestLogger {
 
                final int expectedValue = 42;
 
-               Future<Integer> composedFuture = 
initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<? extends 
Integer>>() {
+               Future<Integer> composedFuture = 
initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<Integer>>() {
                        @Override
-                       public Future<? extends Integer> apply(Integer value) {
+                       public Future<Integer> apply(Integer value) {
                                return FlinkFuture.supplyAsync(new 
Callable<Integer>() {
                                        @Override
                                        public Integer call() throws Exception {
@@ -174,6 +174,84 @@ public class FlinkFutureTest extends TestLogger {
        }
 
        @Test
+       public void testCombine() throws ExecutionException, 
InterruptedException {
+               CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>();
+               CompletableFuture<String> rightFuture = new 
FlinkCompletableFuture<>();
+
+               final int expectedLeftValue = 42;
+               final String expectedRightValue = "foobar";
+
+
+               Future<String> resultFuture = 
leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, 
String>() {
+                       @Override
+                       public String apply(Integer integer, String s) {
+                               return s + integer;
+                       }
+               }, executor);
+
+               leftFuture.complete(expectedLeftValue);
+               rightFuture.complete(expectedRightValue);
+
+               String result = resultFuture.get();
+
+               assertEquals(expectedRightValue + expectedLeftValue, result);
+       }
+
+       @Test
+       public void testCombineLeftFailure() throws InterruptedException {
+               CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>();
+               CompletableFuture<String> rightFuture = new 
FlinkCompletableFuture<>();
+
+               final String expectedRightValue = "foobar";
+               final TestException testException = new TestException("barfoo");
+
+
+               Future<String> resultFuture = 
leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, 
String>() {
+                       @Override
+                       public String apply(Integer integer, String s) {
+                               return s + integer;
+                       }
+               }, executor);
+
+               leftFuture.completeExceptionally(testException);
+               rightFuture.complete(expectedRightValue);
+
+               try {
+                       resultFuture.get();
+                       fail("We should have caught an ExecutionException.");
+               } catch (ExecutionException e) {
+                       assertEquals(testException, e.getCause());
+               }
+       }
+
+       @Test
+       public void testCombineRightFailure() throws ExecutionException, 
InterruptedException {
+               CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>();
+               CompletableFuture<String> rightFuture = new 
FlinkCompletableFuture<>();
+
+               final int expectedLeftValue = 42;
+               final TestException testException = new TestException("barfoo");
+
+
+               Future<String> resultFuture = 
leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, 
String>() {
+                       @Override
+                       public String apply(Integer integer, String s) {
+                               return s + integer;
+                       }
+               }, executor);
+
+               leftFuture.complete(expectedLeftValue);
+               rightFuture.completeExceptionally(testException);
+
+               try {
+                       resultFuture.get();
+                       fail("We should have caught an ExecutionException.");
+               } catch (ExecutionException e) {
+                       assertEquals(testException, e.getCause());
+               }
+       }
+
+       @Test
        public void testGetNow() throws ExecutionException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
 

Reply via email to