[ https://issues.apache.org/jira/browse/FLINK-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007756#comment-16007756 ]
ASF GitHub Bot commented on FLINK-6555:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3873#discussion_r116169564

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures)
          * <p>Implementation notice: The member fields all have package-private access, because they are
          * either accessed by an inner subclass or by the enclosing class.
          */
    -    private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    +    private static class ConjunctFutureImpl<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {

             /** The total number of futures in the conjunction */
             final int numTotal;

             /** The number of futures in the conjunction that are already complete */
             final AtomicInteger numCompleted = new AtomicInteger();

    +        final ArrayList<T> results;
    +
             /** The function that is attached to all futures in the conjunction. Once a future
              * is complete, this function tracks the completion or fails the conjunct.
              */
    -        final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
    +        final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {

                 @Override
    -            public Void apply(Object o, Throwable throwable) {
    +            public Void apply(T o, Throwable throwable) {
                     if (throwable != null) {
                         completeExceptionally(throwable);
    -                }
    -                else if (numTotal == numCompleted.incrementAndGet()) {
    -                    complete(null);
    +                } else {
    +                    results.add(o);
    --- End diff --

    True, I wanted to add an atomic integer to determine the index but forgot about it. Thanks for catching it.

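For readers following the thread, here is a minimal sketch of the idea in the comment above: rather than calling `results.add(o)` on an unsynchronized `ArrayList` from several completion callbacks, each callback claims its own slot in a pre-sized array through an atomic counter. This is an illustration under assumptions, not the code from the pull request. It uses `java.util.concurrent.CompletableFuture` from the JDK in place of Flink's `Future` and `FlinkCompletableFuture`, and the class name `ConjunctSketch`, the field `nextIndex`, and this `combineAll` signature are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative stand-in for a result-collecting conjunct future (not Flink API). */
class ConjunctSketch {

    /**
     * Combines the given futures into one future that completes with all of
     * their results, or fails as soon as any input future fails.
     */
    static <T> CompletableFuture<List<T>> combineAll(Collection<CompletableFuture<T>> futures) {
        final int numTotal = futures.size();
        final CompletableFuture<List<T>> conjunct = new CompletableFuture<>();

        // Nothing to wait for: complete immediately with an empty result list.
        if (numTotal == 0) {
            conjunct.complete(new ArrayList<>());
            return conjunct;
        }

        // Pre-sized result storage; every callback writes to a distinct slot.
        @SuppressWarnings("unchecked")
        final T[] results = (T[]) new Object[numTotal];

        // Hands out the next free slot (assumed name, not from the diff).
        final AtomicInteger nextIndex = new AtomicInteger();

        // Counts how many input futures have completed successfully.
        final AtomicInteger numCompleted = new AtomicInteger();

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, throwable) -> {
                if (throwable != null) {
                    // The first failure fails the conjunct; later calls are no-ops.
                    conjunct.completeExceptionally(throwable);
                } else {
                    // Claim a slot atomically, so concurrent callbacks never collide.
                    results[nextIndex.getAndIncrement()] = value;
                    // The callback that observes the final count publishes the results.
                    if (numCompleted.incrementAndGet() == numTotal) {
                        conjunct.complete(new ArrayList<>(Arrays.asList(results)));
                    }
                }
            });
        }
        return conjunct;
    }
}
```

With this scheme the results appear in completion order, not in the order of the input collection. If input order matters, one alternative would be to record each future's position when the callback is registered and write to that fixed slot instead.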
> Generalize ConjunctFuture
> -------------------------
>
>                 Key: FLINK-6555
>                 URL: https://issues.apache.org/jira/browse/FLINK-6555
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Trivial
>             Fix For: 1.4.0
>
>
> The {{ConjunctFuture}} allows combining multiple {{Futures}} into one. At
> the moment it does not return the collection of results of the individual
> futures. In some cases this information is helpful and should, thus, be
> returned.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)