GitHub user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r148364165
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
---
@@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
// Miscellaneous
// --------------------------------------------------------------------------------------------
+ /**
+  * Calculates the preferred locations based on the location preference constraint.
+  *
+  * @param locationPreferenceConstraint constraint for the location preference
+  * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
+  * have been a resource assigned.
+  */
+ @VisibleForTesting
+ public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
+     final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+     final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
+
+     switch(locationPreferenceConstraint) {
+         case ALL:
+             preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+             break;
+         case ANY:
+             final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1);
--- End diff --
Is the intention here to return only the first available result? I am
assuming so because the ArrayList is created with an initial capacity of 1.
If so, should the loop below break after the first input whose location
future has completed?
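
To make the question concrete, here is a minimal standalone sketch of the "first completed result, then break" behavior I have in mind. It is not the code from this PR: the class name `FirstCompletedSketch` and the method `firstCompletedLocation` are hypothetical, and `String` stands in for `TaskManagerLocation` so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink code: String stands in for TaskManagerLocation.
class FirstCompletedSketch {

    // Returns at most one location: the result of the first input future
    // (in iteration order) that has already completed normally.
    static List<String> firstCompletedLocation(Collection<CompletableFuture<String>> futures) {
        final List<String> completed = new ArrayList<>(1);

        for (CompletableFuture<String> future : futures) {
            // Skip futures that are still pending, failed, or cancelled.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                // getNow does not block; the future is known to be complete here.
                completed.add(future.getNow(null));
                break; // stop after the first hit, so the list never grows past 1
            }
        }

        return completed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> first = CompletableFuture.completedFuture("tm-1");
        CompletableFuture<String> second = CompletableFuture.completedFuture("tm-2");

        // The pending input is skipped and iteration stops at "tm-1".
        System.out.println(firstCompletedLocation(Arrays.asList(pending, first, second)));
    }
}
```

Without the `break`, the list would collect every completed input and grow past its initial capacity. That still works, but it makes the capacity of 1 misleading.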
---