Repository: flink
Updated Branches:
  refs/heads/master b410c393c -> 1e1e4dc70


[FLINK-4903] [futures] Introduce synchronous future operations

The synchronous future operations are executed by the thread which adds the 
operation
if the future is already completed or by the thread executing the last 
operation.

Increase FlinkFutureTest timeouts to 10s

This closes #2689.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e1e4dc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e1e4dc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e1e4dc7

Branch: refs/heads/master
Commit: 1e1e4dc701d3606bdcaa291d2d85460fdfbb0dc7
Parents: b410c39
Author: Till Rohrmann <[email protected]>
Authored: Tue Oct 25 10:31:53 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Oct 27 15:40:54 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/concurrent/Future.java |  67 ++++++++
 .../runtime/concurrent/impl/FlinkFuture.java    |  31 ++++
 .../runtime/concurrent/FlinkFutureTest.java     | 156 ++++++++++++++++---
 3 files changed, 235 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e1e4dc7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
index 409c978..a6d5a48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
@@ -99,6 +99,16 @@ public interface Future<T> {
        <R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> 
applyFunction, Executor executor);
 
        /**
+        * Applies the given function to the value of the future. The result of 
the apply function is
+        * the value of the newly returned future.
+        *
+        * @param applyFunction function to apply to the future's value
+        * @param <R> type of the apply function's return value
+        * @return future representing the return value of the given apply 
function
+        */
+       <R> Future<R> thenApply(ApplyFunction<? super T, ? extends R> 
applyFunction);
+
+       /**
         * Applies the accept function to the value of the future. Unlike the 
{@link ApplyFunction}, the
         * {@link AcceptFunction} does not return a value. The returned future, 
thus, represents only
         * the completion of the accept callback.
@@ -112,6 +122,16 @@ public interface Future<T> {
        Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, 
Executor executor);
 
        /**
+        * Applies the accept function to the value of the future. Unlike the 
{@link ApplyFunction}, the
+        * {@link AcceptFunction} does not return a value. The returned future, 
thus, represents only
+        * the completion of the accept callback.
+        *
+        * @param acceptFunction function to apply to the future's value
+        * @return future representing the completion of the accept callback
+        */
+       Future<Void> thenAccept(AcceptFunction<? super T> acceptFunction);
+
+       /**
         * Applies the given function to the value of the future if the future 
has been completed
         * exceptionally. The completing exception is given to the apply 
function which can return a new
         * value which is the value of the returned future.
@@ -126,6 +146,17 @@ public interface Future<T> {
        <R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> 
exceptionallyFunction, Executor executor);
 
        /**
+        * Applies the given function to the value of the future if the future 
has been completed
+        * exceptionally. The completing exception is given to the apply 
function which can return a new
+        * value which is the value of the returned future.
+        *
+        * @param exceptionallyFunction to apply to the future's value if it is 
an exception
+        * @param <R> type of the apply function's return value
+        * @return future representing the return value of the given apply 
function
+        */
+       <R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends R> 
exceptionallyFunction);
+
+       /**
         * Applies the given function to the value of the future. The apply 
function returns a future
         * result, which is flattened. This means that the resulting future of 
this method represents
         * the future's value of the apply function.
@@ -141,6 +172,18 @@ public interface Future<T> {
        <R> Future<R> thenComposeAsync(ApplyFunction<? super T, ? extends 
Future<R>> composeFunction, Executor executor);
 
        /**
+        * Applies the given function to the value of the future. The apply 
function returns a future
+        * result, which is flattened. This means that the resulting future of 
this method represents
+        * the future's value of the apply function.
+        *
+        * @param composeFunction to apply to the future's value. The function 
returns a future which is
+        *                        flattened
+        * @param <R> type of the returned future's value
+        * @return future representing the flattened return value of the apply 
function
+        */
+       <R> Future<R> thenCompose(ApplyFunction<? super T, ? extends Future<R>> 
composeFunction);
+
+       /**
         * Applies the given handle function to the result of the future. The 
result can either be the
         * future's value or the exception with which the future has been 
completed. The two cases are
         * mutually exclusive. This means that either the left or right 
argument of the handle function
@@ -156,6 +199,18 @@ public interface Future<T> {
        <R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> 
biFunction, Executor executor);
 
        /**
+        * Applies the given handle function to the result of the future. The 
result can either be the
+        * future's value or the exception with which the future has been 
completed. The two cases are
+        * mutually exclusive. This means that either the left or right 
argument of the handle function
+        * are non null. The result of the handle function is the returned 
future's value.
+        *
+        * @param biFunction applied to the result (normal and exceptional) of 
the future
+        * @param <R> type of the handle function's return value
+        * @return future representing the handle function's return value
+        */
+       <R> Future<R> handle(BiFunction<? super T, Throwable, ? extends R> 
biFunction);
+
+       /**
         * Applies the given function to the result of this and the other 
future after both futures
         * have completed. The result of the bi-function is the result of the 
returned future.
         * <p>
@@ -169,4 +224,16 @@ public interface Future<T> {
         * @return future representing the bi-function's return value
         */
        <U, R> Future<R> thenCombineAsync(Future<U> other, BiFunction<? super 
T, ? super U, ? extends R> biFunction, Executor executor);
+
+       /**
+        * Applies the given function to the result of this and the other 
future after both futures
+        * have completed. The result of the bi-function is the result of the 
returned future.
+        *
+        * @param other future whose result is the right input to the 
bi-function
+        * @param biFunction applied to the result of this and that future
+        * @param <U> type of that future's return value
+        * @param <R> type of the bi-function's return value
+        * @return future representing the bi-function's return value
+        */
+       <U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super T, ? 
super U, ? extends R> biFunction);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e1e4dc7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 277f4fa..e881399 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -24,6 +24,7 @@ import akka.dispatch.Mapper;
 import akka.dispatch.Recover;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
