Github user summerleafs commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r147889602
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider
slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between
allocating the slots
// and adding them to the list. If we had a failure in between
there, that would
// cause the slots to get lost
- final ArrayList<ExecutionAndSlot[]> resources = new
ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
- // we use this flag to handle failures in a 'finally' clause
- // that allows us to not go through clumsy cast-and-rethrow
logic
- boolean successful = false;
+ // collecting all the slots may resize and fail in that
operation without slots getting lost
+ final ArrayList<CompletableFuture<Execution>>
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
- try {
- // collecting all the slots may resize and fail in that
operation without slots getting lost
- final ArrayList<CompletableFuture<SimpleSlot>>
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+ // allocate the slots (obtain all their futures
+ for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+ // these calls are not blocking, they only return
futures
--- End diff --
Hi, allocate resources according to the order of topologically, is just to
facilitate the optimization of 'allocate resource base on prefer input'? it may
cause bad result if we allocate base on state.
---