[
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738094#comment-17738094
]
yuanfenghu commented on FLINK-23190:
------------------------------------
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // bla bla
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));
    // Here we can control the ExecutionSlotSharingGroup allocation order instead of
    // a random order. For example, a circular order over the JobVertexGroups.
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            sortExecutionSlotSharingGroup(executionsByGroup.keySet()).stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}

private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {
    // Bucket the slot sharing groups by the set of JobVertexIDs they contain.
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));
    // Largest JobVertex set first.
    List<List<ExecutionSlotSharingGroup>> groups =
            jobVertexGroups.entrySet().stream()
                    .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
    // Round-robin over the buckets so that groups with different JobVertex sets
    // are interleaved instead of being allocated back to back.
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}

Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
[~loyi] I just tried the code above, and the Flink version I am using is 1.16.2.
Is this different from your experimental environment?
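
As a side note, below is a minimal, self-contained sketch of the interleaving that
the sortExecutionSlotSharingGroup loop above produces, using plain sets of strings
instead of Flink's ExecutionSlotSharingGroup / JobVertexID so it can run on its own
(the class and method names are made up for illustration):
{code:java}
import java.util.*;
import java.util.stream.Collectors;

// Stand-alone illustration of the round-robin ordering above; not Flink code.
public class RoundRobinOrderDemo {

    // Same idea as sortExecutionSlotSharingGroup, but a "slot sharing group" is
    // represented directly by the set of job-vertex names it contains.
    static List<Set<String>> sortGroups(Collection<Set<String>> slotGroups) {
        // Bucket the groups by their job-vertex set.
        Map<Set<String>, List<Set<String>>> byJobVertexSet =
                slotGroups.stream().collect(Collectors.groupingBy(g -> g));
        // Largest job-vertex set first, then take one group from each bucket in turn.
        List<List<Set<String>>> buckets = byJobVertexSet.entrySet().stream()
                .sorted((e1, e2) -> Integer.compare(e2.getKey().size(), e1.getKey().size()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        List<Set<String>> sorted = new ArrayList<>();
        int remaining = slotGroups.size(), next = 0;
        while (remaining > 0) {
            List<Set<String>> bucket = buckets.get((next++) % buckets.size());
            if (bucket.isEmpty()) {
                continue;
            }
            remaining--;
            sorted.add(bucket.remove(0));
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Two [A,B,C,D] groups and two [C,D] groups.
        List<Set<String>> groups = Arrays.asList(
                new LinkedHashSet<>(Arrays.asList("A", "B", "C", "D")),
                new LinkedHashSet<>(Arrays.asList("C", "D")),
                new LinkedHashSet<>(Arrays.asList("A", "B", "C", "D")),
                new LinkedHashSet<>(Arrays.asList("C", "D")));
        // Prints the two kinds interleaved: [A, B, C, D], [C, D], [A, B, C, D], [C, D]
        sortGroups(groups).forEach(System.out::println);
    }
}
{code}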
> Make task-slot allocation much more evenly
> ------------------------------------------
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.14.0, 1.12.3, 1.13.1
> Reporter: loyi
> Assignee: loyi
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which
> are registered at the time of scheduling, but our jobs all run in active
> YARN mode, and a job with smaller source parallelism often causes
> load-balance issues.
>
> For this job:
> {code:java}
> // -ys 4 (4 slots per TaskManager), so the 40 slots need 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10)
> .map(...).name("B").setParallelism(30)
> .map(...).name("C").setParallelism(40)
> .addSink(...).name("D").setParallelism(20);
> {code}
>
> Flink-1.12.3 task allocation:
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>
> Suggestions:
> When a TaskManager starts registering its slots with the SlotManager, the
> current processing logic picks the first pendingSlot that meets its resource
> requirements. This "random" strategy usually causes uneven task allocation
> when the source operator's parallelism is significantly below that of the
> processing operators.
> A simple, feasible idea is to {color:#de350b}partition{color} the current
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let the
> AllocationID carry that detail), then allocate the slots proportionally to
> each JobVertexGroup.
>
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot
> according to its proportion (1/4). The final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
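>
> A rough, self-contained sketch of this proportional hand-out (illustrative
> only, not the concept code linked below; plain strings stand in for
> JobVertexIds and the class name is made up):
> {code:java}
> import java.util.*;
> import java.util.stream.Collectors;
>
> // Illustrative only: a pending slot is represented by the set of job-vertex
> // names that would share it, instead of Flink's pending slot / AllocationID.
> public class ProportionalSlotHandoutDemo {
>
>     public static void main(String[] args) {
>         // The 40 pendingSlots of the example above:
>         // 10 x [ABCD], 10 x [BCD], 10 x [CD], 10 x [D].
>         List<Set<String>> kinds = Arrays.asList(
>                 new TreeSet<>(Arrays.asList("A", "B", "C", "D")),
>                 new TreeSet<>(Arrays.asList("B", "C", "D")),
>                 new TreeSet<>(Arrays.asList("C", "D")),
>                 new TreeSet<>(Collections.singletonList("D")));
>         List<Set<String>> pendingSlots = new ArrayList<>();
>         for (Set<String> kind : kinds) {
>             for (int i = 0; i < 10; i++) {
>                 pendingSlots.add(kind);
>             }
>         }
>
>         // Partition the pending slots by their job-vertex set ("JobVertexGroup").
>         Map<Set<String>, Deque<Set<String>>> byGroup = pendingSlots.stream()
>                 .collect(Collectors.groupingBy(
>                         s -> s, Collectors.toCollection(ArrayDeque::new)));
>
>         // Each TaskManager offers 4 slots at a time; give it one pending slot from
>         // each group, so every TM runs one [ABCD], one [BCD], one [CD] and one [D].
>         for (int tm = 1; tm <= 10; tm++) {
>             List<Set<String>> assigned = new ArrayList<>();
>             for (Deque<Set<String>> group : byGroup.values()) {
>                 if (!group.isEmpty()) {
>                     assigned.add(group.poll());
>                 }
>             }
>             System.out.println("tm" + tm + " -> " + assigned);
>         }
>     }
> }
> {code}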
>
> I have implemented a [concept
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3. The patched version achieves {color:#de350b}fully
> even{color} task allocation and works well on my workload. Are there other
> points that have not been considered, or does it conflict with future plans?
> Sorry for my poor English.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)