[
https://issues.apache.org/jira/browse/FLINK-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006916#comment-16006916
]
ASF GitHub Bot commented on FLINK-6555:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3873#discussion_r116062457
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
---
@@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<?
extends Future<?>> futures)
* <p>Implementation notice: The member fields all have package-private
access, because they are
* either accessed by an inner subclass or by the enclosing class.
*/
- private static class ConjunctFutureImpl extends
FlinkCompletableFuture<Void> implements ConjunctFuture {
+ private static class ConjunctFutureImpl<T> extends
FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {
/** The total number of futures in the conjunction */
final int numTotal;
/** The number of futures in the conjunction that are already
complete */
final AtomicInteger numCompleted = new AtomicInteger();
+ final ArrayList<T> results;
+
/** The function that is attached to all futures in the
conjunction. Once a future
* is complete, this function tracks the completion or fails
the conjunct.
*/
- final BiFunction<Object, Throwable, Void> completionHandler =
new BiFunction<Object, Throwable, Void>() {
+ final BiFunction<T, Throwable, Void> completionHandler = new
BiFunction<T, Throwable, Void>() {
@Override
- public Void apply(Object o, Throwable throwable) {
+ public Void apply(T o, Throwable throwable) {
if (throwable != null) {
completeExceptionally(throwable);
- }
- else if (numTotal ==
numCompleted.incrementAndGet()) {
- complete(null);
+ } else {
+ results.add(o);
--- End diff --
Is this thread safe? My assumption is that many of the completion handlers
can be called at the same time.
> Generalize ConjunctFuture
> -------------------------
>
> Key: FLINK-6555
> URL: https://issues.apache.org/jira/browse/FLINK-6555
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Trivial
> Fix For: 1.4.0
>
>
> The {{ConjunctFuture}} allows to combine multiple {{Futures}} into one. At
> the moment it does not return the collection of results of the individuals
> futures. In some cases this information is helpful and should, thus, be
> returned.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)