[
https://issues.apache.org/jira/browse/FLINK-5747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871843#comment-15871843
]
ASF GitHub Bot commented on FLINK-5747:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3295#discussion_r101754578
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
---
@@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
super(cause);
}
}
+
+ //
------------------------------------------------------------------------
+ // composing futures
+ //
------------------------------------------------------------------------
+
+ /**
+ * Creates a future that is complete once multiple other futures
completed.
+ * The ConjunctFuture fails (completes exceptionally) once one of the
Futures in the
+ * conjunction fails.
+ *
+ * <p>The ConjunctFuture gives access to how many Futures in the
conjunction have already
+ * completed successfully, via {@link
ConjunctFuture#getNumFuturesCompleted()}.
+ *
+ * @param futures The futures that make up the conjunction. No null
entries are allowed.
+ * @return The ConjunctFuture that completes once all given futures are
complete (or one fails).
+ */
+ public static ConjunctFuture combineAll(Collection<? extends Future<?>>
futures) {
+ checkNotNull(futures, "futures");
+ checkArgument(!futures.isEmpty(), "futures is empty");
+
+ final ConjunctFutureImpl conjunct = new
ConjunctFutureImpl(futures.size());
+
+ for (Future<?> future : futures) {
+ future.handle(conjunct.completionHandler);
+ }
+
+ return conjunct;
+ }
+
+ /**
+ * A future that is complete once multiple other futures completed. The
futures are not
+ * necessarily of the same type, which is why the type of this Future
is {@code Void}.
+ * The ConjunctFuture fails (completes exceptionally) once one of the
Futures in the
+ * conjunction fails.
+ *
+ * <p>The advantage of using the ConjunctFuture over chaining all the
futures (such as via
+ * {@link Future#thenCombine(Future, BiFunction)}) is that
ConjunctFuture also tracks how
+ * many of the Futures are already complete.
+ */
+ public interface ConjunctFuture extends CompletableFuture<Void> {
+
+ /**
+ * Gets the total number of Futures in the conjunction.
+ * @return The total number of Futures in the conjunction.
+ */
+ int getNumFuturesTotal();
+
+ /**
+ * Gets the number of Futures in the conjunction that are
already complete.
+ * @return The number of Futures in the conjunction that are
already complete
+ */
+ int getNumFuturesCompleted();
+ }
+
+ /**
+ * The implementation of the {@link ConjunctFuture}.
+ *
+ * <p>Implementation notice: The member fields all have package-private
access, because they are
+ * either accessed by an inner subclass or by the enclosing class.
+ */
+ private static class ConjunctFutureImpl extends
FlinkCompletableFuture<Void> implements ConjunctFuture {
--- End diff --
Interesting idea. I think the linked implementation is not yet thread safe,
because the BiFunction that adds the results to the collection is called
concurrently as the different original futures complete. For this particular
use case, we'd need to also preserve the order. This is easy to change by
simply pre-allocating a target array and setting the results to the positions
(the completion function would need to get the target index).
I would actually like to do that as a separate follow-up, unless you object
there.
> Eager Scheduling should deploy all Tasks together
> -------------------------------------------------
>
> Key: FLINK-5747
> URL: https://issues.apache.org/jira/browse/FLINK-5747
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.2.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, eager scheduling immediately triggers the scheduling for all
> vertices and their subtasks in topological order.
> This has two problems:
> - This works only, as long as resource acquisition is "synchronous". With
> dynamic resource acquisition in FLIP-6, the resources are returned as Futures
> which may complete out of order. This results in out-of-order (not in
> topological order) scheduling of tasks which does not work for streaming.
> - Deploying some tasks that depend on other tasks before it is clear that
> the other tasks have resources as well leads to situations where many
> deploy/recovery cycles happen before enough resources are available to get
> the job running fully.
> For eager scheduling, we should allocate all resources in one chunk and then
> deploy once we know that all are available.
> As a follow-up, the same should be done per pipelined component in lazy batch
> scheduling as well. That way we get lazy scheduling across blocking
> boundaries, and bulk (gang) scheduling in pipelined subgroups.
> This also does not apply for efforts of fine grained recovery, where
> individual tasks request replacement resources.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)