Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148437379
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
---
@@ -476,14 +482,13 @@ else if (numSources < parallelism) {
* @return The preferred locations based in input streams, or an empty
iterable,
* if there is no input-based preference.
*/
- public Iterable<TaskManagerLocation>
getPreferredLocationsBasedOnInputs() {
+ public Collection<CompletableFuture<TaskManagerLocation>>
getPreferredLocationsBasedOnInputs() {
// otherwise, base the preferred locations on the input
connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
- Set<TaskManagerLocation> locations = new HashSet<>();
- Set<TaskManagerLocation> inputLocations = new
HashSet<>();
+ Set<CompletableFuture<TaskManagerLocation>>
inputLocations = new HashSet<>(4);
--- End diff --
I think this problem could be solved if we weigh the inputs in the scheduler.
In the above example, imagine that a is "broadcast" and its downstream's
parallelism is 10, and b is "forward". Then the weight of every ExecutionEdge
of a is ```a-total-weight / 10``` and the weight of b's ExecutionEdge is
```b-total-weight / 1```. We could evaluate the total weight according to the
vertex's throughput. Currently, all vertices have equal total weight, so the
location of b's input will be picked.
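
A minimal sketch of the weighting idea, not actual scheduler code. The class
and method names (`InputWeightSketch`, `edgeWeight`, `preferredInput`) are
hypothetical and only illustrate the arithmetic: each input's total weight is
divided by the number of downstream subtasks it fans out to, and the input
with the heaviest edge is preferred. The total weight of 1.0 for both inputs
is an assumption standing in for "all vertices have equal total weight".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
public class InputWeightSketch {

    /** Weight of a single ExecutionEdge: the producer's total weight split across its fan-out. */
    static double edgeWeight(double totalWeight, int fanOut) {
        if (fanOut <= 0) {
            throw new IllegalArgumentException("fanOut must be positive");
        }
        return totalWeight / fanOut;
    }

    /** Returns the name of the input with the heaviest edge, or null if there are no inputs. */
    static String preferredInput(Map<String, Double> edgeWeights) {
        String best = null;
        double bestWeight = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> entry : edgeWeights.entrySet()) {
            if (entry.getValue() > bestWeight) {
                bestWeight = entry.getValue();
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Assumption: equal total weight for every vertex, as is effectively the case today.
        double totalWeight = 1.0;

        Map<String, Double> edgeWeights = new LinkedHashMap<>();
        edgeWeights.put("a", edgeWeight(totalWeight, 10)); // broadcast to 10 subtasks
        edgeWeights.put("b", edgeWeight(totalWeight, 1));  // forward to 1 subtask

        System.out.println(preferredInput(edgeWeights)); // prints "b"
    }
}
```

If the total weight were derived from throughput instead, a broadcast input
with more than 10 times b's throughput would win in this sketch.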
---