Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3873#discussion_r116169564
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
    @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? 
extends Future<?>> futures)
         * <p>Implementation notice: The member fields all have package-private 
access, because they are
         * either accessed by an inner subclass or by the enclosing class.
         */
    -   private static class ConjunctFutureImpl extends 
FlinkCompletableFuture<Void> implements ConjunctFuture {
    +   private static class ConjunctFutureImpl<T> extends 
FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {
     
                /** The total number of futures in the conjunction */
                final int numTotal;
     
                /** The number of futures in the conjunction that are already 
complete */
                final AtomicInteger numCompleted = new AtomicInteger();
     
    +           final ArrayList<T> results;
    +
                /** The function that is attached to all futures in the 
conjunction. Once a future
                 * is complete, this function tracks the completion or fails 
the conjunct.  
                 */
    -           final BiFunction<Object, Throwable, Void> completionHandler = 
new BiFunction<Object, Throwable, Void>() {
    +           final BiFunction<T, Throwable, Void> completionHandler = new 
BiFunction<T, Throwable, Void>() {
     
                        @Override
    -                   public Void apply(Object o, Throwable throwable) {
    +                   public Void apply(T o, Throwable throwable) {
                                if (throwable != null) {
                                        completeExceptionally(throwable);
    -                           }
    -                           else if (numTotal == 
numCompleted.incrementAndGet()) {
    -                                   complete(null);
    +                           } else {
    +                                   results.add(o);
    --- End diff --
    
    True, I wanted to add an atomic integer to determine the index but forgot 
about it. Thanks for catching it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to