GitHub user summerleafs commented on a diff in the pull request:
https://github.com/apache/flink/pull/4916#discussion_r147891494
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
---
@@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
// Miscellaneous
// --------------------------------------------------------------------------------------------
+ /**
+ * Calculates the preferred locations based on the location preference constraint.
+ *
+ * @param locationPreferenceConstraint constraint for the location preference
+ * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
+ * have been a resource assigned.
+ */
+ @VisibleForTesting
+ public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
+ final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+ final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
--- End diff --
Hi Till, is `getPreferredLocations()` not invoked here because Flink doesn't
yet support reading checkpoint data locally? I have created an issue for
reading checkpoint data locally
[here](https://issues.apache.org/jira/browse/FLINK-7873?filter=-1). Once it
is complete, I wonder whether we can invoke `getPreferredLocations()`
instead of `getPreferredLocationsBasedOnInputs()`.
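
To make the question concrete, here is a minimal sketch of the fallback being asked about. It is not Flink code: `PreferredLocationsSketch` and `choosePreferredLocations` are hypothetical names, and a plain `String` stands in for `TaskManagerLocation`. It assumes that `getPreferredLocations()` would prefer state-based locations when they exist and otherwise fall back to the input-based ones.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the selection logic; not part of Flink.
public class PreferredLocationsSketch {

    /**
     * Returns the state-based location futures if there are any,
     * otherwise the input-based ones.
     *
     * @param basedOnState  locations of previously written state, or null
     *                      if there is no local state to prefer (assumption)
     * @param basedOnInputs locations derived from the producers of the inputs
     */
    static Collection<CompletableFuture<String>> choosePreferredLocations(
            Collection<CompletableFuture<String>> basedOnState,
            Collection<CompletableFuture<String>> basedOnInputs) {
        return basedOnState != null ? basedOnState : basedOnInputs;
    }

    public static void main(String[] args) {
        Collection<CompletableFuture<String>> inputs =
                Collections.singletonList(CompletableFuture.completedFuture("tm-inputs"));
        Collection<CompletableFuture<String>> state =
                Collections.singletonList(CompletableFuture.completedFuture("tm-state"));

        // With local state available, the state-based locations are chosen.
        System.out.println(choosePreferredLocations(state, inputs).iterator().next().join());
        // Without local state, the input-based locations are used.
        System.out.println(choosePreferredLocations(null, inputs).iterator().next().join());
    }
}
```

With the current diff, only the second branch (input-based locations) would ever be taken, which is why switching the call matters once local checkpoint reads are available.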
---