@@ -140,6 +141,11 @@ public class FlinkFuture<T> implements Future<T> {
        }
 
        @Override
+       public <R> Future<R> thenApply(final ApplyFunction<? super T, ? extends 
R> applyFunction) {
+               return thenApplyAsync(applyFunction, 
Executors.directExecutor());
+       }
+
+       @Override
        public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> 
acceptFunction, Executor executor) {
                Preconditions.checkNotNull(scalaFuture);
                Preconditions.checkNotNull(acceptFunction);
@@ -158,6 +164,11 @@ public class FlinkFuture<T> implements Future<T> {
        }
 
        @Override
+       public Future<Void> thenAccept(AcceptFunction<? super T> 
acceptFunction) {
+               return thenAcceptAsync(acceptFunction, 
Executors.directExecutor());
+       }
+
+       @Override
        public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, 
? extends R> exceptionallyFunction, Executor executor) {
                Preconditions.checkNotNull(scalaFuture);
                Preconditions.checkNotNull(exceptionallyFunction);
@@ -174,6 +185,11 @@ public class FlinkFuture<T> implements Future<T> {
        }
 
        @Override
+       public <R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends 
R> exceptionallyFunction) {
+               return exceptionallyAsync(exceptionallyFunction, 
Executors.directExecutor());
+       }
+
+       @Override
        public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, ? 
