Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4949#discussion_r192378473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java --- @@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc } // we build up two indexes, one for resource id and one for host names of the preferred locations. - HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size()); - HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size()); + Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size()); + Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size()); for (TaskManagerLocation locationPreference : locationPreferences) { - preferredResourceIDs.add(locationPreference.getResourceID()); - preferredFQHostNames.add(locationPreference.getFQDNHostname()); + Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0); --- End diff -- We could use `preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum)`, and similar for `preferredFQHostNames`.
---