[ https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
loyi updated FLINK-23190:
-------------------------
Description:
FLINK-12122 only guarantees spreading out tasks across the set of TMs that are
registered at the time of scheduling, but our jobs all run in active YARN mode,
and jobs whose source parallelism is smaller than that of downstream operators
often cause load-balancing issues.
For this job:
{code:java}
// -ys 4 means 10 TaskManagers
env.addSource(...).name("A").setParallelism(10)
    .map(...).name("B").setParallelism(30)
    .map(...).name("C").setParallelism(40)
    .addSink(...).name("D").setParallelism(20);
{code}
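(With the default slot sharing, the job needs max-parallelism = 40 slots; -ys 4 gives each TaskManager 4 slots, hence 40 / 4 = 10 TaskManagers.)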
Flink 1.12.3 task allocation (tasks per TaskManager):
||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
|A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
|B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
|C|4|4|4|4|4|4|4|4|4|4|
|D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
Suggestions:
When a TaskManager starts registering its slots with the SlotManager, the
current logic picks the first pendingSlot that meets the resource requirements.
This effectively "random" strategy usually causes uneven task allocation when
the source operator's parallelism is significantly below the processing
operators'. A simple, feasible idea is to {color:#de350b}partition{color} the
current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g. let
the AllocationID carry that detail), then allocate the slots proportionally to
each JobVertexGroup.
For the above case, the 40 pendingSlots could be divided into 4 groups:
[ABCD]: 10 // A, B, C, D represent {color:#de350b}jobVertexIds{color}
[BCD]: 10
[CD]: 10
[D]: 10
Every TaskManager provides 4 slots at a time, and each group gets 1 slot
according to its proportion (1/4), so the final allocation is:
[ABCD]: deployed on 10 different TaskManagers
[BCD]: deployed on 10 different TaskManagers
[CD]: deployed on 10 different TaskManagers
[D]: deployed on 10 different TaskManagers
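To make the idea concrete, below is a minimal, self-contained sketch of this proportional round-robin assignment. The types and group keys are hypothetical simplifications for illustration, not the actual SlotManager API; the concept code linked below is the real reference.
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: pending slots are tracked per
// job-vertex group instead of in one flat list.
public class ProportionalSlotAssigner {

    // remaining pending slots per group, e.g. {ABCD=10, BCD=10, CD=10, D=10}
    private final Map<String, Integer> pendingByGroup;

    public ProportionalSlotAssigner(Map<String, Integer> pendingByGroup) {
        this.pendingByGroup = new LinkedHashMap<>(pendingByGroup);
    }

    // Assign the slots offered by one registering TaskManager: always pick
    // the group with the most remaining pending slots, instead of the first
    // pending slot that fits, so the slots are spread proportionally.
    public List<String> assign(int slotsOffered) {
        List<String> assigned = new ArrayList<>();
        while (assigned.size() < slotsOffered) {
            String next = null;
            for (Map.Entry<String, Integer> e : pendingByGroup.entrySet()) {
                if (e.getValue() > 0
                        && (next == null || e.getValue() > pendingByGroup.get(next))) {
                    next = e.getKey();
                }
            }
            if (next == null) {
                break; // no pending slots left
            }
            pendingByGroup.merge(next, -1, Integer::sum);
            assigned.add(next);
        }
        return assigned;
    }

    public static void main(String[] args) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("ABCD", 10);
        groups.put("BCD", 10);
        groups.put("CD", 10);
        groups.put("D", 10);
        ProportionalSlotAssigner assigner = new ProportionalSlotAssigner(groups);
        for (int tm = 1; tm <= 10; tm++) {
            // with -ys 4, each TaskManager offers 4 slots at registration
            System.out.println("tm" + tm + " -> " + assigner.assign(4));
        }
        // prints "tmN -> [ABCD, BCD, CD, D]" for every TaskManager, i.e.
        // each group ends up spread across all 10 TaskManagers
    }
}
{code}
The sketch only demonstrates why round-robin over groups yields the one-slot-per-group-per-TaskManager layout above; the real change would live in the SlotManager's slot-matching logic.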
I have implemented a [concept
code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
based on Flink 1.12.3; the patched version achieves {color:#de350b}fully
even{color} task allocation and works well on my workload. Are there other
points that have not been considered, or does it conflict with future plans?
Sorry for my poor English.
was:
Description:
FLINK-12122 only guarantees spreading out tasks across the set of TMs that are
registered at the time of scheduling, but our jobs all run in active YARN mode,
and jobs whose source parallelism is smaller than that of downstream operators
often cause load-balancing issues.
For this job:
{code:java}
// -ys 4 means 10 TaskManagers
env.addSource(...).name("A").setParallelism(10)
    .map(...).name("B").setParallelism(30)
    .map(...).name("C").setParallelism(40)
    .addSink(...).name("D").setParallelism(20);
{code}
Release 1.12.3 allocation (tasks per TaskManager):
||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
|A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
|B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
|C|4|4|4|4|4|4|4|4|4|4|
|D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
Suggestions:
When a TM registers slots with the SlotManager, we could group the
pendingRequests by their "ExecutionVertexGroup", then allocate the slots
proportionally to each group.
I have implemented a concept version based on release-1.12.3; the job has
fully even task allocation. I want to know if there are other points that
have not been considered.
> Make task-slot allocation much more evenly
> ------------------------------------------
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.12.3
> Reporter: loyi
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)