Repository: flink
Updated Branches:
  refs/heads/master dceb5cc17 -> 7ad489d87


[FLINK-6555] [futures] Generalize ConjunctFuture to return results

The ConjunctFuture now returns the set of future values once it is completed.

Introduce WaitingConjunctFuture; Fix thread safety issue with 
ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The future 
values
are discarded making it more efficient than the ResultConjunctFuture which 
returns
the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection<Future>).

This closes #3873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c081201f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c081201f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c081201f

Branch: refs/heads/master
Commit: c081201fd6c3c97a932c09d971f24bf42102650f
Parents: dceb5cc
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu May 11 17:36:17 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed May 17 08:18:23 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 131 +++++++++++++++----
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java      |   4 +-
 .../executiongraph/failover/FailoverRegion.java |   2 +-
 .../runtime/concurrent/FutureUtilsTest.java     |  83 ++++++++++--
 5 files changed, 184 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 4948147..a27af56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,8 +109,9 @@ public class FutureUtils {
 
        /**
         * Creates a future that is complete once multiple other futures 
completed. 
-        * The ConjunctFuture fails (completes exceptionally) once one of the 
Futures in the
-        * conjunction fails.
+        * The future fails (completes exceptionally) once one of the futures 
in the
+        * conjunction fails. Upon successful completion, the future returns the
+        * collection of the futures' results.
         *
         * <p>The ConjunctFuture gives access to how many Futures in the 
conjunction have already
         * completed successfully, via {@link 
ConjunctFuture#getNumFuturesCompleted()}. 
@@ -115,16 +119,16 @@ public class FutureUtils {
         * @param futures The futures that make up the conjunction. No null 
entries are allowed.
         * @return The ConjunctFuture that completes once all given futures are 
complete (or one fails).
         */
-       public static ConjunctFuture combineAll(Collection<? extends Future<?>> 
futures) {
+       public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? 
extends Future<? extends T>> futures) {
                checkNotNull(futures, "futures");
 
-               final ConjunctFutureImpl conjunct = new 
ConjunctFutureImpl(futures.size());
+               final ResultConjunctFuture<T> conjunct = new 
ResultConjunctFuture<>(futures.size());
 
                if (futures.isEmpty()) {
-                       conjunct.complete(null);
+                       conjunct.complete(Collections.<T>emptyList());
                }
                else {
-                       for (Future<?> future : futures) {
+                       for (Future<? extends T> future : futures) {
                                future.handle(conjunct.completionHandler);
                        }
                }
@@ -133,16 +137,32 @@ public class FutureUtils {
        }
 
        /**
+        * Creates a future that is complete once all of the given futures have 
completed.
+        * The future fails (completes exceptionally) once one of the given 
futures
+        * fails.
+        *
+        * <p>The ConjunctFuture gives access to how many Futures have already
+        * completed successfully, via {@link 
ConjunctFuture#getNumFuturesCompleted()}.
+        *
+        * @param futures The futures to wait on. No null entries are allowed.
+        * @return The WaitingFuture that completes once all given futures are 
complete (or one fails).
+        */
+       public static ConjunctFuture<Void> waitForAll(Collection<? extends 
Future<?>> futures) {
+               checkNotNull(futures, "futures");
+
+               return new WaitingConjunctFuture(futures);
+       }
+
+       /**
         * A future that is complete once multiple other futures completed. The 
futures are not
-        * necessarily of the same type, which is why the type of this Future 
is {@code Void}.
-        * The ConjunctFuture fails (completes exceptionally) once one of the 
Futures in the
-        * conjunction fails.
+        * necessarily of the same type. The ConjunctFuture fails (completes 
exceptionally) once
+        * one of the Futures in the conjunction fails.
         * 
         * <p>The advantage of using the ConjunctFuture over chaining all the 
futures (such as via
         * {@link Future#thenCombine(Future, BiFunction)}) is that 
ConjunctFuture also tracks how
         * many of the Futures are already complete.
         */
-       public interface ConjunctFuture extends CompletableFuture<Void> {
+       public interface ConjunctFuture<T> extends CompletableFuture<T> {
 
                /**
                 * Gets the total number of Futures in the conjunction.
@@ -158,39 +178,102 @@ public class FutureUtils {
        }
 
        /**
-        * The implementation of the {@link ConjunctFuture}.
-        * 
-        * <p>Implementation notice: The member fields all have package-private 
access, because they are
-        * either accessed by an inner subclass or by the enclosing class.
+        * The implementation of the {@link ConjunctFuture} which returns its 
Futures' result as a collection.
         */
-       private static class ConjunctFutureImpl extends 
FlinkCompletableFuture<Void> implements ConjunctFuture {
+       private static class ResultConjunctFuture<T> extends 
FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<Collection<T>> {
 
                /** The total number of futures in the conjunction */
-               final int numTotal;
+               private final int numTotal;
+
+               /** The next free index in the results arrays */
+               private final AtomicInteger nextIndex = new AtomicInteger(0);
 
                /** The number of futures in the conjunction that are already 
complete */
-               final AtomicInteger numCompleted = new AtomicInteger();
+               private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+               /** The set of collected results so far */
+               private volatile T[] results;
 
                /** The function that is attached to all futures in the 
conjunction. Once a future
-                * is complete, this function tracks the completion or fails 
the conjunct.  
+                * is complete, this function tracks the completion or fails 
the conjunct.
                 */
-               final BiFunction<Object, Throwable, Void> completionHandler = 
new BiFunction<Object, Throwable, Void>() {
+               final BiFunction<T, Throwable, Void> completionHandler = new 
BiFunction<T, Throwable, Void>() {
 
                        @Override
-                       public Void apply(Object o, Throwable throwable) {
+                       public Void apply(T o, Throwable throwable) {
                                if (throwable != null) {
                                        completeExceptionally(throwable);
-                               }
-                               else if (numTotal == 
numCompleted.incrementAndGet()) {
-                                       complete(null);
+                               } else {
+                                       int index = nextIndex.getAndIncrement();
+
+                                       results[index] = o;
+
+                                       if (numCompleted.incrementAndGet() == 
numTotal) {
+                                               
complete(Arrays.asList(results));
+                                       }
                                }
 
                                return null;
                        }
                };
 
-               ConjunctFutureImpl(int numTotal) {
+               @SuppressWarnings("unchecked")
+               ResultConjunctFuture(int numTotal) {
                        this.numTotal = numTotal;
+                       results = (T[])new Object[numTotal];
+               }
+
+               @Override
+               public int getNumFuturesTotal() {
+                       return numTotal;
+               }
+
+               @Override
+               public int getNumFuturesCompleted() {
+                       return numCompleted.get();
+               }
+       }
+
+       /**
+        * Implementation of the {@link ConjunctFuture} interface which waits 
only for the completion
+        * of its futures and does not return their values.
+        */
+       private static final class WaitingConjunctFuture extends 
FlinkCompletableFuture<Void> implements ConjunctFuture<Void> {
+
+               /** Number of completed futures */
+               private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+               /** Total number of futures to wait on */
+               private final int numTotal;
+
+               /** Handler which increments the atomic completion counter and 
completes or fails the WaitingFutureImpl */
+               private final BiFunction<Object, Throwable, Void> 
completionHandler = new BiFunction<Object, Throwable, Void>() {
+                       @Override
+                       public Void apply(Object o, Throwable throwable) {
+                               if (throwable == null) {
+                                       if (numTotal == 
numCompleted.incrementAndGet()) {
+                                               complete(null);
+                                       }
+                               } else {
+                                       completeExceptionally(throwable);
+                               }
+
+                               return null;
+                       }
+               };
+
+               private WaitingConjunctFuture(Collection<? extends Future<?>> 
futures) {
+                       Preconditions.checkNotNull(futures, "Futures must not 
be null.");
+
+                       this.numTotal = futures.size();
+
+                       if (futures.isEmpty()) {
+                               complete(null);
+                       } else {
+                               for (Future<?> future : futures) {
+                                       future.handle(completionHandler);
+                               }
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5eaa637..7c13936 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -871,7 +871,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
                        // this future is complete once all slot futures are 
complete.
                        // the future fails once one slot future fails.
-                       final ConjunctFuture allAllocationsComplete = 
FutureUtils.combineAll(slotFutures);
+                       final ConjunctFuture<Void> allAllocationsComplete = 
FutureUtils.waitForAll(slotFutures);
 
                        // make sure that we fail if the allocation timeout was 
exceeded
                        final ScheduledFuture<?> timeoutCancelHandle = 
futureExecutor.schedule(new Runnable() {
@@ -892,7 +892,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        allAllocationsComplete.handleAsync(new BiFunction<Void, 
Throwable, Void>() {
 
                                @Override
-                               public Void apply(Void ignored, Throwable 
throwable) {
+                               public Void apply(Void slots, Throwable 
throwable) {
                                        try {
                                                // we do not need the 
cancellation timeout any more
                                                
timeoutCancelHandle.cancel(false);
@@ -973,7 +973,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                        }
 
                                        // we build a future that is complete 
once all vertices have reached a terminal state
-                                       final ConjunctFuture allTerminal = 
FutureUtils.combineAll(futures);
+                                       final ConjunctFuture<Void> allTerminal 
= FutureUtils.waitForAll(futures);
                                        allTerminal.thenAccept(new 
AcceptFunction<Void>() {
                                                @Override
                                                public void accept(Void value) {
@@ -1102,7 +1102,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                        futures.add(ejv.cancelWithFuture());
                                }
 
-                               final ConjunctFuture allTerminal = 
FutureUtils.combineAll(futures);
+                               final ConjunctFuture<Void> allTerminal = 
FutureUtils.waitForAll(futures);
                                allTerminal.thenAccept(new 
AcceptFunction<Void>() {
                                        @Override
                                        public void accept(Void value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3a98e0a..f5a592a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -509,7 +509,7 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
         */
        public Future<Void> cancelWithFuture() {
                // we collect all futures from the task cancellations
-               ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+               ArrayList<Future<ExecutionState>> futures = new 
ArrayList<>(parallelism);
 
                // cancel each vertex
                for (ExecutionVertex ev : getTaskVertices()) {
@@ -517,7 +517,7 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                }
 
                // return a conjunct future, which is complete once all 
individual tasks are canceled
-               return FutureUtils.combineAll(futures);
+               return FutureUtils.waitForAll(futures);
        }
 
        public void fail(Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index b36cfcf..6066c77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -150,7 +150,7 @@ public class FailoverRegion {
                                                futures.add(vertex.cancel());
                                        }
 
-                                       final FutureUtils.ConjunctFuture 
allTerminal = FutureUtils.combineAll(futures);
+                                       final FutureUtils.ConjunctFuture<Void> 
allTerminal = FutureUtils.waitForAll(futures);
                                        allTerminal.thenAcceptAsync(new 
AcceptFunction<Void>() {
                                                @Override
                                                public void accept(Void value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 43710cb..e262459 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -21,10 +21,15 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
@@ -33,17 +38,26 @@ import static org.junit.Assert.*;
 /**
  * Tests for the utility methods in {@link FutureUtils}
  */
-public class FutureUtilsTest {
+@RunWith(Parameterized.class)
+public class FutureUtilsTest extends TestLogger{
+
+       @Parameterized.Parameters
+       public static Collection<FutureFactory> parameters (){
+               return Arrays.asList(new ConjunctFutureFactory(), new 
WaitingFutureFactory());
+       }
+
+       @Parameterized.Parameter
+       public FutureFactory futureFactory;
 
        @Test
        public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
                try {
-                       FutureUtils.combineAll(null);
+                       futureFactory.createFuture(null);
                        fail();
                } catch (NullPointerException ignored) {}
 
                try {
-                       FutureUtils.combineAll(Arrays.asList(
+                       futureFactory.createFuture(Arrays.asList(
                                        new FlinkCompletableFuture<Object>(),
                                        null,
                                        new FlinkCompletableFuture<Object>()));
@@ -63,11 +77,11 @@ public class FutureUtilsTest {
                future2.complete(new Object());
 
                // build the conjunct future
-               ConjunctFuture result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+               ConjunctFuture<?> result = 
futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 
-               Future<Void> resultMapped = result.thenAccept(new 
AcceptFunction<Void>() {
+               Future<?> resultMapped = result.thenAccept(new 
AcceptFunction<Object>() {
                        @Override
-                       public void accept(Void value) {}
+                       public void accept(Object value) {}
                });
 
                assertEquals(4, result.getNumFuturesTotal());
@@ -108,11 +122,11 @@ public class FutureUtilsTest {
                CompletableFuture<Object> future4 = new 
FlinkCompletableFuture<>();
 
                // build the conjunct future
-               ConjunctFuture result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+               ConjunctFuture<?> result = 
futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 
-               Future<Void> resultMapped = result.thenAccept(new 
AcceptFunction<Void>() {
+               Future<?> resultMapped = result.thenAccept(new 
AcceptFunction<Object>() {
                        @Override
-                       public void accept(Void value) {}
+                       public void accept(Object value) {}
                });
 
                assertEquals(4, result.getNumFuturesTotal());
@@ -150,12 +164,12 @@ public class FutureUtilsTest {
                CompletableFuture<Object> future4 = new 
FlinkCompletableFuture<>();
 
                // build the conjunct future
-               ConjunctFuture result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+               ConjunctFuture<?> result = 
futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
                assertEquals(4, result.getNumFuturesTotal());
 
-               Future<Void> resultMapped = result.thenAccept(new 
AcceptFunction<Void>() {
+               Future<?> resultMapped = result.thenAccept(new 
AcceptFunction<Object>() {
                        @Override
-                       public void accept(Void value) {}
+                       public void accept(Object value) {}
                });
 
                future1.complete(new Object());
@@ -183,12 +197,55 @@ public class FutureUtilsTest {
                }
        }
 
+       /**
+        * Tests that the conjunct future returns upon completion the 
collection of all future values
+        */
+       @Test
+       public void testConjunctFutureValue() throws ExecutionException, 
InterruptedException {
+               CompletableFuture<Integer> future1 = 
FlinkCompletableFuture.completed(1);
+               CompletableFuture<Long> future2 = 
FlinkCompletableFuture.completed(2L);
+               CompletableFuture<Double> future3 = new 
FlinkCompletableFuture<>();
+
+               ConjunctFuture<Collection<Number>> result = 
FutureUtils.<Number>combineAll(Arrays.asList(future1, future2, future3));
+
+               assertFalse(result.isDone());
+
+               future3.complete(.1);
+
+               assertTrue(result.isDone());
+
+               assertThat(result.get(), 
IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+       }
+
        @Test
        public void testConjunctOfNone() throws Exception {
-               final ConjunctFuture result = 
FutureUtils.combineAll(Collections.<Future<Object>>emptyList());
+               final ConjunctFuture<?> result = 
futureFactory.createFuture(Collections.<Future<Object>>emptyList());
 
                assertEquals(0, result.getNumFuturesTotal());
                assertEquals(0, result.getNumFuturesCompleted());
                assertTrue(result.isDone());
        }
+
+       /**
+        * Factory to create {@link ConjunctFuture} for testing.
+        */
+       private interface FutureFactory {
+               ConjunctFuture<?> createFuture(Collection<? extends Future<?>> 
futures);
+       }
+
+       private static class ConjunctFutureFactory implements FutureFactory {
+
+               @Override
+               public ConjunctFuture<?> createFuture(Collection<? extends 
Future<?>> futures) {
+                       return FutureUtils.combineAll(futures);
+               }
+       }
+
+       private static class WaitingFutureFactory implements FutureFactory {
+
+               @Override
+               public ConjunctFuture<?> createFuture(Collection<? extends 
Future<?>> futures) {
+                       return FutureUtils.waitForAll(futures);
+               }
+       }
 }

Reply via email to