RocMarshal commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1759680801
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -48,14 +50,30 @@ public Collection<SlotAssignment> assignSlots(
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
}
- Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+ Iterator<? extends SlotInfo> iterator =
+ selectSlotsInMinimalTaskExecutors(freeSlots, allGroups, Collections.emptyList())
+ .iterator();
Review Comment:
After an offline discussion:
If the job runs in application mode and every TM has the same number of slots,
the suggestion in the comment is a good optimization.
However, two situations need to be handled for compatibility:
1. Deployment mode: application and session.
2. In session mode, some user configurations can leave TMs with different
numbers of slots.
- For application mode:
Sorting the TMs in reverse order by number of slots and then taking slots in
that order is a good choice, because it uses the minimum number of TMs (from
the job's point of view).
- For session mode:
Sorting the TMs in ascending order by number of slots and then taking slots in
that order is a good choice, because it keeps the number of TMs in use to a
minimum (from the `ResourceManager`/`session-cluster` point of view) when
multiple jobs are running. Note that although this approach may not achieve the
maximum benefit in application mode (when the number of slots per TM is not
equal), it can still give quite good results (when the number of slots per TM
is equal).
So, sorting the TMs in reverse order by number of slots and selecting slots
from them is a good choice.
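To make the two orderings concrete, here is a minimal standalone sketch. It is
not the code in this PR: the class `TaskManagerOrdering`, the method
`selectTaskManagers`, and the map of free-slot counts per TM are hypothetical
stand-ins used only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only; not part of Flink. */
class TaskManagerOrdering {

    /**
     * Returns the ids of the TMs whose free slots would be used to cover
     * requiredSlots, visiting TMs in descending or ascending order of their
     * free-slot count.
     */
    static List<String> selectTaskManagers(
            Map<String, Integer> freeSlotsPerTm, int requiredSlots, boolean descending) {
        List<Map.Entry<String, Integer>> tms = new ArrayList<>(freeSlotsPerTm.entrySet());

        Comparator<Map.Entry<String, Integer>> bySlots =
                Comparator.comparingInt(e -> e.getValue());
        if (descending) {
            bySlots = bySlots.reversed();
        }
        // List.sort is stable, so TMs with equal counts keep their input order.
        tms.sort(bySlots);

        List<String> selected = new ArrayList<>();
        int covered = 0;
        for (Map.Entry<String, Integer> tm : tms) {
            if (covered >= requiredSlots) {
                break;
            }
            selected.add(tm.getKey());
            covered += tm.getValue();
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Integer> free = new LinkedHashMap<>();
        free.put("tm-a", 1);
        free.put("tm-b", 4);
        free.put("tm-c", 2);
        // Reverse (descending) order: one TM is enough for 4 slots.
        System.out.println(selectTaskManagers(free, 4, true));
        // Ascending order: the same request touches all three TMs.
        System.out.println(selectTaskManagers(free, 4, false));
    }
}
```

With unequal slot counts, the reverse order touches fewer TMs for a single job,
which is the application-mode argument above.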
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -18,18 +18,123 @@
package org.apache.flink.runtime.scheduler.adaptive.allocator;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
-/** Interface for assigning slots to slot sharing groups. */
+import static java.util.function.Function.identity;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+
+/** The Interface for assigning slots to slot sharing groups. */
@Internal
public interface SlotAssigner {
+ /**
+ * The helper class to represent the allocation score on the specified group and allocated slot.
+ */
+ class AllocationScore implements Comparable<AllocationScore> {
+
+ private final String groupId;
+ private final AllocationID allocationId;
+ private final long score;
+
+ public AllocationScore(String groupId, AllocationID allocationId, long score) {
+ this.groupId = groupId;
+ this.allocationId = allocationId;
+ this.score = score;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+
+ public long getScore() {
+ return score;
+ }
+
+ @Override
+ public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
Review Comment:
Thanks for the comment.
I'd like to remove the redundant reference and keep `AllocationScore` in
`SlotAssigner`, because it is used by all implementations of `SlotAssigner`.
Please let me know what you think.
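As a rough sketch of what I mean (simplified, not the actual patch): the
interface name `SlotAssignerSketch` is hypothetical, a plain `String` stands in
for `AllocationID`, and ordering by score alone is an assumption made for
illustration.

```java
/** Illustrative only; a simplified stand-in for the SlotAssigner interface. */
interface SlotAssignerSketch {

    /** Nested in the interface so that every implementation can share it. */
    final class AllocationScore implements Comparable<AllocationScore> {

        private final String groupId;
        // Assumption: a String replaces AllocationID to keep the sketch self-contained.
        private final String allocationId;
        private final long score;

        public AllocationScore(String groupId, String allocationId, long score) {
            this.groupId = groupId;
            this.allocationId = allocationId;
            this.score = score;
        }

        public String getGroupId() {
            return groupId;
        }

        public String getAllocationId() {
            return allocationId;
        }

        public long getScore() {
            return score;
        }

        @Override
        public int compareTo(AllocationScore other) {
            // The parameter type is the nested class itself, so no reference to
            // an implementation class is needed. Assumption: compare by score only.
            return Long.compare(score, other.score);
        }
    }
}
```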