RocMarshal commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1774238831


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +48,74 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Pick the target slots to assign with the requested groups.
+     *
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing 
groups.
+     * @return the target slots that are distributed on the minimal task 
executors.
+     */
+    default Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsByTaskExecutor,
+            int requestedGroups,
+            Iterator<TaskManagerLocation> sortedTaskExecutors) {
+        final List<SlotInfo> pickedSlots = new ArrayList<>();
+        while (pickedSlots.size() < requestedGroups) {
+            Set<? extends SlotInfo> slotInfos = 
slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            pickedSlots.addAll(slotInfos);
+        }
+        return pickedSlots;
+    }
+
+    /**
+     * Sort the task executors with the order that aims to priority assigning 
requested groups on
+     * it.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task 
executors.
+     * @return The sorted task executors list with the specified order by the 
comparator.
+     */
+    static Iterator<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return 
taskManagerLocations.stream().sorted(taskExecutorComparator).iterator();
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), 
Collectors.toSet())));
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        slotSharingGroup
+                .getJobVertexIds()
+                .forEach(
+                        jobVertexId -> {
+                            int parallelism = 
vertexParallelism.getParallelism(jobVertexId);
+                            for (int subtaskIdx = 0; subtaskIdx < parallelism; 
subtaskIdx++) {
+                                sharedSlotToVertexAssignment
+                                        .computeIfAbsent(subtaskIdx, ignored 
-> new HashSet<>())
+                                        .add(new 
ExecutionVertexID(jobVertexId, subtaskIdx));
+                            }
+                        });
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    static void checkSlotsSufficient(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        checkState(
+                freeSlots.size() >= 
jobInformation.getSlotSharingGroups().size(),

Review Comment:
   thanks for the comment. 
   It makes sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to