Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5403#discussion_r170543190 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -1343,63 +1330,27 @@ boolean contains(AllocationID slotId) { * Poll a slot which matches the required resource profile. The polling tries to satisfy the * location preferences, by TaskManager and by host. * - * @param resourceProfile The required resource profile. - * @param locationPreferences The location preferences, in order to be checked. + * @param slotProfile slot profile that specifies the requirements for the slot * * @return Slot which matches the resource profile, null if we can't find a match */ - SlotAndLocality poll(ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences) { + SlotAndLocality poll(SlotProfile slotProfile) { // fast path if no slots are available if (availableSlots.isEmpty()) { return null; } - boolean hadLocationPreference = false; + SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher(); - if (locationPreferences != null && !locationPreferences.isEmpty()) { - - // first search by TaskManager - for (TaskManagerLocation location : locationPreferences) { - hadLocationPreference = true; - - final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID()); - if (onTaskManager != null) { - for (AllocatedSlot candidate : onTaskManager) { - if (candidate.getResourceProfile().isMatching(resourceProfile)) { - remove(candidate.getAllocationId()); - return new SlotAndLocality(candidate, Locality.LOCAL); - } - } - } - } - - // now, search by host - for (TaskManagerLocation location : locationPreferences) { - final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname()); - if (onHost != null) { - for (AllocatedSlot candidate : onHost) { - if (candidate.getResourceProfile().isMatching(resourceProfile)) { - remove(candidate.getAllocationId()); - return new SlotAndLocality(candidate, Locality.HOST_LOCAL); - } - } - } - } - } - - // take any slot - for (SlotAndTimestamp candidate : availableSlots.values()) { - final AllocatedSlot slot = candidate.slot(); - - if (slot.getResourceProfile().isMatching(resourceProfile)) { + return matcher.findMatchWithLocality( + availableSlots.values().stream(), + SlotAndTimestamp::slot, + (slot) -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()), + ((slotAndTimestamp, locality) -> { + AllocatedSlot slot = slotAndTimestamp.slot(); remove(slot.getAllocationId()); --- End diff -- ð
---