This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4efdebbf2f4 [FLINK-38676][runtime] Remove `DefaultSlotAssigner#getSlotsPerTaskExecutor` code duplication
4efdebbf2f4 is described below

commit 4efdebbf2f480cdc5fb08dade42529c65d3d938a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Wed Nov 12 13:02:32 2025 +0800

    [FLINK-38676][runtime] Remove `DefaultSlotAssigner#getSlotsPerTaskExecutor` code duplication
---
 .../adaptive/allocator/DefaultSlotAssigner.java    | 33 ++++++++--------------
 1 file changed, 11 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
index 81293ec98a4..0de353e6979 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nullable;
 
@@ -34,10 +33,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-import static java.util.function.Function.identity;
 import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.getSlotsPerTaskExecutor;
 
 /**
  * Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. Specifically,
@@ -91,7 +89,7 @@ public class DefaultSlotAssigner implements SlotAssigner {
                 && minimalTaskManagerPreferred
                 // To avoid the sort-work loading.
                 && freeSlots.size() > requestExecutionSlotSharingGroups) {
-            final Map<TaskManagerLocation, Set<PhysicalSlot>> 
slotsPerTaskExecutor =
+            final Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor =
                     getSlotsPerTaskExecutor(freeSlots);
             pickedSlots =
                     pickSlotsInMinimalTaskExecutors(
@@ -108,13 +106,13 @@ public class DefaultSlotAssigner implements SlotAssigner {
      * @param slotsPerTaskExecutor The slots per task manager.
      * @return The ordered task manager that orders by the number of free 
slots descending.
      */
-    private Iterator<TaskManagerLocation> getSortedTaskExecutors(
-            Map<TaskManagerLocation, Set<PhysicalSlot>> slotsPerTaskExecutor) {
-        final Comparator<TaskManagerLocation> taskExecutorComparator =
-                (leftTml, rightTml) ->
+    private Iterator<ResourceID> getSortedTaskExecutors(
+            Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor) {
+        final Comparator<ResourceID> taskExecutorComparator =
+                (leftTm, rightTm) ->
                         Integer.compare(
-                                slotsPerTaskExecutor.get(rightTml).size(),
-                                slotsPerTaskExecutor.get(leftTml).size());
+                                slotsPerTaskExecutor.get(rightTm).size(),
+                                slotsPerTaskExecutor.get(leftTm).size());
         return 
slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
     }
 
@@ -126,9 +124,9 @@ public class DefaultSlotAssigner implements SlotAssigner {
      * @return the target slots that are distributed on the minimal task 
executors.
      */
     private Collection<PhysicalSlot> pickSlotsInMinimalTaskExecutors(
-            Map<TaskManagerLocation, Set<PhysicalSlot>> slotsByTaskExecutor, 
int requestedGroups) {
+            Map<ResourceID, Set<PhysicalSlot>> slotsByTaskExecutor, int 
requestedGroups) {
         final List<PhysicalSlot> pickedSlots = new ArrayList<>();
-        final Iterator<TaskManagerLocation> sortedTaskExecutors =
+        final Iterator<ResourceID> sortedTaskExecutors =
                 getSortedTaskExecutors(slotsByTaskExecutor);
         while (pickedSlots.size() < requestedGroups) {
             Set<PhysicalSlot> slotInfos = 
slotsByTaskExecutor.get(sortedTaskExecutors.next());
@@ -136,13 +134,4 @@ public class DefaultSlotAssigner implements SlotAssigner {
         }
         return pickedSlots;
     }
-
-    private Map<TaskManagerLocation, Set<PhysicalSlot>> 
getSlotsPerTaskExecutor(
-            Collection<PhysicalSlot> slots) {
-        return slots.stream()
-                .collect(
-                        Collectors.groupingBy(
-                                SlotInfo::getTaskManagerLocation,
-                                Collectors.mapping(identity(), 
Collectors.toSet())));
-    }
 }

Reply via email to