Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148216894
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
- final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
- // we use this flag to handle failures in a 'finally' clause
- // that allows us to not go through clumsy cast-and-rethrow logic
- boolean successful = false;
+ // collecting all the slots may resize and fail in that operation without slots getting lost
+ final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
- try {
- // collecting all the slots may resize and fail in that operation without slots getting lost
- final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+ // allocate the slots (obtain all their futures
+ for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+ // these calls are not blocking, they only return futures
--- End diff --
But be aware that the `allocateAndAssign` call is non-blocking, so the
actual allocation order depends on when the preferred location futures
complete, not on the order in which the calls are issued.
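
To illustrate the point, here is a minimal, self-contained sketch using plain `CompletableFuture`. It is not the Flink code: the `allocateAndAssign` method below is a hypothetical stand-in with an invented signature, and the class name, vertex names, and host names are made up for the example. It only shows that calls issued in one order can take effect in another when each depends on its own future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllocationOrderSketch {

    // Hypothetical stand-in for a non-blocking allocate-and-assign step:
    // it returns immediately, and the assignment runs only once the
    // preferred-locations future completes.
    static CompletableFuture<String> allocateAndAssign(
            String vertex,
            CompletableFuture<String> preferredLocations,
            List<String> assignmentOrder) {
        return preferredLocations.thenApply(location -> {
            assignmentOrder.add(vertex);
            return vertex + "@" + location;
        });
    }

    static List<String> run() {
        List<String> assignmentOrder = new ArrayList<>();
        CompletableFuture<String> locationsA = new CompletableFuture<>();
        CompletableFuture<String> locationsB = new CompletableFuture<>();

        // The calls are issued in "topological" order: A, then B.
        CompletableFuture<String> a = allocateAndAssign("A", locationsA, assignmentOrder);
        CompletableFuture<String> b = allocateAndAssign("B", locationsB, assignmentOrder);

        // The preferred locations resolve in the opposite order.
        locationsB.complete("host-2");
        locationsA.complete("host-1");

        // Both futures are already complete here; join() just surfaces errors.
        CompletableFuture.allOf(a, b).join();
        return assignmentOrder;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [B, A]
    }
}
```

In this sketch everything runs on one thread, so the outcome is deterministic. With real asynchronous inputs the order would vary from run to run, which is why the loop order alone should not be relied on.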
---