XComp commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1770953081


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkSlotsSufficient(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();

Review Comment:
   ```suggestion
           final List<ExecutionSlotSharingGroup> allExecutionSlotSharingGroups = new ArrayList<>();
   ```
   nit: I know it's not part of your change, but can we make this variable name more explicit? "groups" is overloaded here because we use both `SlotSharingGroup` and `ExecutionSlotSharingGroup` in this context.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +48,74 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Pick the target slots to assign to the requested groups.
+     *
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of requested execution slot sharing groups.
+     * @return the target slots, distributed across a minimal number of task executors.
+     */
+    default Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            Iterator<TaskManagerLocation> sortedTaskExecutors) {
+        final List<SlotInfo> pickedSlots = new ArrayList<>();
+        while (pickedSlots.size() < requestedGroups) {
+            Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            pickedSlots.addAll(slotInfos);
+        }
+        return pickedSlots;
+    }
+
+    /**
+     * Sort the task executors into the order that prioritizes assigning the requested groups to
+     * them.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The task executors sorted by the given comparator.
+     */
+    static Iterator<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream().sorted(taskExecutorComparator).iterator();
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), Collectors.toSet())));
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) {
+        final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
+        slotSharingGroup
+                .getJobVertexIds()
+                .forEach(
+                        jobVertexId -> {
+                            int parallelism = vertexParallelism.getParallelism(jobVertexId);
+                            for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
+                                sharedSlotToVertexAssignment
+                                        .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
+                                        .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
+                            }
+                        });
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    static void checkSlotsSufficient(
+            JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
+        checkState(
+                freeSlots.size() >= jobInformation.getSlotSharingGroups().size(),

Review Comment:
   I understand that you're just using what's already in the code here. But I'm wondering whether we're using the correct state check. I mean, it's not wrong, but shouldn't we be even stricter and check against the `ExecutionSlotSharingGroup` count? It essentially relates to the logic in [SlotSharingSlotAllocator:103](https://github.com/apache/flink/blob/57bc16948bef75cf0fb483efb8bb9959d72cf513/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java#L103).
   
   We need to have enough slots to cover the upper bound of a `SlotSharingGroup` (i.e. the highest parallelism among all the job vertices in that `SlotSharingGroup`).
   
   Maybe it makes sense to create a utility method for this check and use it both here and in the SlotAssigner implementations instead of the currently used `SlotAssigner.checkSlotsSufficient` method. That way we would connect the two code locations programmatically (rough sketch below). WDYT?
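   
   Something along these lines, just as a sketch (the method name and the additional `VertexParallelism` parameter are placeholders, not part of this PR):
   
   ```java
   static void checkSlotsSufficient(
           JobInformation jobInformation,
           VertexParallelism vertexParallelism,
           Collection<? extends SlotInfo> freeSlots) {
       int requiredSlots = 0;
       for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
           // a SlotSharingGroup needs as many slots as the highest parallelism among its vertices
           int maxParallelism = 0;
           for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
               maxParallelism = Math.max(maxParallelism, vertexParallelism.getParallelism(jobVertexId));
           }
           requiredSlots += maxParallelism;
       }
       checkState(
               freeSlots.size() >= requiredSlots,
               "Not enough free slots to cover the maximum parallelism of every SlotSharingGroup.");
   }
   ```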



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -139,6 +154,36 @@ public Collection<SlotAssignment> assignSlots(
         return assignments;
     }
 
+    /**
+     * The sorting principle and strategy here are very similar to {@link

Review Comment:
   I'm not sure whether we want to prioritize local state over slot alignment only if the number of utilized task slots is equal. This slot assignment strategy is used when we want to minimize moving state around. What if we have a TM that offers fewer slots but holds a bigger amount of state? In that case, the slot count would take priority over the state size. 🤔
   
   I think we might want to skip task slot prioritization in this 
implementation. WDYT?
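   
   Just to illustrate the idea (sketch only; `stateSizePerTaskExecutor` is a made-up map of per-TM local state size that would have to be derived from the previous allocations):
   
   ```java
   // Hypothetical: order the task executors purely by how much local state they hold for
   // the job, ignoring their free-slot count entirely.
   final Map<TaskManagerLocation, Long> stateSizePerTaskExecutor = new HashMap<>(); // placeholder
   final Comparator<TaskManagerLocation> byLocalStateSizeDescending =
           Comparator.comparingLong(
                           (TaskManagerLocation tml) -> stateSizePerTaskExecutor.getOrDefault(tml, 0L))
                   .reversed();
   ```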



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign to the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of requested execution slot sharing groups.
+     * @return the target slots, distributed across a minimal number of task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }
+
+        int requestedSlots = requestedGroups;
+        final List<SlotInfo> result = new ArrayList<>();
+        for (TaskManagerLocation tml : sortedTaskExecutors) {
+            if (requestedSlots <= 0) {
+                break;
+            }
+            final Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(tml);
+            requestedSlots -= slotInfos.size();
+            result.addAll(slotInfos);
+        }
+        return result;
+    }
+
+    /**
+     * Get the task executors in the order that prioritizes assigning the requested groups to them.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The task executors sorted by the given comparator.
+     */
+    static List<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream()
+                .sorted(taskExecutorComparator)
+                .collect(Collectors.toList());
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), Collectors.toSet())));
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(

Review Comment:
   nit: I'm wondering whether we should move all these methods into a 
`SlotAssignerUtils` class rather than using the interface here. WDYT?
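   
   Rough shape of what I mean (hypothetical class name; one helper copied over unchanged just for illustration):
   
   ```java
   /** Hypothetical utility holder for the static helpers currently declared in SlotAssigner. */
   final class SlotAssignerUtils {
   
       private SlotAssignerUtils() {}
   
       static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
               Collection<? extends SlotInfo> slots) {
           return slots.stream()
                   .collect(
                           Collectors.groupingBy(
                                   SlotInfo::getTaskManagerLocation,
                                   Collectors.mapping(identity(), Collectors.toSet())));
       }
   
       // sortTaskExecutors(...) and createExecutionSlotSharingGroups(...) would move here as well.
   }
   ```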



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkSlotsSufficient(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
             allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+        if (freeSlots.size() > allGroups.size()) {

Review Comment:
   On the other hand, is it worth saving the sorting effort if `freeSlots.size() == allGroups.size()`? 🤔 I guess it would make sense for large numbers of TaskManagers. I don't know what limits we have on that front in Flink at the moment. 🤔



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkSlotsSufficient(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
             allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+        if (freeSlots.size() > allGroups.size()) {

Review Comment:
   Why do we need this condition? To avoid the extra effort of sorting the TMs 
based on their offered slot count? If yes, could we add this as a comment to 
underline the intention?
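   
   For example (wording is just a suggestion):
   
   ```suggestion
           // Sorting the task executors is only worthwhile when there are more free slots than
           // requested groups, i.e. when we can actually skip some slots.
           if (freeSlots.size() > allGroups.size()) {
   ```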



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -18,21 +18,25 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.checkSlotsSufficient;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.createExecutionSlotSharingGroups;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.getSlotsPerTaskExecutor;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.sortTaskExecutors;
 
 /** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */

Review Comment:
   This JavaDoc is not correct anymore.
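   
   Maybe something along these lines (wording only a suggestion; adjust as you see fit):
   
   ```suggestion
   /**
    * Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally while
    * preferring to spread the execution slot sharing groups across as few task executors as
    * possible.
    */
   ```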


