1996fanrui commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1764649022


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign with the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }

Review Comment:
   IIUC, the branch is never executed. 
   
   If the resources are not enough, the Adaptive Scheduler won't schedule the job, so `slots.size()` is always greater than `requestedGroups`.
   
   
   BTW, there is a limitation in the first line of `StateLocalitySlotAssigner#assignSlots`: `checkState(freeSlots.size() >= jobInformation.getSlotSharingGroups().size())`.
   
   As I understand, DefaultSlotAssigner could add this limitation as well.
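   A minimal, self-contained sketch of the guard being suggested (class and method names here are illustrative stand-ins, not the actual Flink `Preconditions` API; the real guard lives in `StateLocalitySlotAssigner#assignSlots`):
   
   ```java
   import java.util.Collection;
   import java.util.List;
   
   public class SlotPrecondition {
       // Illustrative stand-in for a checkState-style guard: fail fast when
       // there are fewer free slots than requested slot sharing groups.
       static void checkEnoughSlots(Collection<?> freeSlots, Collection<?> slotSharingGroups) {
           if (freeSlots.size() < slotSharingGroups.size()) {
               throw new IllegalStateException(
                       "Not enough slots to run the job: "
                               + freeSlots.size() + " < " + slotSharingGroups.size());
           }
       }
   
       public static void main(String[] args) {
           checkEnoughSlots(List.of("s1", "s2", "s3"), List.of("g1", "g2")); // passes
           boolean thrown = false;
           try {
               checkEnoughSlots(List.of("s1"), List.of("g1", "g2"));
           } catch (IllegalStateException e) {
               thrown = true;
           }
           System.out.println(thrown); // prints "true"
       }
   }
   ```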



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -48,30 +51,28 @@ public Collection<SlotAssignment> assignSlots(
             allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor =
+                getSlotsPerTaskExecutor(freeSlots);
+        final Collection<? extends SlotInfo> slotInfos =
+                selectSlotsInMinimalTaskExecutors(
+                        freeSlots,
+                        slotsPerTaskExecutor,
+                        allGroups.size(),
+                        getSortedTaskExecutors(slotsPerTaskExecutor));
+
+        Iterator<? extends SlotInfo> iterator = slotInfos.iterator();
         Collection<SlotAssignment> assignments = new ArrayList<>();
         for (ExecutionSlotSharingGroup group : allGroups) {
             assignments.add(new SlotAssignment(iterator.next(), group));
         }
         return assignments;
     }
 
-    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) {
-        final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
-        slotSharingGroup
-                .getJobVertexIds()
-                .forEach(
-                        jobVertexId -> {
-                            int parallelism = vertexParallelism.getParallelism(jobVertexId);
-                            for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
-                                sharedSlotToVertexAssignment
-                                        .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
-                                        .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
-                            }
-                        });
-                            }
-                        });
-        return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
-                .collect(Collectors.toList());
+    @VisibleForTesting
+    List<TaskManagerLocation> getSortedTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor) {
+        final Comparator<TaskManagerLocation> taskExecutorComparator =
+                Comparator.comparingInt(tml -> slotsPerTaskExecutor.get(tml).size());

Review Comment:
   It's better to add some comments to explain why all `TaskManagerLocation`s are sorted by the number of free slots. (Why use the TM with fewer slots first?)
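   For illustration only (plain strings instead of `TaskManagerLocation`, made-up slot counts), this is what the ascending-by-free-slots ordering produces; it does not answer the "why" above:
   
   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class SortByFreeSlotCount {
       public static void main(String[] args) {
           // Hypothetical free-slot counts per task executor.
           Map<String, Integer> freeSlotsPerTm = Map.of("tm-a", 4, "tm-b", 1, "tm-c", 2);
           // Same shape as the comparator in the diff: ascending by slot count,
           // so the task executor with the fewest free slots comes first.
           List<String> sorted = freeSlotsPerTm.keySet().stream()
                   .sorted(Comparator.comparingInt(freeSlotsPerTm::get))
                   .collect(Collectors.toList());
           System.out.println(sorted); // prints "[tm-b, tm-c, tm-a]"
       }
   }
   ```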



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign with the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }
+
+        int requestedSlots = requestedGroups;
+        final List<SlotInfo> result = new ArrayList<>();
+        for (TaskManagerLocation tml : sortedTaskExecutors) {
+            if (requestedSlots <= 0) {
+                break;
+            }
+            final Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(tml);
+            requestedSlots -= slotInfos.size();
+            result.addAll(slotInfos);
+        }
+        return result;
+    }
+
+    /**
+     * Get the task executors with the order that aims to priority assigning requested groups on it.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The sorted task executors list with the specified order by the comparator.
+     */
+    static List<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream()
+                .sorted(taskExecutorComparator)
+                .collect(Collectors.toList());
+    }

Review Comment:
   nit: it's better to return an `Iterator<TaskManagerLocation>` here.
   
   It's safer, and all callers only iterate over `sortedTaskExecutors`.
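   A sketch of the suggested signature change, with a generic type standing in for `TaskManagerLocation` (the real code would keep that type and the Flink method names):
   
   ```java
   import java.util.Comparator;
   import java.util.Iterator;
   import java.util.List;
   
   public class SortedTaskExecutors {
       // Expose an Iterator instead of a List so the sorted order can only be
       // consumed once, which is all the callers need.
       static <T> Iterator<T> sortTaskExecutors(List<T> locations, Comparator<T> comparator) {
           return locations.stream().sorted(comparator).iterator();
       }
   
       public static void main(String[] args) {
           Iterator<String> it =
                   sortTaskExecutors(List.of("b", "c", "a"), Comparator.naturalOrder());
           StringBuilder order = new StringBuilder();
           while (it.hasNext()) {
               order.append(it.next());
           }
           System.out.println(order); // prints "abc"
       }
   }
   ```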



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -111,8 +117,18 @@ public Collection<SlotAssignment> assignSlots(
 
         final Map<String, ExecutionSlotSharingGroup> groupsById =
                 allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
+
+        final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor =
+                getSlotsPerTaskExecutor(freeSlots);
+        final Collection<? extends SlotInfo> slotInfos =
+                selectSlotsInMinimalTaskExecutors(

Review Comment:
   nit: How about renaming the method from `select` to `pick`? And renaming `slotInfos` to `pickedSlots`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -139,6 +155,29 @@ public Collection<SlotAssignment> assignSlots(
         return assignments;
     }
 
+    @VisibleForTesting
+    List<TaskManagerLocation> getSortedTaskExecutors(
+            Collection<? extends SlotInfo> freeSlots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor,
+            PriorityQueue<AllocationScore> scores) {
+        final Map<TaskManagerLocation, Long> scorePerTaskExecutor =
+                getScorePerTaskExecutor(freeSlots, slotsPerTaskExecutor, scores);
+        final Comparator<TaskManagerLocation> taskExecutorComparator =
+                (left, right) -> {
+                    int diff =
+                            Integer.compare(
+                                    slotsPerTaskExecutor.get(left).size(),
+                                    slotsPerTaskExecutor.get(right).size());
+                    return diff != 0
+                            ? diff
+                            : Long.compare(
+                                    scorePerTaskExecutor.getOrDefault(right, 0L),
+                                    scorePerTaskExecutor.getOrDefault(left, 0L));
+                };

Review Comment:
   It's better to add some comments to explain the rule as well.
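   For illustration only (plain strings and made-up counts/scores), the ordering rule in the comparator above behaves like this:
   
   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class TwoLevelOrdering {
       public static void main(String[] args) {
           // Hypothetical data: free-slot counts and allocation scores per TM.
           Map<String, Integer> slotCount = Map.of("tm-a", 2, "tm-b", 1, "tm-c", 2);
           Map<String, Long> score = Map.of("tm-a", 5L, "tm-c", 9L);
           // Primary key: ascending free-slot count; tie-break: descending score,
           // mirroring the two-level comparator in the diff above.
           List<String> order = slotCount.keySet().stream()
                   .sorted(
                           Comparator.<String>comparingInt(slotCount::get)
                                   .thenComparing(
                                           tm -> score.getOrDefault(tm, 0L),
                                           Comparator.reverseOrder()))
                   .collect(Collectors.toList());
           System.out.println(order); // prints "[tm-b, tm-c, tm-a]"
       }
   }
   ```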



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign with the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }
+
+        int requestedSlots = requestedGroups;
+        final List<SlotInfo> result = new ArrayList<>();
+        for (TaskManagerLocation tml : sortedTaskExecutors) {
+            if (requestedSlots <= 0) {
+                break;
+            }
+            final Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(tml);
+            requestedSlots -= slotInfos.size();
+            result.addAll(slotInfos);
+        }
+        return result;
+    }
+
+    /**
+     * Get the task executors with the order that aims to priority assigning requested groups on it.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The sorted task executors list with the specified order by the comparator.
+     */
+    static List<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream()
+                .sorted(taskExecutorComparator)
+                .collect(Collectors.toList());
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), Collectors.toSet())));
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(

Review Comment:
   nit: this method move is a refactoring. It's better to move this change into a separate refactoring commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign with the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }
+
+        int requestedSlots = requestedGroups;
+        final List<SlotInfo> result = new ArrayList<>();
+        for (TaskManagerLocation tml : sortedTaskExecutors) {
+            if (requestedSlots <= 0) {
+                break;
+            }
+            final Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(tml);
+            requestedSlots -= slotInfos.size();
+            result.addAll(slotInfos);
+        }
+        return result;

Review Comment:
   After refactoring `sortedTaskExecutors` to an `Iterator<TaskManagerLocation>`, this logic can be simplified to the following code.
   
   ```suggestion
           List<SlotInfo> pickedSlots = new ArrayList<>();
           while (pickedSlots.size() < requestedGroups) {
               Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
               pickedSlots.addAll(slotInfos);
           }
           return pickedSlots;
   ```
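   A standalone run of the suggested loop, with plain strings standing in for `SlotInfo` and `TaskManagerLocation`. It assumes, as the earlier comment about `checkState` notes, that the total slot count is at least `requestedGroups`; otherwise `next()` would eventually throw:
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   public class PickSlots {
       public static void main(String[] args) {
           // Hypothetical slots grouped by task executor, already in sorted order.
           Map<String, Set<String>> slotsByTaskExecutor = new LinkedHashMap<>();
           slotsByTaskExecutor.put("tm-a", Set.of("s1"));
           slotsByTaskExecutor.put("tm-b", Set.of("s2", "s3"));
           Iterator<String> sortedTaskExecutors = slotsByTaskExecutor.keySet().iterator();
   
           int requestedGroups = 2;
           List<String> pickedSlots = new ArrayList<>();
           // Keep draining whole task executors until enough slots are picked;
           // a TM may contribute more slots than strictly needed (here: 3 for 2).
           while (pickedSlots.size() < requestedGroups) {
               pickedSlots.addAll(slotsByTaskExecutor.get(sortedTaskExecutors.next()));
           }
           System.out.println(pickedSlots.size()); // prints "3"
       }
   }
   ```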



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
