GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148479985
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
         * @return The preferred locations based in input streams, or an empty iterable,
         *         if there is no input-based preference.
         */
    -   public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +   public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
                // otherwise, base the preferred locations on the input connections
                if (inputEdges == null) {
                        return Collections.emptySet();
                }
                else {
    -                   Set<TaskManagerLocation> locations = new HashSet<>();
    -                   Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +                   Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    For example, in the case of a broadcast join with lazy scheduling, the broadcasting operator may produce first and thus trigger the `scheduleOrUpdateConsumers` call on the `ExecutionGraph`, which in turn triggers the scheduling of the join operator. At that point, only the location of the broadcast operator might be known. However, since we only return the forward operator's location future, which has not been completed yet, the lazy scheduling will schedule the join without a location preference because the `LocationPreferenceConstraint` is `ANY`.
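
To make the concern concrete, here is a minimal, self-contained sketch. It is not the actual Flink scheduling code: `LocationPreferenceSketch` and `knownLocations` are hypothetical names, and a plain `String` stands in for `TaskManagerLocation`. It assumes that under `ANY` only location futures that are already completed contribute to the preference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LocationPreferenceSketch {

    // Hypothetical helper: mimics an ANY-style resolution by collecting only
    // the locations whose futures have already completed successfully.
    public static List<String> knownLocations(Collection<CompletableFuture<String>> futures) {
        List<String> known = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            if (future.isDone() && !future.isCompletedExceptionally()) {
                known.add(future.getNow(null));
            }
        }
        return known;
    }

    public static void main(String[] args) {
        // The broadcasting operator is already deployed, so its location is known.
        CompletableFuture<String> broadcastInput =
            CompletableFuture.completedFuture("tm-broadcast");
        // The forward operator has not been scheduled yet, so its future is pending.
        CompletableFuture<String> forwardInput = new CompletableFuture<>();

        // Returning only the forward input's future: no preference at all.
        System.out.println(knownLocations(Collections.singletonList(forwardInput)));
        // prints []

        // Returning both futures: the known broadcast location can be used.
        System.out.println(knownLocations(Arrays.asList(forwardInput, broadcastInput)));
        // prints [tm-broadcast]
    }
}
```

In this sketch, returning the futures of all inputs lets the scheduler fall back on whichever location is already known at scheduling time.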