extends Future<R>> applyFunction, Executor executor) {
                Preconditions.checkNotNull(scalaFuture);
                Preconditions.checkNotNull(applyFunction);
@@ -215,6 +231,11 @@ public class FlinkFuture<T> implements Future<T> {
        }
 
        @Override
+       public <R> Future<R> thenCompose(ApplyFunction<? super T, ? extends 
Future<R>> composeFunction) {
+               return thenComposeAsync(composeFunction, 
Executors.directExecutor());
+       }
+
+       @Override
        public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, 
? extends R> biFunction, Executor executor) {
                Preconditions.checkNotNull(scalaFuture);
                Preconditions.checkNotNull(biFunction);
@@ -248,6 +269,11 @@ public class FlinkFuture<T> implements Future<T> {
        }
 
        @Override
+       public <R> Future<R> handle(BiFunction<? super T, Throwable, ? extends 
R> biFunction) {
+               return handleAsync(biFunction, Executors.directExecutor());
+       }
+
+       @Override
        public <U, R> Future<R> thenCombineAsync(final Future<U> other, final 
BiFunction<? super T, ? super U, ? extends R> biFunction, final Executor 
executor) {
                Preconditions.checkNotNull(other);
                Preconditions.checkNotNull(biFunction);
@@ -288,6 +314,11 @@ public class FlinkFuture<T> implements Future<T> {
                return new FlinkFuture<>(result);
        }
 
+       @Override
+       public <U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super 
T, ? super U, ? extends R> biFunction) {
+               return thenCombineAsync(other, biFunction, 
Executors.directExecutor());
+       }
+
        
//-----------------------------------------------------------------------------------
        // Static factory methods
        
//-----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1e1e4dc7/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 905f5b5..25d010b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -55,8 +55,8 @@ public class FlinkFutureTest extends TestLogger {
                executor.shutdown();
        }
 
-       @Test
-       public void testFutureApply() throws Exception {
+       @Test(timeout = 10000L)
+       public void testFutureApplyAsync() throws Exception {
                int expectedValue = 42;
 
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
@@ -129,8 +129,8 @@ public class FlinkFutureTest extends TestLogger {
                }
        }
 
-       @Test
-       public void testExceptionally() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testExceptionallyAsync() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
                String exceptionMessage = "Foobar";
 
@@ -148,8 +148,8 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(exceptionMessage, actualMessage);
        }
 
-       @Test
-       public void testCompose() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testComposeAsync() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
 
                final int expectedValue = 42;
@@ -173,8 +173,8 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(expectedValue, actualValue);
        }
 
-       @Test
-       public void testCombine() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testCombineAsync() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>();
                CompletableFuture<String> rightFuture = new 
FlinkCompletableFuture<>();
 
@@ -197,8 +197,8 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(expectedRightValue + expectedLeftValue, result);
        }
 
-       @Test
-       public void testCombineLeftFailure() throws InterruptedException {
+       @Test(timeout = 10000L)
+       public void testCombineAsyncLeftFailure() throws InterruptedException {
                CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>();
                CompletableFuture<String> rightFuture = new 
FlinkCompletableFuture<>();
 
@@ -224,8 +224,8 @@ public class FlinkFutureTest extends TestLogger {
                }
        }
 
-       @Test
-       public void testCombineRightFailure() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testCombineAsyncRightFailure() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> leftFuture = new 
FlinkCompletableFuture<>();
                CompletableFuture<String> rightFuture = new 
FlinkCompletableFuture<>();
 
@@ -260,8 +260,8 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(new Integer(absentValue), 
initialFuture.getNow(absentValue));
        }
 
-       @Test
-       public void testAccept() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testAcceptAsync() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
                final AtomicInteger atomicInteger = new AtomicInteger(0);
                int expectedValue = 42;
@@ -280,8 +280,8 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(expectedValue, atomicInteger.get());
        }
 
-       @Test
-       public void testHandle() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testHandleAsync() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
                int expectedValue = 43;
 
@@ -301,8 +301,8 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(String.valueOf(expectedValue), result.get());
        }
 
-       @Test
-       public void testHandleException() throws ExecutionException, 
InterruptedException {
+       @Test(timeout = 10000L)
+       public void testHandleAsyncException() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
                String exceptionMessage = "foobar";
 
@@ -322,7 +322,7 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(exceptionMessage, result.get());
        }
 
-       @Test
+       @Test(timeout = 10000L)
        public void testMultipleCompleteOperations() throws ExecutionException, 
