Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5403#discussion_r170261991
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
    @@ -175,105 +176,17 @@ MultiTaskSlot createRootSlot(
         * preferred locations is checked.
         *
         * @param groupId which the returned slot must not contain
    -    * @param locationPreferences specifying which locations are preferred
    +    * @param matcher TODO
         * @return the resolved root slot and its locality wrt to the specified 
location preferences
         *              or null if there was no root slot which did not contain 
the given groupId
         */
        @Nullable
    -   MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
    -           Preconditions.checkNotNull(locationPreferences);
    -
    -           final MultiTaskSlotLocality multiTaskSlotLocality;
    -
    -           if (locationPreferences.isEmpty()) {
    -                   multiTaskSlotLocality = 
getResolvedRootSlotWithoutLocationPreferences(groupId);
    -           } else {
    -                   multiTaskSlotLocality = 
getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
    -           }
    -
    -           return multiTaskSlotLocality;
    -   }
    -
    -   /**
    -    * Gets a resolved root slot which does not yet contain the given 
groupId. The method will try to
    -    * find a slot of a TaskManager contained in the collection of 
preferred locations. If there is no such slot
    -    * with free capacities available, then the method will look for slots 
of TaskManager which run on the same
    -    * machine as the TaskManager in the collection of preferred locations. 
If there is no such slot, then any slot
    -    * with free capacities is returned. If there is no such slot, then 
null is returned.
    -    *
    -    * @param groupId which the returned slot must not contain
    -    * @param locationPreferences specifying which locations are preferred
    -    * @return the resolved root slot and its locality wrt to the specified 
location preferences
    -    *              or null if there was not root slot which did not 
contain the given groupId
    -    */
    -   @Nullable
    -   private MultiTaskSlotLocality 
getResolvedRootSlotWithLocationPreferences(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
    -           Preconditions.checkNotNull(groupId);
    -           Preconditions.checkNotNull(locationPreferences);
    -           final Set<String> hostnameSet = new HashSet<>(16);
    -           MultiTaskSlot nonLocalMultiTaskSlot = null;
    -
    -           synchronized (lock) {
    -                   for (TaskManagerLocation locationPreference : 
locationPreferences) {
    -                           final Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(locationPreference);
    -
    -                           if (multiTaskSlots != null) {
    -                                   for (MultiTaskSlot multiTaskSlot : 
multiTaskSlots) {
    -                                           if 
(!multiTaskSlot.contains(groupId)) {
    -                                                   return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
    -                                           }
    -                                   }
    -
    -                                   
hostnameSet.add(locationPreference.getHostname());
    -                           }
    -                   }
    -
    -                   for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> 
taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
    -                           if 
(hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
    -                                   for (MultiTaskSlot multiTaskSlot : 
taskManagerLocationSetEntry.getValue()) {
    -                                           if 
(!multiTaskSlot.contains(groupId)) {
    -                                                   return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
    -                                           }
    -                                   }
    -                           } else if (nonLocalMultiTaskSlot == null) {
    -                                   for (MultiTaskSlot multiTaskSlot : 
taskManagerLocationSetEntry.getValue()) {
    -                                           if 
(!multiTaskSlot.contains(groupId)) {
    -                                                   nonLocalMultiTaskSlot = 
multiTaskSlot;
    -                                           }
    -                                   }
    -                           }
    -                   }
    -           }
    -
    -           if (nonLocalMultiTaskSlot != null) {
    -                   return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, 
Locality.NON_LOCAL);
    -           } else {
    -                   return null;
    -           }
    -   }
    -
    -   /**
    -    * Gets a resolved slot which does not yet contain the given groupId 
without any location
    -    * preferences.
    -    *
    -    * @param groupId which the returned slot must not contain
    -    * @return the resolved slot or null if there was no root slot with 
free capacities
    -    */
    -   @Nullable
    -   private MultiTaskSlotLocality 
getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
    -           Preconditions.checkNotNull(groupId);
    -
    -           synchronized (lock) {
    -                   for (Set<MultiTaskSlot> multiTaskSlots : 
resolvedRootSlots.values()) {
    -                           for (MultiTaskSlot multiTaskSlot : 
multiTaskSlots) {
    -                                   if (!multiTaskSlot.contains(groupId)) {
    -                                           return 
MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
    -                                   }
    -                           }
    -                   }
    -           }
    -
    -           return null;
    +   MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
SlotProfile.ProfileToSlotContextMatcher matcher) {
    +           return matcher.findMatchWithLocality(
    +                   
resolvedRootSlots.values().stream().flatMap(Collection::stream),
    --- End diff --
    
    `resolvedRootSlots` is accessed here outside of the `lock` guard, whereas the removed implementation only read it inside `synchronized (lock)` blocks.


---

Reply via email to