Repository: flink Updated Branches: refs/heads/master dceb5cc17 -> 7ad489d87
[FLINK-6555] [futures] Generalize ConjunctFuture to return results The ConjunctFuture now returns the set of future values once it is completed. Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture The WaitingConjunctFuture waits for the completion of its futures. The future values are discarded making it more efficient than the ResultConjunctFuture which returns the futures' values. The WaitingConjunctFuture is instantiated via FutureUtils.waitForAll(Collection<Future>). This closes #3873. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c081201f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c081201f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c081201f Branch: refs/heads/master Commit: c081201fd6c3c97a932c09d971f24bf42102650f Parents: dceb5cc Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu May 11 17:36:17 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed May 17 08:18:23 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/concurrent/FutureUtils.java | 131 +++++++++++++++---- .../runtime/executiongraph/ExecutionGraph.java | 8 +- .../executiongraph/ExecutionJobVertex.java | 4 +- .../executiongraph/failover/FailoverRegion.java | 2 +- .../runtime/concurrent/FutureUtilsTest.java | 83 ++++++++++-- 5 files changed, 184 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 4948147..a27af56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.concurrent; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.util.Preconditions; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -106,8 +109,9 @@ public class FutureUtils { /** * Creates a future that is complete once multiple other futures completed. - * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the - * conjunction fails. + * The future fails (completes exceptionally) once one of the futures in the + * conjunction fails. Upon successful completion, the future returns the + * collection of the futures' results. * * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. @@ -115,16 +119,16 @@ public class FutureUtils { * @param futures The futures that make up the conjunction. No null entries are allowed. * @return The ConjunctFuture that completes once all given futures are complete (or one fails). */ - public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) { + public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) { checkNotNull(futures, "futures"); - final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size()); + final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size()); if (futures.isEmpty()) { - conjunct.complete(null); + conjunct.complete(Collections.<T>emptyList()); } else { - for (Future<?> future : futures) { + for (Future<? extends T> future : futures) { future.handle(conjunct.completionHandler); } } @@ -133,16 +137,32 @@ public class FutureUtils { } /** + * Creates a future that is complete once all of the given futures have completed. + * The future fails (completes exceptionally) once one of the given futures + * fails. + * + * <p>The ConjunctFuture gives access to how many Futures have already + * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. + * + * @param futures The futures to wait on. No null entries are allowed. + * @return The WaitingFuture that completes once all given futures are complete (or one fails). + */ + public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> futures) { + checkNotNull(futures, "futures"); + + return new WaitingConjunctFuture(futures); + } + + /** * A future that is complete once multiple other futures completed. The futures are not - * necessarily of the same type, which is why the type of this Future is {@code Void}. - * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the - * conjunction fails. + * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once + * one of the Futures in the conjunction fails. * * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how * many of the Futures are already complete. */ - public interface ConjunctFuture extends CompletableFuture<Void> { + public interface ConjunctFuture<T> extends CompletableFuture<T> { /** * Gets the total number of Futures in the conjunction. @@ -158,39 +178,102 @@ public class FutureUtils { } /** - * The implementation of the {@link ConjunctFuture}. - * - * <p>Implementation notice: The member fields all have package-private access, because they are - * either accessed by an inner subclass or by the enclosing class. + * The implementation of the {@link ConjunctFuture} which returns its Futures' result as a collection. */ - private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture { + private static class ResultConjunctFuture<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<Collection<T>> { /** The total number of futures in the conjunction */ - final int numTotal; + private final int numTotal; + + /** The next free index in the results arrays */ + private final AtomicInteger nextIndex = new AtomicInteger(0); /** The number of futures in the conjunction that are already complete */ - final AtomicInteger numCompleted = new AtomicInteger(); + private final AtomicInteger numCompleted = new AtomicInteger(0); + + /** The set of collected results so far */ + private volatile T[] results; /** The function that is attached to all futures in the conjunction. Once a future - * is complete, this function tracks the completion or fails the conjunct. + * is complete, this function tracks the completion or fails the conjunct. */ - final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() { + final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() { @Override - public Void apply(Object o, Throwable throwable) { + public Void apply(T o, Throwable throwable) { if (throwable != null) { completeExceptionally(throwable); - } - else if (numTotal == numCompleted.incrementAndGet()) { - complete(null); + } else { + int index = nextIndex.getAndIncrement(); + + results[index] = o; + + if (numCompleted.incrementAndGet() == numTotal) { + complete(Arrays.asList(results)); + } } return null; } }; - ConjunctFutureImpl(int numTotal) { + @SuppressWarnings("unchecked") + ResultConjunctFuture(int numTotal) { this.numTotal = numTotal; + results = (T[])new Object[numTotal]; + } + + @Override + public int getNumFuturesTotal() { + return numTotal; + } + + @Override + public int getNumFuturesCompleted() { + return numCompleted.get(); + } + } + + /** + * Implementation of the {@link ConjunctFuture} interface which waits only for the completion + * of its futures and does not return their values. + */ + private static final class WaitingConjunctFuture extends FlinkCompletableFuture<Void> implements ConjunctFuture<Void> { + + /** Number of completed futures */ + private final AtomicInteger numCompleted = new AtomicInteger(0); + + /** Total number of futures to wait on */ + private final int numTotal; + + /** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl */ + private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() { + @Override + public Void apply(Object o, Throwable throwable) { + if (throwable == null) { + if (numTotal == numCompleted.incrementAndGet()) { + complete(null); + } + } else { + completeExceptionally(throwable); + } + + return null; + } + }; + + private WaitingConjunctFuture(Collection<? extends Future<?>> futures) { + Preconditions.checkNotNull(futures, "Futures must not be null."); + + this.numTotal = futures.size(); + + if (futures.isEmpty()) { + complete(null); + } else { + for (Future<?> future : futures) { + future.handle(completionHandler); + } + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 5eaa637..7c13936 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -871,7 +871,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive // this future is complete once all slot futures are complete. // the future fails once one slot future fails. - final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures); + final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures); // make sure that we fail if the allocation timeout was exceeded final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() { @@ -892,7 +892,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() { @Override - public Void apply(Void ignored, Throwable throwable) { + public Void apply(Void slots, Throwable throwable) { try { // we do not need the cancellation timeout any more timeoutCancelHandle.cancel(false); @@ -973,7 +973,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } // we build a future that is complete once all vertices have reached a terminal state - final ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures); allTerminal.thenAccept(new AcceptFunction<Void>() { @Override public void accept(Void value) { @@ -1102,7 +1102,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive futures.add(ejv.cancelWithFuture()); } - final ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures); allTerminal.thenAccept(new AcceptFunction<Void>() { @Override public void accept(Void value) { http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 3a98e0a..f5a592a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -509,7 +509,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable */ public Future<Void> cancelWithFuture() { // we collect all futures from the task cancellations - ArrayList<Future<?>> futures = new ArrayList<>(parallelism); + ArrayList<Future<ExecutionState>> futures = new ArrayList<>(parallelism); // cancel each vertex for (ExecutionVertex ev : getTaskVertices()) { @@ -517,7 +517,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable } // return a conjunct future, which is complete once all individual tasks are canceled - return FutureUtils.combineAll(futures); + return FutureUtils.waitForAll(futures); } public void fail(Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java index b36cfcf..6066c77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -150,7 +150,7 @@ public class FailoverRegion { futures.add(vertex.cancel()); } - final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + final FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures); allTerminal.thenAcceptAsync(new AcceptFunction<Void>() { @Override public void accept(Void value) { http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 43710cb..e262459 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -21,10 +21,15 @@ package org.apache.flink.runtime.concurrent; import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.util.TestLogger; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.ExecutionException; @@ -33,17 +38,26 @@ import static org.junit.Assert.*; /** * Tests for the utility methods in {@link FutureUtils} */ -public class FutureUtilsTest { +@RunWith(Parameterized.class) +public class FutureUtilsTest extends TestLogger{ + + @Parameterized.Parameters + public static Collection<FutureFactory> parameters (){ + return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory()); + } + + @Parameterized.Parameter + public FutureFactory futureFactory; @Test public void testConjunctFutureFailsOnEmptyAndNull() throws Exception { try { - FutureUtils.combineAll(null); + futureFactory.createFuture(null); fail(); } catch (NullPointerException ignored) {} try { - FutureUtils.combineAll(Arrays.asList( + futureFactory.createFuture(Arrays.asList( new FlinkCompletableFuture<Object>(), null, new FlinkCompletableFuture<Object>())); @@ -63,11 +77,11 @@ public class FutureUtilsTest { future2.complete(new Object()); // build the conjunct future - ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4)); - Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() { + Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() { @Override - public void accept(Void value) {} + public void accept(Object value) {} }); assertEquals(4, result.getNumFuturesTotal()); @@ -108,11 +122,11 @@ public class FutureUtilsTest { CompletableFuture<Object> future4 = new FlinkCompletableFuture<>(); // build the conjunct future - ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4)); - Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() { + Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() { @Override - public void accept(Void value) {} + public void accept(Object value) {} }); assertEquals(4, result.getNumFuturesTotal()); @@ -150,12 +164,12 @@ public class FutureUtilsTest { CompletableFuture<Object> future4 = new FlinkCompletableFuture<>(); // build the conjunct future - ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4)); assertEquals(4, result.getNumFuturesTotal()); - Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() { + Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() { @Override - public void accept(Void value) {} + public void accept(Object value) {} }); future1.complete(new Object()); @@ -183,12 +197,55 @@ public class FutureUtilsTest { } } + /** + * Tests that the conjunct future returns upon completion the collection of all future values + */ + @Test + public void testConjunctFutureValue() throws ExecutionException, InterruptedException { + CompletableFuture<Integer> future1 = FlinkCompletableFuture.completed(1); + CompletableFuture<Long> future2 = FlinkCompletableFuture.completed(2L); + CompletableFuture<Double> future3 = new FlinkCompletableFuture<>(); + + ConjunctFuture<Collection<Number>> result = FutureUtils.<Number>combineAll(Arrays.asList(future1, future2, future3)); + + assertFalse(result.isDone()); + + future3.complete(.1); + + assertTrue(result.isDone()); + + assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1)); + } + @Test public void testConjunctOfNone() throws Exception { - final ConjunctFuture result = FutureUtils.combineAll(Collections.<Future<Object>>emptyList()); + final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<Future<Object>>emptyList()); assertEquals(0, result.getNumFuturesTotal()); assertEquals(0, result.getNumFuturesCompleted()); assertTrue(result.isDone()); } + + /** + * Factory to create {@link ConjunctFuture} for testing. + */ + private interface FutureFactory { + ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures); + } + + private static class ConjunctFutureFactory implements FutureFactory { + + @Override + public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) { + return FutureUtils.combineAll(futures); + } + } + + private static class WaitingFutureFactory implements FutureFactory { + + @Override + public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) { + return FutureUtils.waitForAll(futures); + } + } }