Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147889602
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
                // that way we do not have any operation that can fail between 
allocating the slots
                // and adding them to the list. If we had a failure in between 
there, that would
                // cause the slots to get lost
    -           final ArrayList<ExecutionAndSlot[]> resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
                final boolean queued = allowQueuedScheduling;
     
    -           // we use this flag to handle failures in a 'finally' clause
    -           // that allows us to not go through clumsy cast-and-rethrow 
logic
    -           boolean successful = false;
    +           // collecting all the slots may resize and fail in that 
operation without slots getting lost
    +           final ArrayList<CompletableFuture<Execution>> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -           try {
    -                   // collecting all the slots may resize and fail in that 
operation without slots getting lost
    -                   final ArrayList<CompletableFuture<SimpleSlot>> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +           // allocate the slots (obtain all their futures
    +           for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +                   // these calls are not blocking, they only return 
futures
    --- End diff --
    
    Hi, allocate resources according to the order of topologically, is just to 
facilitate the optimization of 'allocate resource base on prefer input'? it may 
cause bad result if we allocate base on state.


---

Reply via email to