[
https://issues.apache.org/jira/browse/FLINK-5747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871794#comment-15871794
]
ASF GitHub Bot commented on FLINK-5747:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3295#discussion_r101739062
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
         }
     }
+    private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+        // simply take the vertices without inputs.
+        for (ExecutionJobVertex ejv : this.tasks.values()) {
+            if (ejv.getJobVertex().isInputVertex()) {
+                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+            }
+        }
+    }
+
+    /**
+     *
+     *
+     * @param slotProvider  The resource provider from which the slots are allocated
+     * @param timeout       The maximum time that the deployment may take, before a
+     *                      TimeoutException is thrown.
+     */
+    private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+        checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+        // Important: reserve all the space we need up front.
+        // that way we do not have any operation that can fail between allocating the slots
+        // and adding them to the list. If we had a failure in between there, that would
+        // cause the slots to get lost
+        final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+        final boolean queued = allowQueuedScheduling;
+
+        // we use this flag to handle failures in a 'finally' clause
+        // that allows us to not go through clumsy cast-and-rethrow logic
+        boolean successful = false;
+
+        try {
+            // collecting all the slots may resize and fail in that operation without slots getting lost
+            final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+            // allocate the slots (obtain all their futures
+            for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+                // these calls are not blocking, they only return futures
+                ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
--- End diff --
With the generalized `ConjunctFuture` we could return a collection of
`Future<ExecutionAndSlots>`, which could then be combined into a
`ConjunctFuture<ExecutionAndSlots>`. When completed, it would pass a
`Collection<ExecutionAndSlots>` to the handle method.
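
As an illustration of the suggestion above, here is a minimal sketch of the
"combine many futures into one future of a collection" idea. It uses the JDK's
`CompletableFuture` as a stand-in, not Flink's own `Future` or `ConjunctFuture`
types, and the class name `ConjunctFutureSketch`, the method `combineAll`, and
the string payloads are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: JDK CompletableFuture stands in for Flink's futures.
public class ConjunctFutureSketch {

    /**
     * Combines many futures into one future that completes once all inputs
     * have completed. The results are returned in the order of the input
     * collection, not in the order in which the inputs completed.
     */
    public static <T> CompletableFuture<List<T>> combineAll(
            Collection<CompletableFuture<T>> futures) {
        // Copy first so that later changes to the caller's collection do not matter.
        final List<CompletableFuture<T>> snapshot = new ArrayList<>(futures);
        return CompletableFuture
            .allOf(snapshot.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                List<T> results = new ArrayList<>(snapshot.size());
                for (CompletableFuture<T> f : snapshot) {
                    results.add(f.join()); // already complete here, does not block
                }
                return results;
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> sink = new CompletableFuture<>();
        CompletableFuture<List<String>> all = combineAll(Arrays.asList(source, sink));

        // Complete out of order: the downstream vertex gets its slots first.
        sink.complete("slots-for-sink");
        System.out.println("done after first completion: " + all.isDone());
        source.complete("slots-for-source");
        System.out.println("combined result: " + all.join());
    }
}
```

The handle method would then be attached to the combined future only, so it
sees all allocations at once instead of reacting to each one separately.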
> Eager Scheduling should deploy all Tasks together
> -------------------------------------------------
>
> Key: FLINK-5747
> URL: https://issues.apache.org/jira/browse/FLINK-5747
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.2.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, eager scheduling immediately triggers the scheduling for all
> vertices and their subtasks in topological order.
> This has two problems:
> - This works only as long as resource acquisition is "synchronous". With
> dynamic resource acquisition in FLIP-6, the resources are returned as Futures
> which may complete out of order. This results in out-of-order (not in
> topological order) scheduling of tasks which does not work for streaming.
> - Deploying some tasks that depend on other tasks before it is clear that
> the other tasks have resources as well leads to situations where many
> deploy/recovery cycles happen before enough resources are available to get
> the job running fully.
> For eager scheduling, we should allocate all resources in one chunk and then
> deploy once we know that all are available.
> As a follow-up, the same should be done per pipelined component in lazy batch
> scheduling as well. That way we get lazy scheduling across blocking
> boundaries, and bulk (gang) scheduling in pipelined subgroups.
> This also does not apply to fine-grained recovery, where
> individual tasks request replacement resources.
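
The "allocate everything, then deploy" behaviour described in the issue can be
sketched roughly as follows. This is not the code from the pull request: it
uses the JDK's `CompletableFuture` instead of Flink's futures, and the `Slot`
class, the method `deployWhenAllAvailable`, and the vertex names are
hypothetical. The sketch deploys in the given (topological) order only after
every slot future has completed, and releases the slots that were obtained if
any allocation failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of all-or-nothing (gang) deployment; not Flink code.
public class GangSchedulingSketch {

    /** Made-up stand-in for an allocated slot. */
    public static final class Slot {
        public final String vertexName;
        public boolean released;

        public Slot(String vertexName) {
            this.vertexName = vertexName;
        }

        public void release() {
            released = true;
        }
    }

    /**
     * Waits for all slot futures. On success, returns the vertex names in the
     * order of the input list, which the caller is assumed to have sorted
     * topologically. On failure, releases every slot that was obtained and
     * completes exceptionally.
     */
    public static CompletableFuture<List<String>> deployWhenAllAvailable(
            List<CompletableFuture<Slot>> slotFuturesInTopologicalOrder) {
        final List<CompletableFuture<Slot>> futures =
            new ArrayList<>(slotFuturesInTopologicalOrder);
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .handle((ignored, failure) -> {
                if (failure != null) {
                    // allOf completes only after every input has completed,
                    // so the successful slots can be given back here.
                    for (CompletableFuture<Slot> f : futures) {
                        if (!f.isCompletedExceptionally()) {
                            f.join().release();
                        }
                    }
                    throw new IllegalStateException("not all slots available", failure);
                }
                List<String> deployed = new ArrayList<>(futures.size());
                for (CompletableFuture<Slot> f : futures) {
                    deployed.add(f.join().vertexName); // "deploy" in input order
                }
                return deployed;
            });
    }

    public static void main(String[] args) {
        CompletableFuture<Slot> source = new CompletableFuture<>();
        CompletableFuture<Slot> sink = new CompletableFuture<>();
        CompletableFuture<List<String>> result =
            deployWhenAllAvailable(Arrays.asList(source, sink));

        // Slots arrive out of order; deployment still follows the list order.
        sink.complete(new Slot("sink"));
        source.complete(new Slot("source"));
        System.out.println("deployment order: " + result.join());
    }
}
```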