XComp commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1770953081
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+ checkSlotsSufficient(jobInformation, freeSlots);
+
+ final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
Review Comment:
```suggestion
        final List<ExecutionSlotSharingGroup> allExecutionSlotSharingGroups = new ArrayList<>();
```
nit: I know it's not part of your change, but could we make this variable more
explicit? "groups" is overloaded here because we use both
`SlotSharingGroup` and `ExecutionSlotSharingGroup` in this context.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +48,74 @@ Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
+
+    /**
+     * Picks the target slots to assign to the requested groups.
+     *
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of requested execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal number of task executors.
+     */
+    default Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            Iterator<TaskManagerLocation> sortedTaskExecutors) {
+        final List<SlotInfo> pickedSlots = new ArrayList<>();
+        while (pickedSlots.size() < requestedGroups) {
+            Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
+            pickedSlots.addAll(slotInfos);
+        }
+        return pickedSlots;
+    }
+
+    /**
+     * Sorts the task executors in the order used to prioritize assigning the requested groups to
+     * them.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The sorted task executors in the order specified by the comparator.
+     */
+    static Iterator<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream().sorted(taskExecutorComparator).iterator();
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), Collectors.toSet())));
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) {
+        final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
+        slotSharingGroup
+                .getJobVertexIds()
+                .forEach(
+                        jobVertexId -> {
+                            int parallelism = vertexParallelism.getParallelism(jobVertexId);
+                            for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
+                                sharedSlotToVertexAssignment
+                                        .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
+                                        .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
+                            }
+                        });
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    static void checkSlotsSufficient(
+            JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
+        checkState(
+                freeSlots.size() >= jobInformation.getSlotSharingGroups().size(),
Review Comment:
I understand that you're just using what's already in the code here. But I'm
wondering whether we're using the correct state check. It's not wrong, but
shouldn't we be even stricter and check the `ExecutionSlotSharingGroup` count?
It essentially relates to the
[SlotSharingSlotAllocator:103](https://github.com/apache/flink/blob/57bc16948bef75cf0fb483efb8bb9959d72cf513/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java#L103)
logic.
We need enough slots to cover the upper bound of a `SlotSharingGroup` (i.e. the
highest parallelism of all the job vertices in that `SlotSharingGroup`).
Maybe it makes sense to create a utility method for this check and use it both
here and in the `SlotAssigner` implementations instead of the currently used
`SlotAssigner.checkSlotsSufficient` method. That way we would connect the two
code locations programmatically. WDYT?
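A rough, untested sketch of what I had in mind (`checkMinimumRequiredSlots` is
a placeholder name; it assumes `VertexParallelism` is available at the call
site):
```java
// Hypothetical utility; name and exact wiring are just placeholders.
static void checkMinimumRequiredSlots(
        JobInformation jobInformation,
        VertexParallelism vertexParallelism,
        Collection<? extends SlotInfo> freeSlots) {
    int requiredSlots = 0;
    for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
        // A SlotSharingGroup needs as many slots as the highest parallelism
        // among the job vertices it contains.
        int maxParallelism = 0;
        for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
            maxParallelism = Math.max(maxParallelism, vertexParallelism.getParallelism(jobVertexId));
        }
        requiredSlots += maxParallelism;
    }
    checkState(
            freeSlots.size() >= requiredSlots,
            "Not enough slots available: %s free, %s required.",
            freeSlots.size(),
            requiredSlots);
}
```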
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -139,6 +154,36 @@ public Collection<SlotAssignment> assignSlots(
return assignments;
}
+ /**
+ * The sorting principle and strategy here are very similar to {@link
Review Comment:
I'm not sure whether we want to prioritize local state over slot alignment
only if the number of utilized task slots is equal. This slot assignment
strategy is used when we want to minimize moving state around. What if we have
a TM that has fewer slots but a bigger amount of state? In that case, the slot
count would take priority over the state size. 🤔
I think we might want to skip the task slot prioritization in this
implementation. WDYT?
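Just to illustrate what I mean (untested; `stateSizePerTaskExecutor` is a
hypothetical `Map<TaskManagerLocation, Long>`):
```java
// Hypothetical: order task executors purely by the size of the local state
// they host (descending), without considering the slot count at all.
Comparator<TaskManagerLocation> byLocalStateDescending =
        Comparator.comparingLong(
                        (TaskManagerLocation tml) -> stateSizePerTaskExecutor.getOrDefault(tml, 0L))
                .reversed();
```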
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
+
+    /**
+     * Selects the target slots to assign to the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of requested execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal number of task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }
+
+        int requestedSlots = requestedGroups;
+        final List<SlotInfo> result = new ArrayList<>();
+        for (TaskManagerLocation tml : sortedTaskExecutors) {
+            if (requestedSlots <= 0) {
+                break;
+            }
+            final Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(tml);
+            requestedSlots -= slotInfos.size();
+            result.addAll(slotInfos);
+        }
+        return result;
+    }
+
+    /**
+     * Gets the task executors in the order used to prioritize assigning the requested groups to
+     * them.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The sorted task executors in the order specified by the comparator.
+     */
+    static List<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream()
+                .sorted(taskExecutorComparator)
+                .collect(Collectors.toList());
+    }
+
+    static Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
+            Collection<? extends SlotInfo> slots) {
+        return slots.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                SlotInfo::getTaskManagerLocation,
+                                Collectors.mapping(identity(), Collectors.toSet())));
+    }
+
+ static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
Review Comment:
nit: I'm wondering whether we should move all these methods into a
`SlotAssignerUtils` class rather than using the interface here. WDYT?
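For illustration, something like this skeleton (the class name is just a
proposal):
```java
/** Proposed utility class bundling the static helpers shared by the SlotAssigner implementations. */
final class SlotAssignerUtils {

    private SlotAssignerUtils() {
        // utility class, no instantiation
    }

    static List<TaskManagerLocation> sortTaskExecutors(
            Collection<TaskManagerLocation> taskManagerLocations,
            Comparator<TaskManagerLocation> taskExecutorComparator) {
        return taskManagerLocations.stream()
                .sorted(taskExecutorComparator)
                .collect(Collectors.toList());
    }

    // getSlotsPerTaskExecutor, createExecutionSlotSharingGroups, checkSlotsSufficient, ...
    // would move here unchanged.
}
```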
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+ checkSlotsSufficient(jobInformation, freeSlots);
+
+ final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
}
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ Collection<? extends SlotInfo> pickedSlots = freeSlots;
+ if (freeSlots.size() > allGroups.size()) {
Review Comment:
On the other hand, is it worth saving the sorting effort if
`freeSlots.size() == allGroups.size()`? 🤔 I guess it would make sense for large
numbers of TaskManagers. I don't know what limits we have on that front in Flink
at the moment. 🤔
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -43,35 +47,43 @@ public Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations) {
- List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+ checkSlotsSufficient(jobInformation, freeSlots);
+
+ final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
}
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ Collection<? extends SlotInfo> pickedSlots = freeSlots;
+ if (freeSlots.size() > allGroups.size()) {
Review Comment:
Why do we need this condition? To avoid the extra effort of sorting the TMs
based on their offered slot count? If yes, could we add this as a comment to
underline the intention?
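Something like this would already help (assuming the intention is indeed to
skip the sorting when every free slot has to be picked anyway):
```suggestion
        // Sorting the task executors is only worthwhile if there are more free slots
        // than requested groups; otherwise all free slots are picked anyway.
        if (freeSlots.size() > allGroups.size()) {
```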
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -18,21 +18,25 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.checkSlotsSufficient;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.createExecutionSlotSharingGroups;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.getSlotsPerTaskExecutor;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.sortTaskExecutors;
/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */
Review Comment:
This JavaDoc is not correct anymore.
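Maybe something along these lines (wording is just a proposal):
```suggestion
/**
 * Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally, while trying
 * to distribute the slots across as few task executors as possible.
 */
```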