[FLINK-4361] Introduce Flink's own future abstraction Flink's future abstraction whose API is similar to Java 8's CompletableFuture. That's in order to ease a future transition to this class once we ditch Java 7. The current set of operations comprises:
- isDone to check the completion of the future - get/getNow to obtain the future's value - cancel to cancel the future (best effort basis) - thenApplyAsync to transform the future's value into another value - thenAcceptAsync to register a callback for a successful completion of the future - exceptionallyAsync to register a callback for an exception completion of the future - thenComposeAsync to transform the future's value and flatten the returned future - handleAsync to register a callback which is called either with the regular result or the exceptional result Additionally, Flink offers a CompletableFuture which can be completed with a regular value or an exception: - complete/completeExceptionally Complete FlinkCompletableFuture exceptionally with a CanellationException upon cancel This closes #2472. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f766f120 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f766f120 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f766f120 Branch: refs/heads/flip-6 Commit: f766f120e4af6b60a3e628836963eb981005325c Parents: 0a134a3 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Sep 2 21:13:34 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Sep 21 11:39:18 2016 +0200 ---------------------------------------------------------------------- .../runtime/concurrent/AcceptFunction.java | 34 +++ .../flink/runtime/concurrent/ApplyFunction.java | 36 +++ .../flink/runtime/concurrent/BiFunction.java | 38 +++ .../runtime/concurrent/CompletableFuture.java | 47 ++++ .../apache/flink/runtime/concurrent/Future.java | 156 +++++++++++ .../concurrent/impl/FlinkCompletableFuture.java | 71 +++++ .../runtime/concurrent/impl/FlinkFuture.java | 273 +++++++++++++++++++ .../runtime/concurrent/FlinkFutureTest.java | 269 ++++++++++++++++++ 8 files changed, 924 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java new file mode 100644 index 0000000..a300647 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +/** + * Function which is called with a single argument and does not return a value. + * + * @param <T> type of the argument + */ +public interface AcceptFunction<T> { + + /** + * Method which handles the function call. + * + * @param value is the function's argument + */ + void accept(T value); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java new file mode 100644 index 0000000..64def98 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +/** + * Function which is called with a single argument. + * + * @param <V> type of the argument + * @param <R> type of the return value + */ +public interface ApplyFunction<V, R> { + + /** + * Method which handles the function call. + * + * @param value is the single argument + * @return the function value + */ + R apply(V value); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java new file mode 100644 index 0000000..2b09de8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +/** + * Function which is called with two arguments and returns a value. + * + * @param <T> type of the first argument + * @param <U> type of the second argument + * @param <R> type of the return value + */ +public interface BiFunction<T, U, R> { + + /** + * Method which handles the function call. + * + * @param t first argument + * @param u second argument + * @return the function value + */ + R apply(T t, U u); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java new file mode 100644 index 0000000..5288bf2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +/** + * Flink's completable future abstraction. A completable future can be completed with a regular + * value or an exception. + * + * @param <T> type of the future's value + */ +public interface CompletableFuture<T> extends Future<T> { + + /** + * Completes the future with the given value. The complete operation only succeeds if the future + * has not been completed before. Whether it is successful or not is returned by the method. + * + * @param value to complete the future with + * @return true if the completion was successful; otherwise false + */ + boolean complete(T value); + + /** + * Completes the future with the given exception. The complete operation only succeeds if the + * future has not been completed before. Whether it is successful or not is returned by the + * method. + * + * @param t the exception to complete the future with + * @return true if the completion was successful; otherwise false + */ + boolean completeExceptionally(Throwable t); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java new file mode 100644 index 0000000..b32bcd4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Flink's basic future abstraction. A future represents an asynchronous operation whose result + * will be contained in this instance upon completion. + * + * @param <T> type of the future's result + */ +public interface Future<T> { + + /** + * Checks if the future has been completed. A future is completed, if the result has been + * delivered. + * + * @return true if the future is completed; otherwise false + */ + boolean isDone(); + + /** + * Tries to cancel the future's operation. Note that not all future operations can be canceled. + * The result of the cancelling will be returned. + * + * @param mayInterruptIfRunning true iff the future operation may be interrupted + * @return true if the cancelling was successful; otherwise false + */ + boolean cancel(boolean mayInterruptIfRunning); + + /** + * Gets the result value of the future. If the future has not been completed, then this + * operation will block indefinitely until the result has been delivered. + * + * @return the result value + * @throws CancellationException if the future has been cancelled + * @throws InterruptedException if the current thread was interrupted while waiting for the result + * @throws ExecutionException if the future has been completed with an exception + */ + T get() throws InterruptedException, ExecutionException; + + /** + * Gets the result value of the future. If the future has not been done, then this operation + * will block the given timeout value. If the result has not been delivered within the timeout, + * then the method throws an {@link TimeoutException}. + * + * @param timeout the time to wait for the future to be done + * @param unit time unit for the timeout argument + * @return the result value + * @throws CancellationException if the future has been cancelled + * @throws InterruptedException if the current thread was interrupted while waiting for the result + * @throws ExecutionException if the future has been completed with an exception + * @throws TimeoutException if the future has not been completed within the given timeout + */ + T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; + + /** + * Gets the value of the future. If the future has not been completed when calling this + * function, the given value is returned. + * + * @param valueIfAbsent value which is returned if the future has not been completed + * @return value of the future or the given value if the future has not been completed + * @throws ExecutionException if the future has been completed with an exception + */ + T getNow(T valueIfAbsent) throws ExecutionException; + + /** + * Applies the given function to the value of the future. The result of the apply function is + * the value of the newly returned future. + * <p> + * The apply function is executed asynchronously by the given executor. + * + * @param applyFunction function to apply to the future's value + * @param executor used to execute the given apply function asynchronously + * @param <R> type of the apply function's return value + * @return future representing the return value of the given apply function + */ + <R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> applyFunction, Executor executor); + + /** + * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the + * {@link AcceptFunction} does not return a value. The returned future, thus, represents only + * the completion of the accept callback. + * <p> + * The accept function is executed asynchronously by the given executor. + * + * @param acceptFunction function to apply to the future's value + * @param executor used to execute the given apply function asynchronously + * @return future representing the completion of the accept callback + */ + Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, Executor executor); + + /** + * Applies the given function to the value of the future if the future has been completed + * exceptionally. The completing exception is given to the apply function which can return a new + * value which is the value of the returned future. + * <p> + * The apply function is executed asynchronously by the given executor. + * + * @param exceptionallyFunction to apply to the future's value if it is an exception + * @param executor used to execute the given apply function asynchronously + * @param <R> type of the apply function's return value + * @return future representing the return value of the given apply function + */ + <R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor); + + /** + * Applies the given function to the value of the future. The apply function returns a future + * result, which is flattened. This means that the resulting future of this method represents + * the future's value of the apply function. + * <p> + * The apply function is executed asynchronously by the given executor. + * + * @param composeFunction to apply to the future's value. The function returns a future which is + * flattened + * @param executor used to execute the given apply function asynchronously + * @param <R> type of the returned future's value + * @return future representing the flattened return value of the apply function + */ + <R> Future<R> thenComposeAsync(ApplyFunction<? super T, Future<? extends R>> composeFunction, Executor executor); + + /** + * Applies the given handle function to the result of the future. The result can either be the + * future's value or the exception with which the future has been completed. The two cases are + * mutually exclusive. The result of the handle function is the returned future's value. + * <p> + * The handle function is executed asynchronously by the given executor. + * + * @param biFunction applied to the result (normal and exceptional) of the future + * @param executor used to execute the handle function asynchronously + * @param <R> type of the handle function's return value + * @return future representing the handle function's return value + */ + <R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java new file mode 100644 index 0000000..5566880 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent.impl; + +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.util.Preconditions; +import scala.concurrent.Promise; + +import java.util.concurrent.CancellationException; + +/** + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}. + * + * @param <T> type of the future's value + */ +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> { + + private final Promise<T> promise; + + public FlinkCompletableFuture() { + promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); + scalaFuture = promise.future(); + } + + @Override + public boolean complete(T value) { + Preconditions.checkNotNull(value); + + try { + promise.success(value); + + return true; + } catch (IllegalStateException e) { + return false; + } + } + + @Override + public boolean completeExceptionally(Throwable t) { + Preconditions.checkNotNull(t); + + try { + promise.failure(t); + + return true; + } catch (IllegalStateException e) { + return false; + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return completeExceptionally(new CancellationException("Future has been canceled.")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java new file mode 100644 index 0000000..361cd3d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent.impl; + +import akka.dispatch.ExecutionContexts$; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.dispatch.Recover; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.util.Preconditions; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.util.Failure; +import scala.util.Success; +import scala.util.Try; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Implementation of {@link Future} which is backed by {@link scala.concurrent.Future}. + * + * @param <T> type of the future's value + */ +public class FlinkFuture<T> implements Future<T> { + + protected scala.concurrent.Future<T> scalaFuture; + + FlinkFuture() { + scalaFuture = null; + } + + public FlinkFuture(scala.concurrent.Future<T> scalaFuture) { + this.scalaFuture = Preconditions.checkNotNull(scalaFuture); + } + + //----------------------------------------------------------------------------------- + // Future's methods + //----------------------------------------------------------------------------------- + + @Override + public boolean isDone() { + return scalaFuture.isCompleted(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + Preconditions.checkNotNull(scalaFuture); + + try { + return Await.result(scalaFuture, Duration.Inf()); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkArgument(timeout >= 0L, "The timeout value has to be larger or " + + "equal than 0."); + + try { + return Await.result(scalaFuture, new FiniteDuration(timeout, unit)); + } catch (InterruptedException | TimeoutException e) { + throw e; + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + @Override + public T getNow(T valueIfAbsent) throws ExecutionException { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkNotNull(valueIfAbsent); + + Option<Try<T>> value = scalaFuture.value(); + + if (value.isDefined()) { + Try<T> tri = value.get(); + + if (tri instanceof Success) { + return ((Success<T>)tri).value(); + } else { + throw new ExecutionException(((Failure<T>)tri).exception()); + } + } else { + return valueIfAbsent; + } + } + + @Override + public <R> Future<R> thenApplyAsync(final ApplyFunction<? super T, ? extends R> applyFunction, Executor executor) { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkNotNull(applyFunction); + Preconditions.checkNotNull(executor); + + scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new Mapper<T, R>() { + @Override + public R apply(T value) { + return applyFunction.apply(value); + } + }, createExecutionContext(executor)); + + return new FlinkFuture<>(mappedFuture); + } + + @Override + public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> acceptFunction, Executor executor) { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkNotNull(acceptFunction); + Preconditions.checkNotNull(executor); + + scala.concurrent.Future<Void> acceptedFuture = scalaFuture.map(new Mapper<T, Void>() { + @Override + public Void apply(T value) { + acceptFunction.accept(value); + + return null; + } + }, createExecutionContext(executor)); + + return new FlinkFuture<>(acceptedFuture); + } + + @Override + public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor) { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkNotNull(exceptionallyFunction); + Preconditions.checkNotNull(executor); + + scala.concurrent.Future<R> recoveredFuture = scalaFuture.recover(new Recover<R>() { + @Override + public R recover(Throwable failure) throws Throwable { + return exceptionallyFunction.apply(failure); + } + }, createExecutionContext(executor)); + + return new FlinkFuture<>(recoveredFuture); + } + + @Override + public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, Future<? extends R>> applyFunction, final Executor executor) { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkNotNull(applyFunction); + Preconditions.checkNotNull(executor); + + scala.concurrent.Future<R> flatMappedFuture = scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() { + @Override + public scala.concurrent.Future<R> apply(T value) { + final Future<? extends R> future = applyFunction.apply(value); + + if (future instanceof FlinkFuture) { + @SuppressWarnings("unchecked") + FlinkFuture<R> flinkFuture = (FlinkFuture<R>) future; + + return flinkFuture.scalaFuture; + } else { + return Futures.future(new Callable<R>() { + @Override + public R call() throws Exception { + return future.get(); + } + }, createExecutionContext(executor)); + } + } + }, createExecutionContext(executor)); + + return new FlinkFuture<>(flatMappedFuture); + } + + @Override + public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor) { + Preconditions.checkNotNull(scalaFuture); + Preconditions.checkNotNull(biFunction); + Preconditions.checkNotNull(executor); + + scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new Mapper<T, R>() { + @Override + public R checkedApply(T value) throws Exception { + try { + return biFunction.apply(value, null); + } catch (Throwable t) { + throw new FlinkFuture.WrapperException(t); + } + } + }, createExecutionContext(executor)); + + scala.concurrent.Future<R> recoveredFuture = mappedFuture.recover(new Recover<R>() { + @Override + public R recover(Throwable failure) throws Throwable { + if (failure instanceof FlinkFuture.WrapperException) { + throw failure.getCause(); + } else { + return biFunction.apply(null, failure); + } + } + }, createExecutionContext(executor)); + + + return new FlinkFuture<>(recoveredFuture); + } + + //----------------------------------------------------------------------------------- + // Static factory methods + //----------------------------------------------------------------------------------- + + /** + * Creates a future whose value is determined by the asynchronously executed callable. + * + * @param callable whose value is delivered by the future + * @param executor to be used to execute the callable + * @param <T> type of the future's value + * @return future which represents the value of the callable + */ + public static <T> Future<T> supplyAsync(Callable<T> callable, Executor executor) { + Preconditions.checkNotNull(callable); + Preconditions.checkNotNull(executor); + + scala.concurrent.Future<T> scalaFuture = Futures.future(callable, createExecutionContext(executor)); + + return new FlinkFuture<>(scalaFuture); + } + + //----------------------------------------------------------------------------------- + // Helper functions and types + //----------------------------------------------------------------------------------- + + private static ExecutionContext createExecutionContext(Executor executor) { + return ExecutionContexts$.MODULE$.fromExecutor(executor); + } + + private static class WrapperException extends Exception { + + private static final long serialVersionUID = 6533166370660884091L; + + WrapperException(Throwable cause) { + super(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f766f120/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java new file mode 100644 index 0000000..bd5af66 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for Flink's future implementation. + */ +public class FlinkFutureTest extends TestLogger { + + private static ExecutorService executor; + + @BeforeClass + public static void setup() { + executor = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void teardown() { + executor.shutdown(); + } + + @Test + public void testFutureApply() throws Exception { + int expectedValue = 42; + + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + + Future<String> appliedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() { + @Override + public String apply(Integer value) { + return String.valueOf(value); + } + }, executor); + + initialFuture.complete(expectedValue); + + assertEquals(String.valueOf(expectedValue), appliedFuture.get()); + } + + @Test(expected = TimeoutException.class) + public void testFutureGetTimeout() throws InterruptedException, ExecutionException, TimeoutException { + CompletableFuture<Integer> future = new FlinkCompletableFuture<>(); + + future.get(10, TimeUnit.MILLISECONDS); + + fail("Get should have thrown a timeout exception."); + } + + @Test(expected = TestException.class) + public void testExceptionalCompletion() throws Throwable { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + + initialFuture.completeExceptionally(new TestException("Test exception")); + + try { + initialFuture.get(); + + fail("Get should have thrown an exception."); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + /** + * Tests that an exception is propagated through an apply function. + */ + @Test(expected = TestException.class) + public void testExceptionPropagation() throws Throwable { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + + Future<String> mappedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() { + @Override + public String apply(Integer value) { + throw new TestException("Test exception"); + } + }, executor); + + Future<String> mapped2Future = mappedFuture.thenApplyAsync(new ApplyFunction<String, String>() { + @Override + public String apply(String value) { + return "foobar"; + } + }, executor); + + initialFuture.complete(42); + + try { + mapped2Future.get(); + + fail("Get should have thrown an exception."); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void testExceptionally() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + String exceptionMessage = "Foobar"; + + Future<String> recovered = initialFuture.exceptionallyAsync(new ApplyFunction<Throwable, String>() { + @Override + public String apply(Throwable value) { + return value.getMessage(); + } + }, executor); + + initialFuture.completeExceptionally(new TestException(exceptionMessage)); + + String actualMessage = recovered.get(); + + assertEquals(exceptionMessage, actualMessage); + } + + @Test + public void testCompose() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + + final int expectedValue = 42; + + Future<Integer> composedFuture = initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<? extends Integer>>() { + @Override + public Future<? extends Integer> apply(Integer value) { + return FlinkFuture.supplyAsync(new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return expectedValue; + } + }, executor); + } + }, executor); + + initialFuture.complete(42); + + int actualValue = composedFuture.get(); + + assertEquals(expectedValue, actualValue); + } + + @Test + public void testGetNow() throws ExecutionException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + + final int absentValue = 41; + + assertEquals(new Integer(absentValue), initialFuture.getNow(absentValue)); + } + + @Test + public void testAccept() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + final AtomicInteger atomicInteger = new AtomicInteger(0); + int expectedValue = 42; + + Future<Void> result = initialFuture.thenAcceptAsync(new AcceptFunction<Integer>() { + @Override + public void accept(Integer value) { + atomicInteger.set(value); + } + }, executor); + + initialFuture.complete(expectedValue); + + result.get(); + + assertEquals(expectedValue, atomicInteger.get()); + } + + @Test + public void testHandle() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + int expectedValue = 43; + + Future<String> result = initialFuture.handleAsync(new BiFunction<Integer, Throwable, String>() { + @Override + public String apply(Integer integer, Throwable throwable) { + if (integer != null) { + return String.valueOf(integer); + } else { + return throwable.getMessage(); + } + } + }, executor); + + initialFuture.complete(expectedValue); + + assertEquals(String.valueOf(expectedValue), result.get()); + } + + @Test + public void testHandleException() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + String exceptionMessage = "foobar"; + + Future<String> result = initialFuture.handleAsync(new BiFunction<Integer, Throwable, String>() { + @Override + public String apply(Integer integer, Throwable throwable) { + if (integer != null) { + return String.valueOf(integer); + } else { + return throwable.getMessage(); + } + } + }, executor); + + initialFuture.completeExceptionally(new TestException(exceptionMessage)); + + assertEquals(exceptionMessage, result.get()); + } + + @Test + public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>(); + int expectedValue = 42; + + assertTrue(initialFuture.complete(expectedValue)); + + assertFalse(initialFuture.complete(1337)); + + assertFalse(initialFuture.completeExceptionally(new TestException("foobar"))); + + assertEquals(new Integer(expectedValue), initialFuture.get()); + } + + private static class TestException extends RuntimeException { + + private static final long serialVersionUID = -1274022962838535130L; + + public TestException(String message) { + super(message); + } + } +}