Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4949#discussion_r192407287
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 ---
    @@ -203,68 +206,104 @@ public 
LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc
                        }
     
                        // we build up two indexes, one for resource id and one 
for host names of the preferred locations.
    -                   HashSet<ResourceID> preferredResourceIDs = new 
HashSet<>(locationPreferences.size());
    -                   HashSet<String> preferredFQHostNames = new 
HashSet<>(locationPreferences.size());
    +                   Map<ResourceID, Integer> preferredResourceIDs = new 
HashMap<>(locationPreferences.size());
    +                   Map<String, Integer> preferredFQHostNames = new 
HashMap<>(locationPreferences.size());
     
                        for (TaskManagerLocation locationPreference : 
locationPreferences) {
    -                           
preferredResourceIDs.add(locationPreference.getResourceID());
    -                           
preferredFQHostNames.add(locationPreference.getFQDNHostname());
    +                           Integer oldVal = 
preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
    +                           
preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
    +
    +                           oldVal = 
preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
    +                           
preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
                        }
     
                        Iterator<IN> iterator = candidates.iterator();
     
    -                   IN matchByHostName = null;
                        IN matchByAdditionalRequirements = null;
     
    +                   final Map<IN, CandidateMatchedResult> 
candidateMatchedResults = new HashMap<>();
    +
                        while (iterator.hasNext()) {
     
                                IN candidate = iterator.next();
                                SlotContext slotContext = 
contextExtractor.apply(candidate);
     
                                // this if checks if the candidate has is a 
local slot
    -                           if 
(preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID()))
 {
    +                           Integer localWeigh = 
preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
    +                           if (localWeigh != null) {
                                        if 
(additionalRequirementsFilter.test(candidate)) {
    -                                           // we can stop, because we 
found a match with best possible locality.
    -                                           return 
resultProducer.apply(candidate, Locality.LOCAL);
    +                                           // we found a match with 
locality.
    +                                           
candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 
0));
                                        } else {
                                                // next candidate because this 
failed on the additional requirements.
                                                continue;
                                        }
    -                           }
    -
    -                           // this if checks if the candidate is at least 
host-local, if we did not find another host-local
    -                           // candidate before.
    -                           if (matchByHostName == null) {
    -                                   if 
(preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname()))
 {
    +                           } else {
    +                                   // this if checks if the candidate is 
host-local.
    +                                   Integer hostLocalWeigh = 
preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
    +                                   if (hostLocalWeigh != null) {
                                                if 
(additionalRequirementsFilter.test(candidate)) {
    -                                                   // We remember the 
candidate, but still continue because there might still be a candidate
    -                                                   // that is local to the 
desired task manager.
    -                                                   matchByHostName = 
candidate;
    +                                                   // we found a match 
with host locality.
    +                                                   
candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, 
hostLocalWeigh));
                                                } else {
                                                        // next candidate 
because this failed on the additional requirements.
                                                        continue;
                                                }
                                        }
    +                           }
     
    -                                   // this if checks if the candidate at 
least fulfils the resource requirements, and is only required
    -                                   // if we did not yet find a valid 
candidate with better locality.
    -                                   if (matchByAdditionalRequirements == 
null
    -                                           && 
additionalRequirementsFilter.test(candidate)) {
    -                                           // Again, we remember but 
continue in hope for a candidate with better locality.
    -                                           matchByAdditionalRequirements = 
candidate;
    -                                   }
    +                           // this if checks if the candidate at least 
fulfils the resource requirements, and is only required
    +                           // if we did not yet find a valid candidate 
with better locality.
    +                           if (candidateMatchedResults.isEmpty()
    +                                   && matchByAdditionalRequirements == null
    +                                   && 
additionalRequirementsFilter.test(candidate)) {
    +                                   // Again, we remember but continue in 
hope for a candidate with better locality.
    +                                   matchByAdditionalRequirements = 
candidate;
                                }
                        }
     
    +                   // find the best matched one.
    +                   Map.Entry<IN, CandidateMatchedResult> theBestOne = 
candidateMatchedResults.entrySet()
    +                                   .stream()
    +                                   
.sorted(Comparator.comparingInt((Map.Entry<IN, CandidateMatchedResult> 
matchResult)
    +                                           -> 
matchResult.getValue().getScore()).reversed())
    +                                   .findFirst()
    +                                   .orElse(null);
    +
                        // at the end of the iteration, we return the candidate 
with best possible locality or null.
    -                   if (matchByHostName != null) {
    -                           return resultProducer.apply(matchByHostName, 
Locality.HOST_LOCAL);
    +                   if (theBestOne != null) {
    +                           return 
resultProducer.apply(theBestOne.getKey(), theBestOne.getValue().getLocality());
                        } else if (matchByAdditionalRequirements != null) {
                                return 
resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL);
                        } else {
                                return null;
                        }
                }
    +
    +           /**
    +            * Helper class to record the match result.
    +            */
    +           private class CandidateMatchedResult {
    +
    +                   private int localCount;
    +
    +                   private int hostLocalCount;
    +
    +                   public CandidateMatchedResult(int localCount, int 
hostLocalCount) {
    +                           this.localCount = localCount;
    +                           this.hostLocalCount = hostLocalCount;
    +                   }
    +
    +                   // evaluate the match score
    +                   public int getScore() {
    +                           return localCount * 10 + hostLocalCount * 1;
    +                   }
    +
    +                   // get the highest locality.
    +                   public Locality getLocality() {
    +                           return localCount > 0 ? Locality.LOCAL : 
Locality.HOST_LOCAL;
    --- End diff --
    
    This won't happen in our case, but you are right that this is a logical bug; 
I will change it.


---

Reply via email to