RocMarshal commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1765150610
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
+
+ /**
+ * Select the target slots to assign with the requested groups.
+ *
+ * <p>If the number of {@code slots} does not exceed {@code requestedGroups}, {@code slots} is
+ * returned unchanged. Otherwise the task executors are visited in the order given by
+ * {@code sortedTaskExecutors}; all slots of each visited task executor are added to the result,
+ * and the iteration stops once at least {@code requestedGroups} slots have been collected.
+ * Because whole task executors are taken, the result may contain more than
+ * {@code requestedGroups} slots.
+ *
+ * <p>NOTE(review): each entry of {@code sortedTaskExecutors} is looked up in
+ * {@code slotsByTaskExecutor} and dereferenced without a null check; presumably the list is
+ * derived from the keys of that map - confirm against the callers.
+ *
+ * @param slots the raw slots to filter.
+ * @param slotsByTaskExecutor slots per task executor.
+ * @param requestedGroups the number of the requested execution slot sharing groups.
+ * @param sortedTaskExecutors the task executors, in the order in which their slots are taken.
+ * @return the target slots that are distributed on the minimal task executors.
+ */
+ default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+ Collection<? extends SlotInfo> slots,
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
+ int requestedGroups,
+ List<TaskManagerLocation> sortedTaskExecutors) {
+ if (slots.size() - requestedGroups <= 0) {
+ return slots;
+ }
+
+ int requestedSlots = requestedGroups;
+ final List<SlotInfo> result = new ArrayList<>();
+ for (TaskManagerLocation tml : sortedTaskExecutors) {
+ if (requestedSlots <= 0) {
+ break;
+ }
+ final Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(tml);
+ requestedSlots -= slotInfos.size();
+ result.addAll(slotInfos);
+ }
+ return result;
+ }
+
+ /**
+ * Sort the given task executors with the given comparator and return them as a new list.
+ *
+ * <p>The resulting order is meant to express which task executors should be prioritized when
+ * assigning the requested groups.
+ *
+ * @param taskManagerLocations task executors to sort.
+ * @param taskExecutorComparator the comparator to compare the target task executors.
+ * @return the sorted task executors list with the order specified by the comparator.
+ */
+ static List<TaskManagerLocation> sortTaskExecutors(
+ Collection<TaskManagerLocation> taskManagerLocations,
+ Comparator<TaskManagerLocation> taskExecutorComparator) {
+ return taskManagerLocations.stream()
+ .sorted(taskExecutorComparator)
+ .collect(Collectors.toList());
+ }
Review Comment:
precise & concise !!!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]