Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5403#discussion_r170562399
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
    @@ -1343,63 +1330,27 @@ boolean contains(AllocationID slotId) {
                 * Poll a slot which matches the required resource profile. The 
polling tries to satisfy the
                 * location preferences, by TaskManager and by host.
                 *
    -            * @param resourceProfile      The required resource profile.
    -            * @param locationPreferences  The location preferences, in 
order to be checked.
    +            * @param slotProfile slot profile that specifies the 
requirements for the slot
                 *
                 * @return Slot which matches the resource profile, null if we 
can't find a match
                 */
    -           SlotAndLocality poll(ResourceProfile resourceProfile, 
Collection<TaskManagerLocation> locationPreferences) {
    +           SlotAndLocality poll(SlotProfile slotProfile) {
                        // fast path if no slots are available
                        if (availableSlots.isEmpty()) {
                                return null;
                        }
     
    -                   boolean hadLocationPreference = false;
    +                   SlotProfile.ProfileToSlotContextMatcher matcher = 
slotProfile.matcher();
     
    -                   if (locationPreferences != null && 
!locationPreferences.isEmpty()) {
    -
    -                           // first search by TaskManager
    -                           for (TaskManagerLocation location : 
locationPreferences) {
    -                                   hadLocationPreference = true;
    -
    -                                   final Set<AllocatedSlot> onTaskManager 
= availableSlotsByTaskManager.get(location.getResourceID());
    -                                   if (onTaskManager != null) {
    -                                           for (AllocatedSlot candidate : 
onTaskManager) {
    -                                                   if 
(candidate.getResourceProfile().isMatching(resourceProfile)) {
    -                                                           
remove(candidate.getAllocationId());
    -                                                           return new 
SlotAndLocality(candidate, Locality.LOCAL);
    -                                                   }
    -                                           }
    -                                   }
    -                           }
    -
    -                           // now, search by host
    -                           for (TaskManagerLocation location : 
locationPreferences) {
    -                                   final Set<AllocatedSlot> onHost = 
availableSlotsByHost.get(location.getFQDNHostname());
    -                                   if (onHost != null) {
    -                                           for (AllocatedSlot candidate : 
onHost) {
    -                                                   if 
(candidate.getResourceProfile().isMatching(resourceProfile)) {
    -                                                           
remove(candidate.getAllocationId());
    -                                                           return new 
SlotAndLocality(candidate, Locality.HOST_LOCAL);
    -                                                   }
    -                                           }
    -                                   }
    -                           }
    -                   }
    -
    -                   // take any slot
    -                   for (SlotAndTimestamp candidate : 
availableSlots.values()) {
    -                           final AllocatedSlot slot = candidate.slot();
    -
    -                           if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
    +                   return matcher.findMatchWithLocality(
    +                           availableSlots.values().stream(),
    +                           SlotAndTimestamp::slot,
    +                           (slot) -> 
slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
    +                           ((slotAndTimestamp, locality) -> {
    --- End diff --
    
    👍 


---

Reply via email to