Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148209063
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
- final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
- // we use this flag to handle failures in a 'finally' clause
- // that allows us to not go through clumsy cast-and-rethrow logic
- boolean successful = false;
+ // collecting all the slots may resize and fail in that operation without slots getting lost
+ final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
- try {
- // collecting all the slots may resize and fail in that operation without slots getting lost
- final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+ // allocate the slots (obtain all their futures
+ for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+ // these calls are not blocking, they only return futures
--- End diff --
There is no specific reason why we iterate over the vertices in topological
order. For eager scheduling we could just as well choose a completely random
order, because the scheduling order is determined by the preferred location
futures (which at the moment are based on inputs only). If we switch to state
locations, the individual tasks are effectively scheduled independently,
because the vertices no longer depend on the input locations.
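
To illustrate the point, here is a minimal toy sketch. It is not Flink code: the class `OrderIndependentScheduling`, the vertex names, the `INPUT_OF` map and the location string `"tm-1"` are all hypothetical. It assumes that each vertex's location future is simply chained onto the location future of its single input, which is a simplification of input-based preferred locations. Under that assumption, the order in which the futures are requested does not change the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

class OrderIndependentScheduling {

    // Hypothetical linear pipeline: source -> map -> sink (vertex -> its input).
    static final Map<String, String> INPUT_OF = new HashMap<>();
    static {
        INPUT_OF.put("map", "source");
        INPUT_OF.put("sink", "map");
    }

    /** Requests a location for every vertex, visiting them in the given order. */
    static Map<String, String> schedule(List<String> iterationOrder) {
        // One future per vertex, created up front so any vertex can refer to its input's future.
        Map<String, CompletableFuture<String>> locations = new HashMap<>();
        for (String vertex : iterationOrder) {
            locations.put(vertex, new CompletableFuture<>());
        }

        for (String vertex : iterationOrder) {
            CompletableFuture<String> own = locations.get(vertex);
            String input = INPUT_OF.get(vertex);
            if (input == null) {
                // No input, hence no preference: take an arbitrary (hypothetical) location.
                own.complete("tm-1");
            } else {
                // Non-blocking: the location is decided once the input's location is known,
                // regardless of when this callback was registered.
                locations.get(input).thenAccept(own::complete);
            }
        }

        Map<String, String> result = new TreeMap<>();
        locations.forEach((vertex, future) -> result.put(vertex, future.join()));
        return result;
    }

    public static void main(String[] args) {
        List<String> topological = Arrays.asList("source", "map", "sink");
        List<String> shuffled = new ArrayList<>(topological);
        Collections.shuffle(shuffled);
        // Both orders yield the same assignment.
        System.out.println(schedule(topological).equals(schedule(shuffled)));
    }
}
```

In this toy model the dependency between tasks lives entirely in the future chain, so the `for` loop only registers callbacks and its order is irrelevant.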
---