Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3873#discussion_r116169564 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) * <p>Implementation notice: The member fields all have package-private access, because they are * either accessed by an inner subclass or by the enclosing class. */ - private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture { + private static class ConjunctFutureImpl<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> { /** The total number of futures in the conjunction */ final int numTotal; /** The number of futures in the conjunction that are already complete */ final AtomicInteger numCompleted = new AtomicInteger(); + final ArrayList<T> results; + /** The function that is attached to all futures in the conjunction. Once a future * is complete, this function tracks the completion or fails the conjunct. */ - final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() { + final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() { @Override - public Void apply(Object o, Throwable throwable) { + public Void apply(T o, Throwable throwable) { if (throwable != null) { completeExceptionally(throwable); - } - else if (numTotal == numCompleted.incrementAndGet()) { - complete(null); + } else { + results.add(o); --- End diff -- True, I wanted to add an atomic integer to determine the index but forgot about it. Thanks for catching it.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---