[FLINK-4361] Introduce Flink's own future abstraction

This commit adds Flink's own future abstraction, whose API is similar to Java 8's CompletableFuture.
The similarity is intended to ease a later transition to that class once we ditch Java 7.
The current set of operations comprises:

- isDone to check the completion of the future
- get/getNow to obtain the future's value
- cancel to cancel the future (best effort basis)
- thenApplyAsync to transform the future's value into another value
- thenAcceptAsync to register a callback for a successful completion of the 
future
- exceptionallyAsync to register a callback for an exception completion of the 
future
- thenComposeAsync to transform the future's value and flatten the returned 
future
- handleAsync to register a callback which is called either with the regular 
result
or the exceptional result

Additionally, Flink offers a CompletableFuture which can be completed with a 
regular
value or an exception:

- complete/completeExceptionally

Complete FlinkCompletableFuture exceptionally with a CancellationException upon
cancel

This closes #2472.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f766f120
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f766f120
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f766f120

Branch: refs/heads/flip-6
Commit: f766f120e4af6b60a3e628836963eb981005325c
Parents: 0a134a3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Sep 2 21:13:34 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:18 2016 +0200

----------------------------------------------------------------------
 .../runtime/concurrent/AcceptFunction.java      |  34 +++
 .../flink/runtime/concurrent/ApplyFunction.java |  36 +++
 .../flink/runtime/concurrent/BiFunction.java    |  38 +++
 .../runtime/concurrent/CompletableFuture.java   |  47 ++++
 .../apache/flink/runtime/concurrent/Future.java | 156 +++++++++++
 .../concurrent/impl/FlinkCompletableFuture.java |  71 +++++
 .../runtime/concurrent/impl/FlinkFuture.java    | 273 +++++++++++++++++++
 .../runtime/concurrent/FlinkFutureTest.java     | 269 ++++++++++++++++++
 8 files changed, 924 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
