XComp commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1770926488
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
+
+ /**
+ * Select the target slots to assign with the requested groups.
+ *
+ * @param slots the raw slots to filter.
+ * @param slotsByTaskExecutor slots per task executor.
+ * @param requestedGroups the number of the request execution slot sharing
groups.
+ * @return the target slots that are distributed on the minimal task
executors.
+ */
+ default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+ Collection<? extends SlotInfo> slots,
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
+ int requestedGroups,
+ List<TaskManagerLocation> sortedTaskExecutors) {
+ if (slots.size() - requestedGroups <= 0) {
+ return slots;
+ }
+
+ int requestedSlots = requestedGroups;
+ final List<SlotInfo> result = new ArrayList<>();
+ for (TaskManagerLocation tml : sortedTaskExecutors) {
+ if (requestedSlots <= 0) {
+ break;
+ }
+ final Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(tml);
+ requestedSlots -= slotInfos.size();
+ result.addAll(slotInfos);
+ }
+ return result;
+ }
+
+ /**
+ * Get the task executors with the order that aims to priority assigning
requested groups on it.
+ *
+ * @param taskManagerLocations task executors to sort.
+ * @param taskExecutorComparator the comparator to compare the target task
executors.
+ * @return The sorted task executors list with the specified order by the
comparator.
+ */
+ static List<TaskManagerLocation> sortTaskExecutors(
+ Collection<TaskManagerLocation> taskManagerLocations,
+ Comparator<TaskManagerLocation> taskExecutorComparator) {
+ return taskManagerLocations.stream()
+ .sorted(taskExecutorComparator)
+ .collect(Collectors.toList());
+ }
+
+ static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
getSlotsPerTaskExecutor(
+ Collection<? extends SlotInfo> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ SlotInfo::getTaskManagerLocation,
+ Collectors.mapping(identity(),
Collectors.toSet())));
+ }
+
+ static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
Review Comment:
nit: I'm wondering whether we should move all these methods into a
`SlotAssignerUtils` class rather than using the interface here. WDYT?
Static methods of an interface are always `public`. This exposes certain
resources (e.g. `ExecutionSlotSharingGroup`) which are otherwise
package-private.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]