Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148366296
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
---
@@ -476,14 +482,13 @@ else if (numSources < parallelism) {
* @return The preferred locations based in input streams, or an empty iterable,
* if there is no input-based preference.
*/
- public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
+ public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
- Set<TaskManagerLocation> locations = new HashSet<>();
- Set<TaskManagerLocation> inputLocations = new HashSet<>();
+ Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
--- End diff --
The code here changes some semantics:
- Originally: Determine preferred input location per input gate. If one
gate has too many candidates, and one gate has few candidates, then these few
candidates are the preference. Example: a broadcast join where one input is
*broadcast*, the other is *forward*. The code would pick the locality
preference for the "forward" input.
- Now: All input channels contribute to the same locality preference
pool. If one input has too many candidates, no locality preferences exist at
all. In the broadcast join case, the forward input is not taken into account
any more.
Is that intended? I think the broadcast join case is a good example of why the
per-input (gate) treatment is helpful.
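
To make the difference concrete, here is a minimal standalone sketch of the two strategies. It is not the actual `ExecutionVertex` code: the class name, the method names `perGate` and `pooled`, and the `maxCandidates` threshold are hypothetical, and plain strings stand in for `TaskManagerLocation`. It only assumes that a gate (or pool) with more distinct candidates than the threshold yields no preference.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; names and threshold are not taken from Flink.
final class LocalityPreferenceSketch {

    /** Per-gate: each gate is evaluated on its own; the smallest usable candidate set wins. */
    static Set<String> perGate(List<List<String>> gates, int maxCandidates) {
        Set<String> best = Collections.emptySet();
        for (List<String> gate : gates) {
            Set<String> candidates = new LinkedHashSet<>(gate);
            if (candidates.isEmpty() || candidates.size() > maxCandidates) {
                continue; // this gate expresses no usable preference
            }
            if (best.isEmpty() || candidates.size() < best.size()) {
                best = candidates;
            }
        }
        return best;
    }

    /** Pooled: all gates feed one set; exceeding the threshold discards every preference. */
    static Set<String> pooled(List<List<String>> gates, int maxCandidates) {
        Set<String> pool = new LinkedHashSet<>();
        for (List<String> gate : gates) {
            pool.addAll(gate);
            if (pool.size() > maxCandidates) {
                return Collections.emptySet();
            }
        }
        return pool;
    }

    public static void main(String[] args) {
        // Broadcast input: connected to producers on many task managers.
        List<String> broadcast = Arrays.asList("tm1", "tm2", "tm3", "tm4", "tm5");
        // Forward input: connected to a single producer.
        List<String> forward = Collections.singletonList("tm3");
        List<List<String>> gates = Arrays.asList(broadcast, forward);

        System.out.println("per gate: " + perGate(gates, 4));
        System.out.println("pooled:   " + pooled(gates, 4));
    }
}
```

With a threshold of 4, the per-gate variant skips the broadcast gate and returns `[tm3]` from the forward gate, while the pooled variant exceeds the threshold on the broadcast input alone and returns an empty set.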
---