InterruptedException {
                CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
                int expectedValue = 42;
@@ -336,6 +336,124 @@ public class FlinkFutureTest extends TestLogger {
                assertEquals(new Integer(expectedValue), initialFuture.get());
        }
 
+       @Test
+       public void testApply() throws ExecutionException, InterruptedException 
{
+               int expectedValue = 42;
+
+               CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
+
+               Future<String> appliedFuture = initialFuture.thenApply(new 
ApplyFunction<Integer, String>() {
+                       @Override
+                       public String apply(Integer value) {
+                               return String.valueOf(value);
+                       }
+               });
+
+               initialFuture.complete(expectedValue);
+
+               assertEquals(String.valueOf(expectedValue), 
appliedFuture.get());
+       }
+
+       @Test
+       public void testAccept() throws ExecutionException, 
InterruptedException {
+               int expectedValue = 42;
+               Future<Integer> initialFuture = 
FlinkCompletableFuture.completed(expectedValue);
+               final AtomicInteger atomicInteger = new AtomicInteger(0);
+
+               Future<Void> result = initialFuture.thenAccept(new 
AcceptFunction<Integer>() {
+                       @Override
+                       public void accept(Integer value) {
+                               atomicInteger.set(value);
+                       }
+               });
+
+               result.get();
+
+               assertEquals(expectedValue, atomicInteger.get());
+       }
+
+       @Test
+       public void testExceptionally() throws ExecutionException, 
InterruptedException {
+               String exceptionMessage = "Foobar";
+               Future<Integer> initialFuture = FlinkCompletableFuture
+                       .completedExceptionally(new 
TestException(exceptionMessage));
+
+
+               Future<String> recovered = initialFuture.exceptionally(new 
ApplyFunction<Throwable, String>() {
+                       @Override
+                       public String apply(Throwable value) {
+                               return value.getMessage();
+                       }
+               });
+
+               String actualMessage = recovered.get();
+
+               assertEquals(exceptionMessage, actualMessage);
+       }
+
+       @Test
+       public void testHandle() throws ExecutionException, 
InterruptedException {
+               int expectedValue = 43;
+               Future<Integer> initialFuture = 
FlinkCompletableFuture.completed(expectedValue);
+
+               Future<String> result = initialFuture.handle(new 
BiFunction<Integer, Throwable, String>() {
+                       @Override
+                       public String apply(Integer integer, Throwable 
throwable) {
+                               if (integer != null) {
+                                       return String.valueOf(integer);
+                               } else {
+                                       return throwable.getMessage();
+                               }
+                       }
+               });
+
+               assertEquals(String.valueOf(expectedValue), result.get());
+       }
+
+       @Test
+       public void testCompose() throws ExecutionException, 
InterruptedException {
+               CompletableFuture<Integer> initialFuture = new 
FlinkCompletableFuture<>();
+               final int expectedValue = 42;
+
+               Future<Integer> composedFuture = initialFuture.thenCompose(new 
ApplyFunction<Integer, Future<Integer>>() {
+                       @Override
+                       public Future<Integer> apply(Integer value) {
+                               return FlinkFuture.supplyAsync(new 
Callable<Integer>() {
+                                       @Override
+                                       public Integer call() throws Exception {
+                                               return expectedValue;
+                                       }
+                               }, executor);
+                       }
+               });
+
+               initialFuture.complete(42);
+
+               int actualValue = composedFuture.get();
+
+               assertEquals(expectedValue, actualValue);
+       }
+
+       @Test
+       public void testCombine() throws ExecutionException, 
InterruptedException {
+               int expectedLeftValue = 1;
+               int expectedRightValue = 2;
+
+               Future<Integer> left = 
FlinkCompletableFuture.completed(expectedLeftValue);
+               Future<Integer> right = 
FlinkCompletableFuture.completed(expectedRightValue);
+
+               Future<Integer> sum = left.thenCombine(right, new 
BiFunction<Integer, Integer, Integer>() {
+                       @Override
+                       public Integer apply(Integer left, Integer right) {
+                               return left + right;
+                       }
+               });
+
+               int result = sum.get();
+
+               assertEquals(expectedLeftValue + expectedRightValue, result);
+       }
+
        private static class TestException extends RuntimeException {
 
                private static final long serialVersionUID = 
-1274022962838535130L;

Reply via email to