RocMarshal commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1774238831
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +48,74 @@ Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
+
+    /**
+     * Picks the target slots to assign for the requested groups.
+     *
+     * @param slotsByTaskExecutor the free slots grouped per task executor.
+     * @param requestedGroups the number of requested execution slot sharing groups.
+     * @param sortedTaskExecutors the task executors in the order in which their slots should be picked.
+     * @return the target slots, distributed across a minimal number of task executors.
+     */
+    default Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            Iterator<TaskManagerLocation> sortedTaskExecutors) {
+        final List<SlotInfo> pickedSlots = new ArrayList<>();
+        while (pickedSlots.size() < requestedGroups) {
+            Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            pickedSlots.addAll(slotInfos);
+        }
+        return pickedSlots;
+    }
+
+    /**
+     * Sorts the task executors into the order in which requested groups should
+     * preferentially be assigned to them.
+     *
+     * @param taskManagerLocations the task executors to sort.
+     * @param taskExecutorComparator the comparator used to order the task executors.
+     * @return an iterator over the task executors in the order given by the comparator.
+     */
+    static Iterator<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream().sorted(taskExecutorComparator).iterator();
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), Collectors.toSet())));
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) {
+        final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
+        slotSharingGroup
+                .getJobVertexIds()
+                .forEach(
+                        jobVertexId -> {
+                            int parallelism = vertexParallelism.getParallelism(jobVertexId);
+                            for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
+                                sharedSlotToVertexAssignment
+                                        .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
+                                        .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
+                            }
+                        });
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    static void checkSlotsSufficient(
+            JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
+        checkState(
+                freeSlots.size() >= jobInformation.getSlotSharingGroups().size(),
Review Comment:
Thanks for the comment.
It makes sense to me.
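
For readers following this thread, here is a minimal, self-contained sketch of the selection strategy the new default method implements: group the free slots by task executor, visit executors in a preference order, and consume whole executors until enough slots are picked. The `Slot` record, the `String` executor keys, and the most-slots-first comparator below are illustrative stand-ins, not the Flink types or the ordering used in the PR:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MinimalExecutorPicking {

    // Illustrative stand-in for SlotInfo + TaskManagerLocation.
    record Slot(String id, String executor) {}

    /** Groups the free slots by the task executor that hosts them. */
    static Map<String, Set<Slot>> slotsPerExecutor(Collection<Slot> slots) {
        return slots.stream()
                .collect(Collectors.groupingBy(Slot::executor, Collectors.toSet()));
    }

    /**
     * Picks slots from as few executors as possible: executors are visited in
     * a preference order (here, most free slots first) and consumed wholesale
     * until the requested number of groups is covered.
     */
    static List<Slot> pickSlotsInMinimalExecutors(
            Map<String, Set<Slot>> slotsByExecutor, int requestedGroups) {
        Iterator<String> sortedExecutors =
                slotsByExecutor.keySet().stream()
                        .sorted(
                                Comparator.comparingInt(
                                                (String e) -> slotsByExecutor.get(e).size())
                                        .reversed())
                        .iterator();
        List<Slot> picked = new ArrayList<>();
        while (picked.size() < requestedGroups && sortedExecutors.hasNext()) {
            picked.addAll(slotsByExecutor.get(sortedExecutors.next()));
        }
        return picked;
    }

    public static void main(String[] args) {
        List<Slot> freeSlots =
                List.of(
                        new Slot("s1", "tm-1"), new Slot("s2", "tm-1"),
                        new Slot("s3", "tm-2"),
                        new Slot("s4", "tm-3"), new Slot("s5", "tm-3"), new Slot("s6", "tm-3"));
        // Requesting 4 groups consumes tm-3 (3 slots) and then tm-1 (2 slots),
        // so only 2 of the 3 executors are touched.
        System.out.println(pickSlotsInMinimalExecutors(slotsPerExecutor(freeSlots), 4));
    }
}
```

Judging by the method name and Javadoc, concentrating the picked slots on the fewest task executors presumably keeps the remaining executors entirely free, which is the property the sorting hook (`sortTaskExecutors`) lets callers tune.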
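Similarly, a minimal sketch of the bucketing done by `createExecutionSlotSharingGroups`: every vertex in a slot sharing group contributes one execution per subtask index, and executions sharing a subtask index land in the same group, so the number of groups equals the maximum parallelism in the sharing group. The `Execution` record and the `Map<String, Integer>` parallelism input are simplified stand-ins:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SubtaskIndexGrouping {

    // Illustrative stand-in for ExecutionVertexID.
    record Execution(String vertexId, int subtaskIdx) {}

    /** One bucket per subtask index; each bucket maps to one shared slot. */
    static Collection<Set<Execution>> groupBySubtaskIndex(
            Map<String, Integer> parallelismPerVertex) {
        Map<Integer, Set<Execution>> groups = new HashMap<>();
        parallelismPerVertex.forEach(
                (vertexId, parallelism) -> {
                    for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
                        groups.computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
                                .add(new Execution(vertexId, subtaskIdx));
                    }
                });
        return groups.values();
    }

    public static void main(String[] args) {
        // Vertices a (parallelism 3) and b (parallelism 2) yield three groups:
        // {a0, b0}, {a1, b1}, {a2} -- i.e. three slots for this sharing group.
        groupBySubtaskIndex(Map.of("a", 3, "b", 2)).forEach(System.out::println);
    }
}
```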
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]