1996fanrui commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1765985891
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -139,6 +154,36 @@ public Collection<SlotAssignment> assignSlots(
return assignments;
}
+ /**
+ * The sorting principle and strategy here are very similar to {@link
+ * DefaultSlotAssigner#getSortedTaskExecutors(Map)}. The difference is
that when there are task
+ * executors with the same number of available slots, it is necessary to
prioritize selecting
+ * the slots on the task executors with the largest sate size in order to
quickly restart the
Review Comment:
```suggestion
* the slots on the task executors with the larger state size in order
to speed up the local recovery.
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -111,8 +117,17 @@ public Collection<SlotAssignment> assignSlots(
final Map<String, ExecutionSlotSharingGroup> groupsById =
allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
+
+ final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsPerTaskExecutor =
+ getSlotsPerTaskExecutor(freeSlots);
+ final Collection<? extends SlotInfo> pickedSlots =
+ pickSlotsInMinimalTaskExecutors(
+ slotsPerTaskExecutor,
+ allGroups.size(),
+ getSortedTaskExecutors(freeSlots,
slotsPerTaskExecutor, scores));
Review Comment:
`Collection<? extends SlotInfo> pickedSlots = freeSlots;` would be enough by
default: `getSlotsPerTaskExecutor` and `pickSlotsInMinimalTaskExecutors` are
only needed when `freeSlots.size() > allGroups.size()`, right?
If yes, the same optimization can be applied to `DefaultSlotAssigner`.
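A minimal sketch of the fast path suggested above, using a hypothetical `Slot`
stand-in rather than Flink's `SlotInfo` and `TaskManagerLocation`. The
"most free slots first" ordering is only a placeholder assumption and is not
the ordering implemented by `getSortedTaskExecutors` in this PR:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PickSlotsFastPathSketch {

    /** Hypothetical stand-in for SlotInfo: a slot id plus the executor hosting it. */
    static final class Slot {
        final String executor;
        final String id;

        Slot(String executor, String id) {
            this.executor = executor;
            this.id = id;
        }
    }

    static Collection<Slot> pickSlots(Collection<Slot> freeSlots, int requestedGroups) {
        // Fast path: no surplus slots, so every free slot is needed and
        // grouping/sorting by executor would be wasted work.
        if (freeSlots.size() <= requestedGroups) {
            return freeSlots;
        }

        // Slow path: group the free slots by executor.
        Map<String, List<Slot>> slotsPerExecutor = new LinkedHashMap<>();
        for (Slot slot : freeSlots) {
            slotsPerExecutor.computeIfAbsent(slot.executor, k -> new ArrayList<>()).add(slot);
        }

        // Placeholder ordering (assumption): executors with the most free slots first.
        List<List<Slot>> sorted = new ArrayList<>(slotsPerExecutor.values());
        sorted.sort(Comparator.comparingInt((List<Slot> slots) -> slots.size()).reversed());

        // Take whole executors until the requested number of slots is covered.
        List<Slot> picked = new ArrayList<>();
        for (List<Slot> slots : sorted) {
            if (picked.size() >= requestedGroups) {
                break;
            }
            picked.addAll(slots);
        }
        return picked;
    }
}
```

With this shape, the grouping and sorting only run when there are more free
slots than requested groups.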
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -32,4 +46,77 @@ Collection<SlotAssignment> assignSlots(
Collection<? extends SlotInfo> freeSlots,
VertexParallelism vertexParallelism,
JobAllocationsInformation previousAllocations);
+
+ /**
+ * Select the target slots to assign with the requested groups.
+ *
+ * @param slots the raw slots to filter.
+ * @param slotsByTaskExecutor slots per task executor.
+ * @param requestedGroups the number of the request execution slot sharing
groups.
+ * @return the target slots that are distributed on the minimal task
executors.
+ */
+ default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+ Collection<? extends SlotInfo> slots,
+ Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>>
slotsByTaskExecutor,
+ int requestedGroups,
+ List<TaskManagerLocation> sortedTaskExecutors) {
+ if (slots.size() - requestedGroups <= 0) {
+ return slots;
+ }
+
+ int requestedSlots = requestedGroups;
+ final List<SlotInfo> result = new ArrayList<>();
+ for (TaskManagerLocation tml : sortedTaskExecutors) {
+ if (requestedSlots <= 0) {
+ break;
+ }
+ final Set<? extends SlotInfo> slotInfos =
slotsByTaskExecutor.get(tml);
+ requestedSlots -= slotInfos.size();
+ result.addAll(slotInfos);
+ }
+ return result;
+ }
+
+ /**
+ * Get the task executors with the order that aims to priority assigning
requested groups on it.
+ *
+ * @param taskManagerLocations task executors to sort.
+ * @param taskExecutorComparator the comparator to compare the target task
executors.
+ * @return The sorted task executors list with the specified order by the
comparator.
+ */
+ static List<TaskManagerLocation> sortTaskExecutors(
+ Collection<TaskManagerLocation> taskManagerLocations,
+ Comparator<TaskManagerLocation> taskExecutorComparator) {
+ return taskManagerLocations.stream()
+ .sorted(taskExecutorComparator)
+ .collect(Collectors.toList());
+ }
Review Comment:
Why not return `Iterator<TaskManagerLocation>` here? All callers of
`sortTaskExecutors` call `iterator()` on the result directly.
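A sketch of what the iterator-returning variant could look like. It is written
generically so that it compiles without Flink on the classpath; the type
parameter `T` standing in for `TaskManagerLocation` is an assumption made for
illustration:

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;

class SortedExecutorIteratorSketch {

    /** Sorts the given executors and exposes them only as an iterator. */
    static <T> Iterator<T> sortTaskExecutors(
            Collection<T> taskExecutors, Comparator<? super T> taskExecutorComparator) {
        // Stream#iterator() avoids materializing an intermediate List for the caller.
        return taskExecutors.stream().sorted(taskExecutorComparator).iterator();
    }
}
```

One trade-off: an `Iterator` can only be traversed once, so this only fits if
no caller needs the size of the sorted result or a second pass over it.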
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssignerTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner.getSlotsPerTaskExecutor;
+import static
org.apache.flink.runtime.scheduler.adaptive.allocator.StateLocalitySlotAssigner.AllocationScore;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SlotAssigner}. */
+class SlotAssignerTest {
+
+ private static final TaskManagerLocation tml1 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml1 = new TestingSlot(tml1);
+ private static final SlotInfo slot2OfTml1 = new TestingSlot(tml1);
+ private static final SlotInfo slot3OfTml1 = new TestingSlot(tml1);
+
+ private static final TaskManagerLocation tml2 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml2 = new TestingSlot(tml2);
+ private static final SlotInfo slot2OfTml2 = new TestingSlot(tml2);
+ private static final SlotInfo slot3OfTml2 = new TestingSlot(tml2);
+
+ private static final TaskManagerLocation tml3 = new
LocalTaskManagerLocation();
+ private static final SlotInfo slot1OfTml3 = new TestingSlot(tml3);
+ private static final SlotInfo slot2OfTml3 = new TestingSlot(tml3);
+
+ private static final List<SlotInfo> allSlots =
+ Arrays.asList(
+ slot1OfTml1,
+ slot2OfTml1,
+ slot3OfTml1,
+ slot1OfTml2,
+ slot2OfTml2,
+ slot3OfTml2,
+ slot1OfTml3,
+ slot2OfTml3);
+
+ private static Stream<Arguments> getTestingParameters() {
+ return Stream.of(
+ Arguments.of(
+ new StateLocalitySlotAssigner(),
+ 3,
+ allSlots,
+ createTestingScores(Tuple2.of(slot1OfTml1, 2L),
Tuple2.of(slot1OfTml2, 1L)),
+ Arrays.asList(tml1, tml3)),
+ Arguments.of(
+ new StateLocalitySlotAssigner(),
+ 2,
+ allSlots,
+ createTestingScores(Tuple2.of(slot1OfTml1, 2L),
Tuple2.of(slot1OfTml2, 2L)),
+ Collections.singletonList(tml3)),
+ Arguments.of(
+ new StateLocalitySlotAssigner(),
+ 6,
+ allSlots,
+ createTestingScores(Tuple2.of(slot1OfTml2, 2L)),
+ Arrays.asList(tml1, tml2, tml3)),
+ Arguments.of(
+ new StateLocalitySlotAssigner(),
+ 4,
+ Arrays.asList(
+ slot1OfTml1,
+ slot2OfTml1,
+ slot1OfTml2,
+ slot2OfTml2,
+ slot1OfTml3,
+ slot2OfTml3),
+ createTestingScores(Tuple2.of(slot1OfTml2, 2L),
Tuple2.of(slot2OfTml3, 1L)),
+ Arrays.asList(tml2, tml3)),
+ Arguments.of(
+ new DefaultSlotAssigner(),
+ 2,
+ allSlots,
+ null,
+ Collections.singletonList(tml3)),
+ Arguments.of(
+ new DefaultSlotAssigner(),
+ 3,
+ Arrays.asList(slot1OfTml1, slot1OfTml2, slot2OfTml2,
slot3OfTml2),
+ null,
+ Arrays.asList(tml1, tml2)),
+ Arguments.of(
+ new DefaultSlotAssigner(),
+ 7,
+ allSlots,
+ createTestingScores(Tuple2.of(slot1OfTml2, 2L)),
+ Arrays.asList(tml1, tml2, tml3)));
+ }
+
+ @MethodSource("getTestingParameters")
+ @ParameterizedTest(
+ name =
+ "slotAssigner={0}, groups={1}, allSlots={2},
scoredAllocations={3}, minimalTaskExecutors={4}")
+ void testPickSlotsInMinimalTaskExecutors(
Review Comment:
Could we call `assignSlots` instead of `pickSlotsInMinimalTaskExecutors` to
test the logic of `pickSlotsInMinimalTaskExecutors`?
There are 3 reasons:
1. The test doesn't need to care about the internal logic of `SlotAssigner`.
2. The current test cannot cover the following case:
- `getSortedTaskExecutors` and `pickSlotsInMinimalTaskExecutors` have no
bugs, but `SlotAssigner.assignSlots` doesn't call them correctly.
3. The test would no longer need the `slotAssigner instanceof` check; it could
call `slotAssigner.assignSlots` directly.
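To illustrate the idea of asserting on the public entry point only, here is a
self-contained sketch. `MiniAssigner`, `usedExecutors`, and the `FIRST_N` toy
assigner are all hypothetical and do not mirror the real `SlotAssigner`
signature; slots are modelled as plain `"executor/slotId"` strings:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AssignSlotsBlackBoxSketch {

    /** Hypothetical stand-in for SlotAssigner; slots are "executor/slotId" strings. */
    interface MiniAssigner {
        /** Returns one chosen slot per requested group. */
        List<String> assignSlots(Collection<String> freeSlots, int requestedGroups);
    }

    /** Collects the executors that the returned assignments actually landed on. */
    static Set<String> usedExecutors(
            MiniAssigner assigner, Collection<String> freeSlots, int requestedGroups) {
        Set<String> executors = new HashSet<>();
        for (String slot : assigner.assignSlots(freeSlots, requestedGroups)) {
            executors.add(slot.substring(0, slot.indexOf('/')));
        }
        return executors;
    }

    /** Toy assigner used only to exercise the helper: takes the first N slots. */
    static final MiniAssigner FIRST_N =
            (freeSlots, requestedGroups) ->
                    new ArrayList<>(freeSlots).subList(0, requestedGroups);
}
```

A parameterized test could then compare `usedExecutors(...)` against the
expected minimal set of task executors for each assigner, without branching on
the concrete assigner type.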