Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4949#discussion_r192407287 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java --- @@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc } // we build up two indexes, one for resource id and one for host names of the preferred locations. - HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size()); - HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size()); + Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size()); + Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size()); for (TaskManagerLocation locationPreference : locationPreferences) { - preferredResourceIDs.add(locationPreference.getResourceID()); - preferredFQHostNames.add(locationPreference.getFQDNHostname()); + Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0); + preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1); + + oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0); + preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1); } Iterator<IN> iterator = candidates.iterator(); - IN matchByHostName = null; IN matchByAdditionalRequirements = null; + final Map<IN, CandidateMatchedResult> candidateMatchedResults = new HashMap<>(); + while (iterator.hasNext()) { IN candidate = iterator.next(); SlotContext slotContext = contextExtractor.apply(candidate); // this if checks if the candidate has is a local slot - if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) { + Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID()); + if (localWeigh != null) { if (additionalRequirementsFilter.test(candidate)) { - // we can 
stop, because we found a match with best possible locality. - return resultProducer.apply(candidate, Locality.LOCAL); + // we found a match with locality. + candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0)); } else { // next candidate because this failed on the additional requirements. continue; } - } - - // this if checks if the candidate is at least host-local, if we did not find another host-local - // candidate before. - if (matchByHostName == null) { - if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) { + } else { + // this if checks if the candidate is host-local. + Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname()); + if (hostLocalWeigh != null) { if (additionalRequirementsFilter.test(candidate)) { - // We remember the candidate, but still continue because there might still be a candidate - // that is local to the desired task manager. - matchByHostName = candidate; + // we found a match with host locality. + candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh)); } else { // next candidate because this failed on the additional requirements. continue; } } + } - // this if checks if the candidate at least fulfils the resource requirements, and is only required - // if we did not yet find a valid candidate with better locality. - if (matchByAdditionalRequirements == null - && additionalRequirementsFilter.test(candidate)) { - // Again, we remember but continue in hope for a candidate with better locality. - matchByAdditionalRequirements = candidate; - } + // this if checks if the candidate at least fulfils the resource requirements, and is only required + // if we did not yet find a valid candidate with better locality. 
+ if (candidateMatchedResults.isEmpty() + && matchByAdditionalRequirements == null + && additionalRequirementsFilter.test(candidate)) { + // Again, we remember but continue in hope for a candidate with better locality. + matchByAdditionalRequirements = candidate; } } + // find the best matched one. + Map.Entry<IN, CandidateMatchedResult> theBestOne = candidateMatchedResults.entrySet() + .stream() + .sorted(Comparator.comparingInt((Map.Entry<IN, CandidateMatchedResult> matchResult) + -> matchResult.getValue().getScore()).reversed()) + .findFirst() + .orElse(null); + // at the end of the iteration, we return the candidate with best possible locality or null. - if (matchByHostName != null) { - return resultProducer.apply(matchByHostName, Locality.HOST_LOCAL); + if (theBestOne != null) { + return resultProducer.apply(theBestOne.getKey(), theBestOne.getValue().getLocality()); } else if (matchByAdditionalRequirements != null) { return resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL); } else { return null; } } + + /** + * Helper class to record the match result. + */ + private class CandidateMatchedResult { + + private int localCount; + + private int hostLocalCount; + + public CandidateMatchedResult(int localCount, int hostLocalCount) { + this.localCount = localCount; + this.hostLocalCount = hostLocalCount; + } + + // evaluate the match score + public int getScore() { + return localCount * 10 + hostLocalCount * 1; + } + + // get the highest locality. + public Locality getLocality() { + return localCount > 0 ? Locality.LOCAL : Locality.HOST_LOCAL; --- End diff -- This won't happen in our case, but you are right that this is a logical bug; I will change it.
---