1996fanrui commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1765985891


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -139,6 +154,36 @@ public Collection<SlotAssignment> assignSlots(
         return assignments;
     }
 
+    /**
+     * The sorting principle and strategy here are very similar to {@link
+     * DefaultSlotAssigner#getSortedTaskExecutors(Map)}. The difference is that when there are task
+     * executors with the same number of available slots, it is necessary to prioritize selecting
+     * the slots on the task executors with the largest sate size in order to quickly restart the

Review Comment:
   ```suggestion
        * the slots on the task executors with the larger state size in order to speed up the local recovery.
   ```
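   A minimal sketch of the tie-breaking rule this Javadoc describes. It uses a hypothetical `TaskExecutorInfo` holder instead of Flink's `TaskManagerLocation` and allocation scores. The primary order is passed in rather than hard-coded, because the exact ordering used by `DefaultSlotAssigner#getSortedTaskExecutors` is not shown here. The sketch only shows that executors comparing equal under that order are ranked larger-state-first.

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Collectors;

   /** Hypothetical, simplified stand-in for a task executor; not a Flink class. */
   final class TaskExecutorInfo {
       final String id;
       final int freeSlots;
       // Assumption: e.g. the sum of the allocation scores of the slots on this executor.
       final long stateSize;

       TaskExecutorInfo(String id, int freeSlots, long stateSize) {
           this.id = id;
           this.freeSlots = freeSlots;
           this.stateSize = stateSize;
       }
   }

   final class StateAwareOrdering {
       /**
        * Sorts by the given primary order; executors that compare equal under it
        * (e.g. the same number of free slots) are ordered by larger state size first.
        */
       static List<TaskExecutorInfo> sort(
               List<TaskExecutorInfo> executors, Comparator<TaskExecutorInfo> primaryOrder) {
           Comparator<TaskExecutorInfo> largerStateFirst =
                   Comparator.comparingLong((TaskExecutorInfo te) -> te.stateSize).reversed();
           return executors.stream()
                   .sorted(primaryOrder.thenComparing(largerStateFirst))
                   .collect(Collectors.toList());
       }
   }
   ```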



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -111,8 +117,17 @@ public Collection<SlotAssignment> assignSlots(
 
         final Map<String, ExecutionSlotSharingGroup> groupsById =
                 allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
+
+        final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor =
+                getSlotsPerTaskExecutor(freeSlots);
+        final Collection<? extends SlotInfo> pickedSlots =
+                pickSlotsInMinimalTaskExecutors(
+                        slotsPerTaskExecutor,
+                        allGroups.size(),
+                        getSortedTaskExecutors(freeSlots, slotsPerTaskExecutor, scores));

Review Comment:
   Could we start with `Collection<? extends SlotInfo> pickedSlots = freeSlots;`?
   
   `getSlotsPerTaskExecutor` and `pickSlotsInMinimalTaskExecutors` are only needed when `freeSlots.size() > allGroups.size()`, right?
   
   If so, the same optimization can be applied to `DefaultSlotAssigner`.
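   A minimal sketch of the suggested restructuring. It is generic over a hypothetical slot type `S`, with a caller-supplied `executorOf` function and executor order; none of these are Flink API. When `freeSlots.size() <= requestedGroups`, every free slot is needed, so the input is returned as-is and the per-executor grouping and sorting are skipped. Otherwise, whole executors are taken in sorted order until the requested number of groups is covered, similar to `selectSlotsInMinimalTaskExecutors`.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   final class MinimalTaskExecutorPicker {
       /** Hypothetical sketch: executorOf maps a slot to the id of its task executor. */
       static <S> Collection<S> pickSlots(
               Collection<S> freeSlots,
               int requestedGroups,
               Function<S, String> executorOf,
               Comparator<String> executorOrder) {
           // Fast path: no surplus slots, so grouping and sorting can be skipped entirely.
           if (freeSlots.size() <= requestedGroups) {
               return freeSlots;
           }
           Map<String, List<S>> slotsPerExecutor =
                   freeSlots.stream().collect(Collectors.groupingBy(executorOf));
           List<String> sortedExecutors =
                   slotsPerExecutor.keySet().stream()
                           .sorted(executorOrder)
                           .collect(Collectors.toList());
           List<S> picked = new ArrayList<>();
           int remaining = requestedGroups;
           for (String executor : sortedExecutors) {
               if (remaining <= 0) {
                   break;
               }
               // Take all slots of an executor, like selectSlotsInMinimalTaskExecutors does.
               List<S> slots = slotsPerExecutor.get(executor);
               picked.addAll(slots);
               remaining -= slots.size();
           }
           return picked;
       }
   }
   ```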



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign with the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param slotsByTaskExecutor slots per task executor.
+     * @param requestedGroups the number of the request execution slot sharing groups.
+     * @return the target slots that are distributed on the minimal task executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
+            int requestedGroups,
+            List<TaskManagerLocation> sortedTaskExecutors) {
+        if (slots.size() - requestedGroups <= 0) {
+            return slots;
+        }
+
+        int requestedSlots = requestedGroups;
+        final List<SlotInfo> result = new ArrayList<>();
+        for (TaskManagerLocation tml : sortedTaskExecutors) {
+            if (requestedSlots <= 0) {
+                break;
+            }
+            final Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(tml);
+            requestedSlots -= slotInfos.size();
+            result.addAll(slotInfos);
+        }
+        return result;
+    }
+
+    /**
+     * Get the task executors with the order that aims to priority assigning requested groups on it.
+     *
+     * @param taskManagerLocations task executors to sort.
+     * @param taskExecutorComparator the comparator to compare the target task executors.
+     * @return The sorted task executors list with the specified order by the comparator.
+     */
+    static List<TaskManagerLocation> sortTaskExecutors(
+            Collection<TaskManagerLocation> taskManagerLocations,
+            Comparator<TaskManagerLocation> taskExecutorComparator) {
+        return taskManagerLocations.stream()
+                .sorted(taskExecutorComparator)
+                .collect(Collectors.toList());
+    }

Review Comment:
   Why not return `Iterator<TaskManagerLocation>` here? I see that all callers of `sortTaskExecutors` just call `iterator()` on the result.
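   A minimal sketch of the suggested signature. It is generic over `T` instead of `TaskManagerLocation` so that it compiles on its own. `Stream#iterator()` returns a lazy iterator over the sorted elements.

   ```java
   import java.util.Collection;
   import java.util.Comparator;
   import java.util.Iterator;

   final class SortedIteration {
       /** Generic stand-in for sortTaskExecutors that returns an Iterator instead of a List. */
       static <T> Iterator<T> sortTaskExecutors(
               Collection<T> taskExecutors, Comparator<T> comparator) {
           // The sort runs when the iterator is first advanced.
           return taskExecutors.stream().sorted(comparator).iterator();
       }
   }
   ```

   One trade-off to weigh: an `Iterator` can be traversed only once and exposes no `size()`. Keeping `List` leaves more room if a future caller needs either capability.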



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssignerTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.getSlotsPerTaskExecutor;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.StateLocalitySlotAssigner.AllocationScore;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SlotAssigner}. */
+class SlotAssignerTest {
+
+    private static final TaskManagerLocation tml1 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml1 = new TestingSlot(tml1);
+    private static final SlotInfo slot2OfTml1 = new TestingSlot(tml1);
+    private static final SlotInfo slot3OfTml1 = new TestingSlot(tml1);
+
+    private static final TaskManagerLocation tml2 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml2 = new TestingSlot(tml2);
+    private static final SlotInfo slot2OfTml2 = new TestingSlot(tml2);
+    private static final SlotInfo slot3OfTml2 = new TestingSlot(tml2);
+
+    private static final TaskManagerLocation tml3 = new LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml3 = new TestingSlot(tml3);
+    private static final SlotInfo slot2OfTml3 = new TestingSlot(tml3);
+
+    private static final List<SlotInfo> allSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3);
+
+    private static Stream<Arguments> getTestingParameters() {
+        return Stream.of(
+                Arguments.of(
+                        new StateLocalitySlotAssigner(),
+                        3,
+                        allSlots,
+                        createTestingScores(Tuple2.of(slot1OfTml1, 2L), Tuple2.of(slot1OfTml2, 1L)),
+                        Arrays.asList(tml1, tml3)),
+                Arguments.of(
+                        new StateLocalitySlotAssigner(),
+                        2,
+                        allSlots,
+                        createTestingScores(Tuple2.of(slot1OfTml1, 2L), Tuple2.of(slot1OfTml2, 2L)),
+                        Collections.singletonList(tml3)),
+                Arguments.of(
+                        new StateLocalitySlotAssigner(),
+                        6,
+                        allSlots,
+                        createTestingScores(Tuple2.of(slot1OfTml2, 2L)),
+                        Arrays.asList(tml1, tml2, tml3)),
+                Arguments.of(
+                        new StateLocalitySlotAssigner(),
+                        4,
+                        Arrays.asList(
+                                slot1OfTml1,
+                                slot2OfTml1,
+                                slot1OfTml2,
+                                slot2OfTml2,
+                                slot1OfTml3,
+                                slot2OfTml3),
+                        createTestingScores(Tuple2.of(slot1OfTml2, 2L), Tuple2.of(slot2OfTml3, 1L)),
+                        Arrays.asList(tml2, tml3)),
+                Arguments.of(
+                        new DefaultSlotAssigner(),
+                        2,
+                        allSlots,
+                        null,
+                        Collections.singletonList(tml3)),
+                Arguments.of(
+                        new DefaultSlotAssigner(),
+                        3,
+                        Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2, slot3OfTml2),
+                        null,
+                        Arrays.asList(tml1, tml2)),
+                Arguments.of(
+                        new DefaultSlotAssigner(),
+                        7,
+                        allSlots,
+                        createTestingScores(Tuple2.of(slot1OfTml2, 2L)),
+                        Arrays.asList(tml1, tml2, tml3)));
+    }
+
+    @MethodSource("getTestingParameters")
+    @ParameterizedTest(
+            name =
+                    "slotAssigner={0}, groups={1}, allSlots={2}, scoredAllocations={3}, minimalTaskExecutors={4}")
+    void testPickSlotsInMinimalTaskExecutors(

Review Comment:
   Could we call `assignSlots` instead of `pickSlotsInMinimalTaskExecutors` to test the `pickSlotsInMinimalTaskExecutors` logic?
   
   There are 3 reasons:
   1. We don't need to care about the internal logic of `SlotAssigner`.
   2. The current test cannot cover the following case:
       - `getSortedTaskExecutors` and `pickSlotsInMinimalTaskExecutors` have no bugs, but `SlotAssigner.assignSlots` doesn't call them correctly.
   3. The test wouldn't need to check `slotAssigner instanceof`; it could call `slotAssigner.assignSlots` directly.
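   A minimal sketch of asserting only on the output of `assignSlots`. It uses a hypothetical `SimpleSlotAssigner` interface, with slots encoded as `executorId/slotId` strings, in place of Flink's `SlotAssigner` and `SlotInfo`. The check derives the set of task executors that received assignments from the returned slots, so any assigner can be plugged in without an `instanceof` check. Presumably the real test would compare that set with the `minimalTaskExecutors` parameter.

   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.TreeSet;
   import java.util.stream.Collectors;

   /** Hypothetical, simplified assigner contract; not Flink's SlotAssigner. */
   interface SimpleSlotAssigner {
       List<String> assignSlots(List<String> freeSlots, int requestedGroups);
   }

   final class AssignmentChecks {
       /** Task executors hosting at least one assignment, derived only from the public result. */
       static Set<String> usedTaskExecutors(
               SimpleSlotAssigner assigner, List<String> freeSlots, int requestedGroups) {
           return assigner.assignSlots(freeSlots, requestedGroups).stream()
                   // "tm1/s2" -> "tm1"
                   .map(slot -> slot.substring(0, slot.indexOf('/')))
                   .collect(Collectors.toCollection(TreeSet::new));
       }
   }
   ```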



-- 
This is an automated message from the Apache Git Service.