[ https://issues.apache.org/jira/browse/FLINK-5747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871835#comment-15871835 ]
ASF GitHub Bot commented on FLINK-5747:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3295#discussion_r101753888
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
 		}
 	}
+	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+		// simply take the vertices without inputs.
+		for (ExecutionJobVertex ejv : this.tasks.values()) {
+			if (ejv.getJobVertex().isInputVertex()) {
+				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+			}
+		}
+	}
+
+	/**
+	 * Schedules all execution vertices eagerly: slots for all vertices are
+	 * allocated first, and tasks are deployed only once all slots are available.
+	 *
+	 * @param slotProvider The resource provider from which the slots are allocated
+	 * @param timeout The maximum time that the deployment may take, before a
+	 *                TimeoutException is thrown.
+	 */
+	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+		checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+		// Important: reserve all the space we need up front.
+		// That way we do not have any operation that can fail between allocating the slots
+		// and adding them to the list. If we had a failure in between there, that would
+		// cause the slots to get lost.
+		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+		final boolean queued = allowQueuedScheduling;
+
+		// we use this flag to handle failures in a 'finally' clause
+		// that allows us to not go through clumsy cast-and-rethrow logic
+		boolean successful = false;
+
+		try {
+			// collecting all the slots may resize and fail in that operation without slots getting lost
+			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+			// allocate the slots (obtain all their futures)
+			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+				// these calls are not blocking, they only return futures
+				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+
+				// we need to first add the slots to this list, to be safe on release
+				resources.add(slots);
+
+				for (ExecutionAndSlot ens : slots) {
+					slotFutures.add(ens.slotFuture);
+				}
+			}
+
+			// this future is complete once all slot futures are complete.
+			// the future fails once one slot future fails.
+			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
--- End diff ---
I thought about that, and one purpose of this change is to avoid many
partial deployments / failures when not all resources are available.
In the "FLIP-1" work, we would introduce something like "domains of tasks
that schedule and fail together". Those domains could then be scheduled
independently of each other, each in topological order.
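
To make the semantics described in the diff ("complete once all slot futures
are complete, fail once one slot future fails") concrete, here is a minimal
hypothetical sketch using plain java.util.concurrent types. This is not
Flink's actual ConjunctFuture/FutureUtils implementation; conjunct() and the
String slots are stand-ins:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ConjunctFutureSketch {

        // Completes once every input future completes successfully;
        // completes exceptionally as soon as any single input fails.
        static <T> CompletableFuture<Void> conjunct(List<CompletableFuture<T>> futures) {
            CompletableFuture<Void> all = new CompletableFuture<>();
            if (futures.isEmpty()) {
                all.complete(null);
                return all;
            }
            AtomicInteger remaining = new AtomicInteger(futures.size());
            for (CompletableFuture<T> f : futures) {
                f.whenComplete((ignored, error) -> {
                    if (error != null) {
                        all.completeExceptionally(error);   // fail fast on first failure
                    } else if (remaining.decrementAndGet() == 0) {
                        all.complete(null);                 // all allocations succeeded
                    }
                });
            }
            return all;
        }

        public static void main(String[] args) {
            List<CompletableFuture<String>> slots = List.of(
                    CompletableFuture.completedFuture("slot-1"),
                    CompletableFuture.completedFuture("slot-2"));

            // Deploy only after the conjunct future completes, i.e. all slots exist.
            conjunct(slots)
                    .thenRun(() -> System.out.println("all slots ready, deploying"))
                    .join();
        }
    }

Note that plain CompletableFuture.allOf would not fail fast: it waits for all
inputs to complete before reporting a failure, which is why a dedicated
conjunct future is useful here.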
> Eager Scheduling should deploy all Tasks together
> -------------------------------------------------
>
> Key: FLINK-5747
> URL: https://issues.apache.org/jira/browse/FLINK-5747
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.2.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, eager scheduling immediately triggers the scheduling for all
> vertices and their subtasks in topological order.
> This has two problems:
> - This only works as long as resource acquisition is "synchronous". With
> dynamic resource acquisition in FLIP-6, the resources are returned as Futures
> which may complete out of order. This results in out-of-order (not
> topological) scheduling of tasks, which does not work for streaming.
> - Deploying tasks that depend on other tasks before it is clear that those
> other tasks have resources as well leads to situations where many
> deploy/recovery cycles happen before enough resources are available to get
> the job running fully.
> For eager scheduling, we should allocate all resources in one chunk and then
> deploy once we know that all are available.
> As a follow-up, the same should be done per pipelined component in lazy batch
> scheduling as well. That way we get lazy scheduling across blocking
> boundaries, and bulk (gang) scheduling in pipelined subgroups.
> This bulk allocation also does not apply to fine-grained recovery efforts,
> where individual tasks request replacement resources on their own.
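
As a hypothetical illustration of the "allocate all resources in one chunk,
then deploy once we know that all are available" idea from the description
above (none of the names below are Flink API; Vertex, Slot, and allocate()
are stand-ins):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class BulkSchedulingSketch {

        record Vertex(String name) {}
        record Slot(String id) {}

        // Stand-in for an asynchronous slot request; only returns a future.
        static CompletableFuture<Slot> allocate(Vertex v) {
            return CompletableFuture.supplyAsync(() -> new Slot("slot-for-" + v.name()));
        }

        public static void main(String[] args) {
            List<Vertex> topological =
                    List.of(new Vertex("source"), new Vertex("map"), new Vertex("sink"));

            // 1) Request slots for *all* vertices up front, in topological order.
            List<CompletableFuture<Slot>> slotFutures = new ArrayList<>();
            for (Vertex v : topological) {
                slotFutures.add(allocate(v));
            }

            // 2) Deploy only once every slot future has completed, so no task is
            //    deployed while other tasks may still be waiting for resources.
            CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture[0]))
                    .thenRun(() -> {
                        for (int i = 0; i < topological.size(); i++) {
                            System.out.println("deploying " + topological.get(i).name()
                                    + " into " + slotFutures.get(i).join().id());
                        }
                    })
                    .join();
        }
    }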
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)