[ 
https://issues.apache.org/jira/browse/FLINK-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007756#comment-16007756
 ] 

ASF GitHub Bot commented on FLINK-6555:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3873#discussion_r116169564
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
    @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? 
extends Future<?>> futures)
         * <p>Implementation notice: The member fields all have package-private 
access, because they are
         * either accessed by an inner subclass or by the enclosing class.
         */
    -   private static class ConjunctFutureImpl extends 
FlinkCompletableFuture<Void> implements ConjunctFuture {
    +   private static class ConjunctFutureImpl<T> extends 
FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {
     
                /** The total number of futures in the conjunction */
                final int numTotal;
     
                /** The number of futures in the conjunction that are already 
complete */
                final AtomicInteger numCompleted = new AtomicInteger();
     
    +           final ArrayList<T> results;
    +
                /** The function that is attached to all futures in the 
conjunction. Once a future
                 * is complete, this function tracks the completion or fails 
the conjunct.  
                 */
    -           final BiFunction<Object, Throwable, Void> completionHandler = 
new BiFunction<Object, Throwable, Void>() {
    +           final BiFunction<T, Throwable, Void> completionHandler = new 
BiFunction<T, Throwable, Void>() {
     
                        @Override
    -                   public Void apply(Object o, Throwable throwable) {
    +                   public Void apply(T o, Throwable throwable) {
                                if (throwable != null) {
                                        completeExceptionally(throwable);
    -                           }
    -                           else if (numTotal == 
numCompleted.incrementAndGet()) {
    -                                   complete(null);
    +                           } else {
    +                                   results.add(o);
    --- End diff --
    
    True, I wanted to add an atomic integer to determine the index but forgot 
about it. Thanks for catching it.


> Generalize ConjunctFuture
> -------------------------
>
>                 Key: FLINK-6555
>                 URL: https://issues.apache.org/jira/browse/FLINK-6555
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Trivial
>             Fix For: 1.4.0
>
>
> The {{ConjunctFuture}} allows to combine multiple {{Futures}} into one. At 
> the moment it does not return the collection of results of the individuals 
> futures. In some cases this information is helpful and should, thus, be 
> returned.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to