Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148477247
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
---
@@ -476,14 +482,13 @@ else if (numSources < parallelism) {
* @return The preferred locations based in input streams, or an empty iterable,
* if there is no input-based preference.
*/
- public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
+ public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
- Set<TaskManagerLocation> locations = new HashSet<>();
- Set<TaskManagerLocation> inputLocations = new HashSet<>();
+ Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
--- End diff --
This change was not entirely intended, and the previous code makes total
sense given your explanation. I'm in favour of reverting my changes so that
the semantics stay unchanged for the moment.
However, for the future, I'm wondering whether this kind of decision should
be made by the `ExecutionVertex`, or whether it should rather be the task of
the `Scheduler`.
For example, what if a task has multiple input gates and one of them has
exactly one producer? Then the vertex will return only the location of this
single producer. If this TM has no slots left, we would basically pick
another slot at random, even though there might be other TMs on which
another producer for this task runs.
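
To make the scenario concrete, here is a minimal standalone sketch. It does not use the Flink API: the class `PreferredLocationSketch`, the method `firstWithFreeSlot`, the TM names, and the free-slot map are all hypothetical and only model the situation described above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model, not Flink code: TMs are plain strings and
// slot availability is a simple map from TM name to free slot count.
public class PreferredLocationSketch {

    /** Returns the first preferred TM that still has a free slot, if any. */
    public static Optional<String> firstWithFreeSlot(
            List<String> preferred, Map<String, Integer> freeSlots) {
        for (String tm : preferred) {
            if (freeSlots.getOrDefault(tm, 0) > 0) {
                return Optional.of(tm);
            }
        }
        // No preferred TM has capacity; the caller would fall back to an arbitrary slot.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumed setup: gate A has one producer on tm1, gate B has producers on tm2 and tm3.
        Map<String, Integer> freeSlots = new HashMap<>();
        freeSlots.put("tm1", 0); // the single producer's TM is full
        freeSlots.put("tm2", 1);
        freeSlots.put("tm3", 0);
        freeSlots.put("tm4", 2); // unrelated TM, hosts no producer

        // The vertex narrowed the preference to the single-producer gate.
        List<String> narrowed = Arrays.asList("tm1");
        // A scheduler that sees every producer location could still find a co-located slot.
        List<String> allProducers = Arrays.asList("tm1", "tm2", "tm3");

        System.out.println("narrowed preference: " + firstWithFreeSlot(narrowed, freeSlots));
        System.out.println("all producer locations: " + firstWithFreeSlot(allProducers, freeSlots));
    }
}
```

With the narrowed list the lookup finds nothing and the choice degrades to an arbitrary slot, whereas with all producer locations available the scheduler could still place the task next to a producer on `tm2`.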
---