This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4efdebbf2f4 [FLINK-38676][runtime] Remove
`DefaultSlotAssigner#getSlotsPerTaskExecutor` code duplication
4efdebbf2f4 is described below
commit 4efdebbf2f480cdc5fb08dade42529c65d3d938a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Wed Nov 12 13:02:32 2025 +0800
[FLINK-38676][runtime] Remove `DefaultSlotAssigner#getSlotsPerTaskExecutor`
code duplication
---
.../adaptive/allocator/DefaultSlotAssigner.java | 33 ++++++++--------------
1 file changed, 11 insertions(+), 22 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 81293ec98a4..0de353e6979 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -19,11 +19,10 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import javax.annotation.Nullable;
@@ -34,10 +33,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
-import static java.util.function.Function.identity;
import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotsPerTaskExecutor;
/**
* Simple {@link SlotAssigner} that treats all slots and slot sharing groups
equally. Specifically,
@@ -91,7 +89,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
&& minimalTaskManagerPreferred
// To avoid the sort-work loading.
&& freeSlots.size() > requestExecutionSlotSharingGroups) {
- final Map<TaskManagerLocation, Set<PhysicalSlot>>
slotsPerTaskExecutor =
+ final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
getSlotsPerTaskExecutor(freeSlots);
pickedSlots =
pickSlotsInMinimalTaskExecutors(
@@ -108,13 +106,13 @@ public class DefaultSlotAssigner implements SlotAssigner {
* @param slotsPerTaskExecutor The slots per task manager.
* @return The ordered task manager that orders by the number of free
slots descending.
*/
- private Iterator<TaskManagerLocation> getSortedTaskExecutors(
- Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor) {
- final Comparator<TaskManagerLocation> taskExecutorComparator =
- (leftTml, rightTml) ->
+ private Iterator<ResourceID> getSortedTaskExecutors(
+ Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor) {
+ final Comparator<ResourceID> taskExecutorComparator =
+ (leftTm, rightTm) ->
Integer.compare(
- slotsPerTaskExecutor.get(rightTml).size(),
- slotsPerTaskExecutor.get(leftTml).size());
+ slotsPerTaskExecutor.get(rightTm).size(),
+ slotsPerTaskExecutor.get(leftTm).size());
return
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
}
@@ -126,9 +124,9 @@ public class DefaultSlotAssigner implements SlotAssigner {
* @return the target slots that are distributed on the minimal task
executors.
*/
private Collection<PhysicalSlot> pickSlotsInMinimalTaskExecutors(
- Map<TaskManagerLocation, Set<PhysicalSlot>> slotsByTaskExecutor,
int requestedGroups) {
+ Map<ResourceID, Set<PhysicalSlot>> slotsByTaskExecutor, int
requestedGroups) {
final List<PhysicalSlot> pickedSlots = new ArrayList<>();
- final Iterator<TaskManagerLocation> sortedTaskExecutors =
+ final Iterator<ResourceID> sortedTaskExecutors =
getSortedTaskExecutors(slotsByTaskExecutor);
while (pickedSlots.size() < requestedGroups) {
Set<PhysicalSlot> slotInfos =
slotsByTaskExecutor.get(sortedTaskExecutors.next());
@@ -136,13 +134,4 @@ public class DefaultSlotAssigner implements SlotAssigner {
}
return pickedSlots;
}
-
- private Map<TaskManagerLocation, Set<PhysicalSlot>>
getSlotsPerTaskExecutor(
- Collection<PhysicalSlot> slots) {
- return slots.stream()
- .collect(
- Collectors.groupingBy(
- SlotInfo::getTaskManagerLocation,
- Collectors.mapping(identity(),
Collectors.toSet())));
- }
}