new file mode 100644
index 0000000..a300647
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Callback which consumes a single argument and does not produce a result.
+ * <p>
+ * It is the callback type accepted by {@code Future#thenAcceptAsync}.
+ *
+ * @param <T> type of the argument
+ */
+public interface AcceptFunction<T> {
+
+       /**
+        * Consumes the given value.
+        *
+        * @param value the argument handed to the callback
+        */
+       void accept(T value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
new file mode 100644
index 0000000..64def98
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Function which maps a single argument to a result.
+ * <p>
+ * It is the callback type accepted by {@code Future#thenApplyAsync},
+ * {@code Future#exceptionallyAsync} and {@code Future#thenComposeAsync}.
+ *
+ * @param <V> type of the argument
+ * @param <R> type of the return value
+ */
+public interface ApplyFunction<V, R> {
+
+       /**
+        * Computes the function's result for the given argument.
+        *
+        * @param value the single argument
+        * @return the function value
+        */
+       R apply(V value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
new file mode 100644
index 0000000..2b09de8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Function which maps two arguments to a result.
+ * <p>
+ * It is the callback type accepted by {@code Future#handleAsync}, where the first argument
+ * is the future's value and the second argument is the exception the future was completed with.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <R> type of the return value
+ */
+public interface BiFunction<T, U, R> {
+
+       /**
+        * Computes the function's result for the given pair of arguments.
+        *
+        * @param t first argument
+        * @param u second argument
+        * @return the function value
+        */
+       R apply(T t, U u);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
new file mode 100644
index 0000000..5288bf2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Flink's completable future abstraction. A completable future can be completed explicitly,
+ * either with a regular value or with an exception.
+ *
+ * @param <T> type of the future's value
+ */
+public interface CompletableFuture<T> extends Future<T> {
+
+       /**
+        * Completes the future with the given value. The complete operation only succeeds if the
+        * future has not been completed before. Whether it was successful is returned by the method.
+        * <p>
+        * Note: the {@code FlinkCompletableFuture} implementation rejects a {@code null} value.
+        *
+        * @param value to complete the future with
+        * @return true if the completion was successful; otherwise false
+        */
+       boolean complete(T value);
+
+       /**
+        * Completes the future with the given exception. The complete operation only succeeds if
+        * the future has not been completed before. Whether it was successful is returned by the
+        * method.
+        * <p>
+        * Note: the {@code FlinkCompletableFuture} implementation rejects a {@code null} exception.
+        *
+        * @param t the exception to complete the future with
+        * @return true if the completion was successful; otherwise false
+        */
+       boolean completeExceptionally(Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
new file mode 100644
index 0000000..b32bcd4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Flink's basic future abstraction. A future represents an asynchronous operation whose
+ * result will be contained in this instance upon completion.
+ * <p>
+ * All callback-registering methods take the {@link Executor} on which the callback is run.
+ *
+ * @param <T> type of the future's result
+ */
+public interface Future<T> {
+
+       /**
+        * Checks if the future has been completed. A future is completed if the result (a value
+        * or an exception) has been delivered.
+        *
+        * @return true if the future is completed; otherwise false
+        */
+       boolean isDone();
+
+       /**
+        * Tries to cancel the future's operation. Note that not all future operations can be
+        * canceled. The result of the cancelling will be returned.
+        * <p>
+        * Note: {@code FlinkFuture} always returns false here, whereas
+        * {@code FlinkCompletableFuture} completes itself exceptionally with a
+        * {@link CancellationException}. Neither of them evaluates {@code mayInterruptIfRunning}.
+        *
+        * @param mayInterruptIfRunning true iff the future operation may be interrupted
+        * @return true if the cancelling was successful; otherwise false
+        */
+       boolean cancel(boolean mayInterruptIfRunning);
+
+       /**
+        * Gets the result value of the future. If the future has not been completed, then this
+        * operation will block indefinitely until the result has been delivered.
+        *
+        * @return the result value
+        * @throws CancellationException if the future has been cancelled
+        * @throws InterruptedException if the current thread was interrupted while waiting for
+        *                              the result
+        * @throws ExecutionException if the future has been completed with an exception
+        */
+       T get() throws InterruptedException, ExecutionException;
+
+       /**
+        * Gets the result value of the future. If the future has not been completed, then this
+        * operation will block for at most the given timeout. If the result has not been
+        * delivered within the timeout, then the method throws a {@link TimeoutException}.
+        *
+        * @param timeout the time to wait for the future to be done
+        * @param unit time unit for the timeout argument
+        * @return the result value
+        * @throws CancellationException if the future has been cancelled
+        * @throws InterruptedException if the current thread was interrupted while waiting for
+        *                              the result
+        * @throws ExecutionException if the future has been completed with an exception
+        * @throws TimeoutException if the future has not been completed within the given timeout
+        */
+       T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException;
+
+       /**
+        * Gets the value of the future without blocking. If the future has not been completed
+        * when calling this function, the given value is returned.
+        * <p>
+        * Note: the {@code FlinkFuture} implementation rejects a {@code null}
+        * {@code valueIfAbsent}.
+        *
+        * @param valueIfAbsent value which is returned if the future has not been completed
+        * @return value of the future or the given value if the future has not been completed
+        * @throws ExecutionException if the future has been completed with an exception
+        */
+       T getNow(T valueIfAbsent) throws ExecutionException;
+
+       /**
+        * Applies the given function to the value of the future. The result of the apply
+        * function is the value of the newly returned future.
+        * <p>
+        * The apply function is executed asynchronously by the given executor.
+        *
+        * @param applyFunction function to apply to the future's value
+        * @param executor used to execute the given apply function asynchronously
+        * @param <R> type of the apply function's return value
+        * @return future representing the return value of the given apply function
+        */
+       <R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> 
applyFunction, Executor executor);
+
+       /**
+        * Applies the accept function to the value of the future. Unlike the
+        * {@link ApplyFunction}, the {@link AcceptFunction} does not return a value. The
+        * returned future, thus, represents only the completion of the accept callback.
+        * <p>
+        * The accept function is executed asynchronously by the given executor.
+        *
+        * @param acceptFunction function to apply to the future's value
+        * @param executor used to execute the given accept function asynchronously
+        * @return future representing the completion of the accept callback
+        */
+       Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, 
Executor executor);
+
+       /**
+        * Applies the given function to the exception of the future if the future has been
+        * completed exceptionally. The completing exception is given to the apply function
+        * which can return a new value which is the value of the returned future.
+        * <p>
+        * The apply function is executed asynchronously by the given executor.
+        * <p>
+        * NOTE(review): {@code FlinkFuture} implements this via the Scala future's
+        * {@code recover}, so a normally completed future presumably passes its own value
+        * through to the returned future; {@code R} should then be compatible with {@code T}.
+        * Confirm before relying on an unrelated {@code R}.
+        *
+        * @param exceptionallyFunction to apply to the future's exception
+        * @param executor used to execute the given apply function asynchronously
+        * @param <R> type of the apply function's return value
+        * @return future representing the return value of the given apply function
+        */
+       <R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> 
exceptionallyFunction, Executor executor);
+
+       /**
+        * Applies the given function to the value of the future. The apply function returns a
+        * future result, which is flattened. This means that the resulting future of this
+        * method represents the future's value of the apply function.
+        * <p>
+        * The apply function is executed asynchronously by the given executor.
+        *
+        * @param composeFunction to apply to the future's value. The function returns a future
+        *                        which is flattened
+        * @param executor used to execute the given apply function asynchronously
+        * @param <R> type of the returned future's value
+        * @return future representing the flattened return value of the apply function
+        */
+       <R> Future<R> thenComposeAsync(ApplyFunction<? super T, Future<? 
extends R>> composeFunction, Executor executor);
+
+       /**
+        * Applies the given handle function to the result of the future. The result can either
+        * be the future's value or the exception with which the future has been completed. The
+        * two cases are mutually exclusive: {@code FlinkFuture} passes {@code null} for the
+        * argument that does not apply. The result of the handle function is the returned
+        * future's value.
+        * <p>
+        * The handle function is executed asynchronously by the given executor.
+        *
+        * @param biFunction applied to the result (normal and exceptional) of the future
+        * @param executor used to execute the handle function asynchronously
+        * @param <R> type of the handle function's return value
+        * @return future representing the handle function's return value
+        */
+       <R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> 
biFunction, Executor executor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
new file mode 100644
index 0000000..5566880
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Promise;
+
+import java.util.concurrent.CancellationException;
+
+/**
+ * Implementation of {@link CompletableFuture} which is backed by a Scala {@link Promise}.
+ * <p>
+ * The promise's future is installed as the backing future of the {@link FlinkFuture} super
+ * class, so all inherited operations observe completions made through this class.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
+
+       /** Promise through which the backing Scala future is completed. */
+       private final Promise<T> promise;
+
+       /**
+        * Creates a new, not yet completed future.
+        */
+       public FlinkCompletableFuture() {
+               promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+               scalaFuture = promise.future();
+       }
+
+       /**
+        * Completes the future with the given value unless it has already been completed.
+        *
+        * @param value to complete the future with; must not be null
+        * @return true if this call completed the future; false if it was already completed
+        * @throws NullPointerException if the value is null
+        */
+       @Override
+       public boolean complete(T value) {
+               Preconditions.checkNotNull(value);
+
+               // trySuccess reports an already completed promise via its return value, so no
+               // IllegalStateException has to be thrown and caught for the "already done" case
+               return promise.trySuccess(value);
+       }
+
+       /**
+        * Completes the future with the given exception unless it has already been completed.
+        *
+        * @param t the exception to complete the future with; must not be null
+        * @return true if this call completed the future; false if it was already completed
+        * @throws NullPointerException if the exception is null
+        */
+       @Override
+       public boolean completeExceptionally(Throwable t) {
+               Preconditions.checkNotNull(t);
+
+               // see complete(): tryFailure returns false instead of throwing
+               return promise.tryFailure(t);
+       }
+
+       /**
+        * Cancels the future by completing it exceptionally with a {@link CancellationException}.
+        *
+        * @param mayInterruptIfRunning not evaluated by this implementation
+        * @return true if the future was completed by this call; false if it was already completed
+        */
+       @Override
+       public boolean cancel(boolean mayInterruptIfRunning) {
+               return completeExceptionally(new CancellationException("Future has been canceled."));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
new file mode 100644
index 0000000..361cd3d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import akka.dispatch.ExecutionContexts$;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.util.Preconditions;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+import scala.util.Failure;
+import scala.util.Success;
+import scala.util.Try;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of {@link Future} which is backed by {@link 
scala.concurrent.Future}.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkFuture<T> implements Future<T> {
+
+       protected scala.concurrent.Future<T> scalaFuture;
+
+       FlinkFuture() {
+               scalaFuture = null;
+       }
+
+       public FlinkFuture(scala.concurrent.Future<T> scalaFuture) {
+               this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
+       }
+
+       
//-----------------------------------------------------------------------------------
+       // Future's methods
+       
//-----------------------------------------------------------------------------------
+
+       @Override
+       public boolean isDone() {
+               return scalaFuture.isCompleted();
+       }
+
+       @Override
+       public boolean cancel(boolean mayInterruptIfRunning) {
+               return false;
+       }
+
+       @Override
+       public T get() throws InterruptedException, ExecutionException {
+               Preconditions.checkNotNull(scalaFuture);
+
+               try {
+                       return Await.result(scalaFuture, Duration.Inf());
+               } catch (InterruptedException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new ExecutionException(e);
+               }
+       }
+
+       @Override
+       public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkArgument(timeout >= 0L, "The timeout value 
has to be larger or " +
+                       "equal than 0.");
+
+               try {
+                       return Await.result(scalaFuture, new 
FiniteDuration(timeout, unit));
+               } catch (InterruptedException | TimeoutException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new ExecutionException(e);
+               }
+       }
+
+       @Override
+       public T getNow(T valueIfAbsent) throws ExecutionException {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkNotNull(valueIfAbsent);
+
+               Option<Try<T>> value = scalaFuture.value();
+
+               if (value.isDefined()) {
+                       Try<T> tri = value.get();
+
+                       if (tri instanceof Success) {
+                               return ((Success<T>)tri).value();
+                       } else {
+                               throw new 
ExecutionException(((Failure<T>)tri).exception());
+                       }
+               } else {
+                       return valueIfAbsent;
+               }
+       }
+
+       @Override
+       public <R> Future<R> thenApplyAsync(final ApplyFunction<? super T, ? 
extends R> applyFunction, Executor executor) {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkNotNull(applyFunction);
+               Preconditions.checkNotNull(executor);
+
+               scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new 
Mapper<T, R>() {
+                       @Override
+                       public R apply(T value) {
+                               return applyFunction.apply(value);
+                       }
+               }, createExecutionContext(executor));
+
+               return new FlinkFuture<>(mappedFuture);
+       }
+
+       @Override
+       public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> 
acceptFunction, Executor executor) {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkNotNull(acceptFunction);
+               Preconditions.checkNotNull(executor);
+
+               scala.concurrent.Future<Void> acceptedFuture = 
scalaFuture.map(new Mapper<T, Void>() {
+                       @Override
+                       public Void apply(T value) {
+                               acceptFunction.accept(value);
+
+                               return null;
+                       }
+               }, createExecutionContext(executor));
+
+               return new FlinkFuture<>(acceptedFuture);
+       }
+
+       @Override
+       public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, 
? extends R> exceptionallyFunction, Executor executor) {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkNotNull(exceptionallyFunction);
+               Preconditions.checkNotNull(executor);
+
+               scala.concurrent.Future<R> recoveredFuture = 
scalaFuture.recover(new Recover<R>() {
+                       @Override
+                       public R recover(Throwable failure) throws Throwable {
+                               return exceptionallyFunction.apply(failure);
+                       }
+               }, createExecutionContext(executor));
+
+               return new FlinkFuture<>(recoveredFuture);
+       }
+
+       @Override
+       public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, 
Future<? extends R>> applyFunction, final Executor executor) {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkNotNull(applyFunction);
+               Preconditions.checkNotNull(executor);
+
+               scala.concurrent.Future<R> flatMappedFuture = 
scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() {
+                       @Override
+                       public scala.concurrent.Future<R> apply(T value) {
+                               final Future<? extends R> future = 
applyFunction.apply(value);
+
+                               if (future instanceof FlinkFuture) {
+                                       @SuppressWarnings("unchecked")
+                                       FlinkFuture<R> flinkFuture = 
(FlinkFuture<R>) future;
+
+                                       return flinkFuture.scalaFuture;
+                               } else {
+                                       return Futures.future(new Callable<R>() 
{
+                                               @Override
+                                               public R call() throws 
Exception {
+                                                       return future.get();
+                                               }
+                                       }, createExecutionContext(executor));
+                               }
+                       }
+               }, createExecutionContext(executor));
+
+               return new FlinkFuture<>(flatMappedFuture);
+       }
+
+       @Override
+       public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, 
? extends R> biFunction, Executor executor) {
+               Preconditions.checkNotNull(scalaFuture);
+               Preconditions.checkNotNull(biFunction);
+               Preconditions.checkNotNull(executor);
+
+               scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new 
Mapper<T, R>() {
+                       @Override
+                       public R checkedApply(T value) throws Exception {
+                               try {
+                                       return biFunction.apply(value, null);
+                               } catch (Throwable t) {
+                                       throw new 
FlinkFuture.WrapperException(t);
+                               }
+                       }
+               }, createExecutionContext(executor));
+
+               scala.concurrent.Future<R> recoveredFuture = 
mappedFuture.recover(new Recover<R>() {
+                       @Override
+                       public R recover(Throwable failure) throws Throwable {
+                               if (failure instanceof 
FlinkFuture.WrapperException) {
+                                       throw failure.getCause();
+                               } else {
+                                       return biFunction.apply(null, failure);
+                               }
+                       }
+               }, createExecutionContext(executor));
+
+
+               return new FlinkFuture<>(recoveredFuture);
+       }
+
+       
//-----------------------------------------------------------------------------------
+       // Static factory methods
+       
//-----------------------------------------------------------------------------------
+
+       /**
+        * Hands the given callable to the given executor and returns a future for its result.
+        *
+        * @param callable computing the value of the returned future
+        * @param executor from which the execution context for the callable is created
+        * @param <T> type of the value produced by the callable
+        * @return future representing the result of the callable
+        */
+       public static <T> Future<T> supplyAsync(Callable<T> callable, Executor executor) {
+               Preconditions.checkNotNull(callable);
+               Preconditions.checkNotNull(executor);
+
+               final ExecutionContext executionContext = createExecutionContext(executor);
+
+               return new FlinkFuture<>(Futures.future(callable, executionContext));
+       }
+
+       
//-----------------------------------------------------------------------------------
+       // Helper functions and types
+       
//-----------------------------------------------------------------------------------
+
+       private static ExecutionContext createExecutionContext(Executor 
executor) {
+               return ExecutionContexts$.MODULE$.fromExecutor(executor);
+       }
+
+       /**
+        * Checked exception whose only content is the throwable it wraps as its cause.
+        *
+        * <p>{@code handleAsync} throws it to mark throwables raised by the handler function, so that
+        * they can be told apart from failures of the underlying future.
+        */
+       private static class WrapperException extends Exception {
+
+               private static final long serialVersionUID = 6533166370660884091L;
+
+               WrapperException(Throwable cause) {
+                       super(cause);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
new file mode 100644
index 0000000..bd5af66
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for Flink's future implementation.
+ *
+ * <p>All callbacks registered by the tests are run on one single-threaded executor which is
+ * created before the first test and shut down after the last one.
+ */
+public class FlinkFutureTest extends TestLogger {
+
+       private static ExecutorService executor;
+
+       @BeforeClass
+       public static void setup() {
+               executor = Executors.newSingleThreadExecutor();
+       }
+
+       @AfterClass
+       public static void teardown() {
+               executor.shutdown();
+       }
+
+       /**
+        * Tests that an apply function is called with the value of the completed future.
+        */
+       @Test
+       public void testFutureApply() throws Exception {
+               int expectedValue = 42;
+
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+               Future<String> appliedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() {
+                       @Override
+                       public String apply(Integer value) {
+                               return String.valueOf(value);
+                       }
+               }, executor);
+
+               initialFuture.complete(expectedValue);
+
+               assertEquals(String.valueOf(expectedValue), appliedFuture.get());
+       }
+
+       /**
+        * Tests that a timed get on a future which is never completed throws a timeout exception.
+        */
+       @Test(expected = TimeoutException.class)
+       public void testFutureGetTimeout() throws InterruptedException, ExecutionException, TimeoutException {
+               CompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+
+               future.get(10, TimeUnit.MILLISECONDS);
+
+               fail("Get should have thrown a timeout exception.");
+       }
+
+       /**
+        * Tests that get on an exceptionally completed future fails with the given exception as cause.
+        */
+       @Test(expected = TestException.class)
+       public void testExceptionalCompletion() throws Throwable {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+               initialFuture.completeExceptionally(new TestException("Test exception"));
+
+               try {
+                       initialFuture.get();
+
+                       fail("Get should have thrown an exception.");
+               } catch (ExecutionException e) {
+                       // the test expects the original exception, not the wrapper thrown by get
+                       throw e.getCause();
+               }
+       }
+
+       /**
+        * Tests that an exception is propagated through an apply function.
+        */
+       @Test(expected = TestException.class)
+       public void testExceptionPropagation() throws Throwable {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+               Future<String> mappedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() {
+                       @Override
+                       public String apply(Integer value) {
+                               throw new TestException("Test exception");
+                       }
+               }, executor);
+
+               Future<String> mapped2Future = mappedFuture.thenApplyAsync(new ApplyFunction<String, String>() {
+                       @Override
+                       public String apply(String value) {
+                               return "foobar";
+                       }
+               }, executor);
+
+               initialFuture.complete(42);
+
+               try {
+                       mapped2Future.get();
+
+                       fail("Get should have thrown an exception.");
+               } catch (ExecutionException e) {
+                       // the test expects the original exception, not the wrapper thrown by get
+                       throw e.getCause();
+               }
+       }
+
+       /**
+        * Tests that an exceptionally function is called with the failure of the future.
+        */
+       @Test
+       public void testExceptionally() throws ExecutionException, InterruptedException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+               String exceptionMessage = "Foobar";
+
+               Future<String> recovered = initialFuture.exceptionallyAsync(new ApplyFunction<Throwable, String>() {
+                       @Override
+                       public String apply(Throwable value) {
+                               return value.getMessage();
+                       }
+               }, executor);
+
+               initialFuture.completeExceptionally(new TestException(exceptionMessage));
+
+               String actualMessage = recovered.get();
+
+               assertEquals(exceptionMessage, actualMessage);
+       }
+
+       /**
+        * Tests that the compose function receives the value of the completed future and that the
+        * future it returns is flattened into the composed future.
+        */
+       @Test
+       public void testCompose() throws ExecutionException, InterruptedException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+               final int expectedValue = 42;
+
+               Future<Integer> composedFuture = initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<? extends Integer>>() {
+                       @Override
+                       public Future<? extends Integer> apply(final Integer value) {
+                               return FlinkFuture.supplyAsync(new Callable<Integer>() {
+                                       @Override
+                                       public Integer call() throws Exception {
+                                               // pass the received value on, so that the assertion below
+                                               // fails if the compose function got a wrong input
+                                               return value;
+                                       }
+                               }, executor);
+                       }
+               }, executor);
+
+               initialFuture.complete(expectedValue);
+
+               int actualValue = composedFuture.get();
+
+               assertEquals(expectedValue, actualValue);
+       }
+
+       /**
+        * Tests that getNow returns the given default value if the future is not completed.
+        */
+       @Test
+       public void testGetNow() throws ExecutionException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+               final int absentValue = 41;
+
+               assertEquals(Integer.valueOf(absentValue), initialFuture.getNow(absentValue));
+       }
+
+       /**
+        * Tests that an accept function is called with the value of the completed future.
+        */
+       @Test
+       public void testAccept() throws ExecutionException, InterruptedException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+               final AtomicInteger atomicInteger = new AtomicInteger(0);
+               int expectedValue = 42;
+
+               Future<Void> result = initialFuture.thenAcceptAsync(new AcceptFunction<Integer>() {
+                       @Override
+                       public void accept(Integer value) {
+                               atomicInteger.set(value);
+                       }
+               }, executor);
+
+               initialFuture.complete(expectedValue);
+
+               result.get();
+
+               assertEquals(expectedValue, atomicInteger.get());
+       }
+
+       /**
+        * Tests that a handle function is called with the value of a regularly completed future.
+        */
+       @Test
+       public void testHandle() throws ExecutionException, InterruptedException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+               int expectedValue = 43;
+
+               Future<String> result = initialFuture.handleAsync(valueOrMessageHandler(), executor);
+
+               initialFuture.complete(expectedValue);
+
+               assertEquals(String.valueOf(expectedValue), result.get());
+       }
+
+       /**
+        * Tests that a handle function is called with the failure of an exceptionally completed future.
+        */
+       @Test
+       public void testHandleException() throws ExecutionException, InterruptedException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+               String exceptionMessage = "foobar";
+
+               Future<String> result = initialFuture.handleAsync(valueOrMessageHandler(), executor);
+
+               initialFuture.completeExceptionally(new TestException(exceptionMessage));
+
+               assertEquals(exceptionMessage, result.get());
+       }
+
+       /**
+        * Tests that only the first complete operation succeeds and determines the future's value.
+        */
+       @Test
+       public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException {
+               CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+               int expectedValue = 42;
+
+               assertTrue(initialFuture.complete(expectedValue));
+
+               assertFalse(initialFuture.complete(1337));
+
+               assertFalse(initialFuture.completeExceptionally(new TestException("foobar")));
+
+               assertEquals(Integer.valueOf(expectedValue), initialFuture.get());
+       }
+
+       /**
+        * Creates the handler shared by the handle tests: it returns the string representation of the
+        * value if a value is given and the message of the throwable otherwise.
+        */
+       private static BiFunction<Integer, Throwable, String> valueOrMessageHandler() {
+               return new BiFunction<Integer, Throwable, String>() {
+                       @Override
+                       public String apply(Integer integer, Throwable throwable) {
+                               if (integer != null) {
+                                       return String.valueOf(integer);
+                               } else {
+                                       return throwable.getMessage();
+                               }
+                       }
+               };
+       }
+
+       /**
+        * Unchecked exception used by the tests to complete futures exceptionally.
+        */
+       private static class TestException extends RuntimeException {
+
+               private static final long serialVersionUID = -1274022962838535130L;
+
+               public TestException(String message) {
+                       super(message);
+               }
+       }
+}

Reply via email to