Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148479985
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
---
@@ -476,14 +482,13 @@ else if (numSources < parallelism) {
* @return The preferred locations based in input streams, or an empty iterable,
* if there is no input-based preference.
*/
- public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
+ public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
- Set<TaskManagerLocation> locations = new HashSet<>();
- Set<TaskManagerLocation> inputLocations = new HashSet<>();
+ Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
--- End diff --
For example, in a broadcast join with lazy scheduling, the broadcasting
operator might produce first and thus trigger the
`scheduleOrUpdateConsumers` call on the `ExecutionGraph`, which in turn
triggers the scheduling of the join operator. At that point, only the
location of the broadcast operator might be known. However, since we only
return the forward operator's location future, which has not been completed
yet, lazy scheduling will schedule the join without a location preference
because the `LocationPreferenceConstraint` is `ANY`.
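
To make the scenario concrete, here is a minimal, self-contained sketch using plain `java.util.concurrent` rather than Flink's actual classes. The class `LocationPreferenceSketch`, the helper `resolveWithAny`, and the string locations are hypothetical stand-ins. It assumes that an `ANY`-style constraint only takes already completed location futures into account.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LocationPreferenceSketch {

    // Hypothetical stand-in for an ANY-style constraint:
    // only futures that have already completed normally contribute a location.
    static List<String> resolveWithAny(Collection<CompletableFuture<String>> futures) {
        List<String> known = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            if (future.isDone() && !future.isCompletedExceptionally()) {
                known.add(future.getNow(null));
            }
        }
        return known;
    }

    public static void main(String[] args) {
        // The broadcast input has already been deployed, so its location is known.
        CompletableFuture<String> broadcastLocation = CompletableFuture.completedFuture("tm-1");
        // The forward input has not been deployed yet, so its future is still pending.
        CompletableFuture<String> forwardLocation = new CompletableFuture<>();

        // Returning only the forward input's future loses the known location.
        List<String> onlyForward = resolveWithAny(Collections.singletonList(forwardLocation));
        // Returning both futures keeps the broadcast input's location available.
        List<String> both = resolveWithAny(Arrays.asList(forwardLocation, broadcastLocation));

        System.out.println(onlyForward); // prints []
        System.out.println(both);        // prints [tm-1]
    }
}
```

With only the pending forward future, the resolved preference is empty, which corresponds to scheduling without any location preference even though the broadcast operator's location was already available.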
---