[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23190:
-------------------------
    Description: 
FLINK-12122 only guarantees spreading out tasks across the set of TMs which are 
registered at the time of scheduling, but our jobs all run in active YARN mode, 
and jobs with a smaller source parallelism often cause load-balance issues.

 

For this job:
{code:java}
//  -ys 4  =>  4 slots per TaskManager, i.e. 10 TaskManagers for this job

env.addSource(...).name("A").setParallelism(10)
   .map(...).name("B").setParallelism(30)
   .map(...).name("C").setParallelism(40)
   .addSink(...).name("D").setParallelism(20);
{code}
 

 Flink-1.12.3 task allocation: 
||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
|A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
|B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
|C|4|4|4|4|4|4|4|4|4|4|
|D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|

 

Suggestions:

When a TaskManager registers its slots with the SlotManager, the current logic 
picks the first pendingSlot that meets the resource requirements. This 
effectively "random" strategy usually causes uneven task allocation when the 
source operator's parallelism is significantly below the processing operators'. 
A simple, feasible idea is to {color:#de350b}partition{color} the current 
"{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, by 
letting the AllocationID carry that detail), and then allocate the slots 
proportionally to each JobVertexGroup.

 

For the above case, the 40 pendingSlots can be divided into 4 groups:

[ABCD]: 10        // A, B, C and D represent {color:#de350b}jobVertexIds{color}

[BCD]: 10

[CD]: 10

[D]: 10
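
To make the grouping step concrete, here is a minimal, self-contained sketch in plain Java. It is not the real SlotManager code; {{PendingSlotRequest}} is a hypothetical stand-in that already carries the set of job vertices its slot would host, i.e. the detail the AllocationID would have to convey:
{code:java}
import java.util.*;

// Hypothetical stand-in for a pending slot request that carries the set of
// job vertices expected to share the slot.
final class PendingSlotRequest {
    final Set<String> jobVertexIds;

    PendingSlotRequest(Set<String> jobVertexIds) {
        this.jobVertexIds = jobVertexIds;
    }
}

public class PendingSlotGrouping {

    /** Partition the pending requests by the set of job vertices they would host. */
    static Map<Set<String>, List<PendingSlotRequest>> groupByVertices(
            Collection<PendingSlotRequest> pending) {
        Map<Set<String>, List<PendingSlotRequest>> groups = new HashMap<>();
        for (PendingSlotRequest request : pending) {
            groups.computeIfAbsent(request.jobVertexIds, k -> new ArrayList<>()).add(request);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Rebuild the four groups from the example above (40 slot requests in total).
        List<PendingSlotRequest> pending = new ArrayList<>();
        for (int i = 0; i < 10; i++) pending.add(new PendingSlotRequest(Set.of("A", "B", "C", "D")));
        for (int i = 0; i < 10; i++) pending.add(new PendingSlotRequest(Set.of("B", "C", "D")));
        for (int i = 0; i < 10; i++) pending.add(new PendingSlotRequest(Set.of("C", "D")));
        for (int i = 0; i < 10; i++) pending.add(new PendingSlotRequest(Set.of("D")));

        groupByVertices(pending).forEach((vertices, requests) ->
                System.out.println(vertices + ": " + requests.size()));
        // Prints four groups of 10, matching [ABCD] / [BCD] / [CD] / [D] above.
    }
}
{code}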

 

Every TaskManager offers 4 slots at a time, and each group gets 1 slot 
according to its proportion (1/4). The final allocation result is below:

[ABCD]: deployed on 10 different TaskManagers

[BCD]: deployed on 10 different TaskManagers

[CD]: deployed on 10 different TaskManagers

[D]: deployed on 10 different TaskManagers
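
A rough sketch of the proportional hand-out, assuming the pending requests have already been grouped as above (again a hypothetical model, not the actual SlotManager implementation): the slots one TaskManager offers are distributed round-robin over the groups that still have pending requests, so every group receives slots in proportion to its share.
{code:java}
import java.util.*;

public class ProportionalAllocation {

    /**
     * Distribute the slots offered by one TaskManager round-robin over the
     * groups that still have pending requests. Mutates pendingPerGroup and
     * returns how many of this TaskManager's slots each group received.
     */
    static Map<String, Integer> allocate(Map<String, Integer> pendingPerGroup, int slotsOffered) {
        Map<String, Integer> assigned = new LinkedHashMap<>();
        pendingPerGroup.keySet().forEach(group -> assigned.put(group, 0));

        Deque<String> roundRobin = new ArrayDeque<>(pendingPerGroup.keySet());
        int remaining = slotsOffered;
        while (remaining > 0 && !roundRobin.isEmpty()) {
            String group = roundRobin.pollFirst();
            if (pendingPerGroup.get(group) > 0) {
                pendingPerGroup.merge(group, -1, Integer::sum); // one pending request satisfied
                assigned.merge(group, 1, Integer::sum);
                remaining--;
            }
            if (pendingPerGroup.get(group) > 0) {
                roundRobin.addLast(group); // group still has pending requests
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Pending requests per group, as in the example above.
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("[ABCD]", 10);
        pending.put("[BCD]", 10);
        pending.put("[CD]", 10);
        pending.put("[D]", 10);

        // Ten TaskManagers register one after another, each offering 4 slots (-ys 4).
        for (int tm = 1; tm <= 10; tm++) {
            System.out.println("tm" + tm + " -> " + allocate(pending, 4));
        }
        // Every TaskManager ends up with exactly one slot from each group,
        // i.e. the A/B/C/D subtasks are spread evenly across all 10 TaskManagers.
    }
}
{code}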

 

I have implemented a [concept 
code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] 
based on Flink-1.12.3. The patched version achieves {color:#de350b}fully 
even{color} task allocation and works well on my workload. Are there other 
points that have not been considered, or does this conflict with future 
plans? Sorry for my poor English.

 

 

  was:
Description:

FLINK-12122 only guarantees spreading out tasks across the set of TMs which are 
registered at the time of scheduling, but our jobs all run in active YARN mode, 
and jobs with a smaller source parallelism often cause load-balance issues.

 

For this job:
{code:java}
//  -ys 4  =>  4 slots per TaskManager, i.e. 10 TaskManagers for this job

env.addSource(...).name("A").setParallelism(10)
   .map(...).name("B").setParallelism(30)
   .map(...).name("C").setParallelism(40)
   .addSink(...).name("D").setParallelism(20);
{code}
 

 released-1.12.3  allocation: 
||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
|A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
|B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
|C|4|4|4|4|4|4|4|4|4|4|
|D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|

 

Suggestions:

When a TM registers its slots with the SlotManager, we could group the 
pendingRequests by their "ExecutionVertexGroup", then allocate the slots 
proportionally to each group.

 

I have implemented a concept version based on release-1.12.3; the job now has 
fully even task allocation. I want to know whether there are other points that 
have not been considered.

 

 

 


> Make task-slot allocation much more evenly
> ------------------------------------------
>
>                 Key: FLINK-23190
>                 URL: https://issues.apache.org/jira/browse/FLINK-23190
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.12.3
>            Reporter: loyi
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
