Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148437379
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
         * @return The preferred locations based in input streams, or an empty iterable,
         *         if there is no input-based preference.
         */
    -   public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +   public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
                // otherwise, base the preferred locations on the input connections
                if (inputEdges == null) {
                        return Collections.emptySet();
                }
                else {
    -                   Set<TaskManagerLocation> locations = new HashSet<>();
    -                   Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +                   Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    I think this problem could be solved if the scheduler weighted the inputs.
    In the example above, imagine that a is "broadcast" and its downstream
    parallelism is 10, while b is "forward". Then each ExecutionEdge from a has
    weight ```a-total-weight / 10``` and the ExecutionEdge from b has weight
    ```b-total-weight / 1```. We could estimate the total weight from the
    vertex's throughput. For now, every vertex has the same total weight, so
    b's edge carries the larger weight and the location of b's input will be
    picked.
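
To make the arithmetic concrete, here is a minimal sketch of the weighting idea. It is not Flink code: `WeightedInputSketch`, `perEdgeWeight`, and `heaviestInput` are hypothetical names, and the total weight of 1.0 is an assumed placeholder for whatever a throughput-based estimate would provide.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink: illustrates the per-edge weighting idea. */
public class WeightedInputSketch {

    /** Splits an input's total weight evenly across the edges it fans out to. */
    public static double perEdgeWeight(double totalWeight, int fanOut) {
        if (fanOut <= 0) {
            throw new IllegalArgumentException("fanOut must be positive");
        }
        return totalWeight / fanOut;
    }

    /** Returns the name of the input whose edge has the largest weight, or null if the map is empty. */
    public static String heaviestInput(Map<String, Double> edgeWeights) {
        String best = null;
        double bestWeight = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> entry : edgeWeights.entrySet()) {
            if (entry.getValue() > bestWeight) {
                bestWeight = entry.getValue();
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Assumption: every vertex currently has the same total weight.
        double totalWeight = 1.0;

        Map<String, Double> edgeWeights = new LinkedHashMap<>();
        edgeWeights.put("a", perEdgeWeight(totalWeight, 10)); // broadcast, downstream parallelism 10
        edgeWeights.put("b", perEdgeWeight(totalWeight, 1));  // forward, a single edge

        // With equal total weights, b's edge is the heaviest.
        System.out.println(heaviestInput(edgeWeights));
    }
}
```

With equal total weights this selects b. If a's total weight were more than ten times b's, a's edges would win instead, which is the case a throughput-based estimate would be meant to capture.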


---
