Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148211942
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
- final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
- // we use this flag to handle failures in a 'finally' clause
- // that allows us to not go through clumsy cast-and-rethrow logic
- boolean successful = false;
+ // collecting all the slots may resize and fail in that operation without slots getting lost
+ final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
- try {
- // collecting all the slots may resize and fail in that operation without slots getting lost
- final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+ // allocate the slots (obtain all their futures
+ for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+ // these calls are not blocking, they only return futures
--- End diff --
If we switch to scheduling based on state location, we can't allocate resources in
topological order. Stateless vertices may share the same SlotSharingGroup with
stateful vertices, and if the stateless vertices are allocated first, the shared
slots get chosen without regard to where the stateful vertices' state lives, so the
stateful vertices can end up placed far from their state. An intuitive fix is to
allocate resources for the stateful vertices first.
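A minimal sketch of the ordering problem, under simplifying assumptions: each sharing group gets exactly one shared slot, the first vertex allocated in a group decides that slot's host, and a stateless vertex takes an arbitrary free host. `Vertex`, `statefulFirst`, `allocate`, and the host names are hypothetical illustrations, not Flink's scheduler API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; none of these names are Flink APIs.
class StatefulFirstScheduling {

    /** A job vertex in a slot sharing group; stateful vertices prefer the host holding their state. */
    static final class Vertex {
        final String name;
        final String sharingGroup;
        final String preferredHost; // null for stateless vertices

        Vertex(String name, String sharingGroup, String preferredHost) {
            this.name = name;
            this.sharingGroup = sharingGroup;
            this.preferredHost = preferredHost;
        }

        boolean isStateful() {
            return preferredHost != null;
        }
    }

    /** Stable partition: stateful vertices first, each part keeping its topological order. */
    static List<Vertex> statefulFirst(List<Vertex> topological) {
        List<Vertex> stateful = new ArrayList<>();
        List<Vertex> stateless = new ArrayList<>();
        for (Vertex v : topological) {
            if (v.isStateful()) {
                stateful.add(v);
            } else {
                stateless.add(v);
            }
        }
        stateful.addAll(stateless);
        return stateful;
    }

    /**
     * Assigns one shared slot per sharing group. The first vertex allocated in a group
     * fixes the slot's host: its preferred host if stateful, otherwise defaultHost
     * (standing in for "any free slot").
     */
    static Map<String, String> allocate(List<Vertex> order, String defaultHost) {
        Map<String, String> slotHostByGroup = new HashMap<>();
        for (Vertex v : order) {
            slotHostByGroup.computeIfAbsent(v.sharingGroup,
                    group -> v.isStateful() ? v.preferredHost : defaultHost);
        }
        return slotHostByGroup;
    }

    public static void main(String[] args) {
        List<Vertex> topological = List.of(
                new Vertex("source", "g", null),
                new Vertex("stateful-map", "g", "host-B"));
        // Topological order: the stateless source fixes the shared slot on host-A.
        System.out.println(allocate(topological, "host-A"));                // {g=host-A}
        // Stateful-first order: the shared slot lands where the state lives.
        System.out.println(allocate(statefulFirst(topological), "host-A")); // {g=host-B}
    }
}
```

Because the reordering is a stable partition, vertices keep their topological order within the stateful and stateless groups; only the relative order between the two groups changes.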
---