[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381794#comment-17381794
 ] 

loyi commented on FLINK-23190:
------------------------------

[~trohrmann]  Thanks for your patience.  I have implemented a version based on 
flink-1.13.1, and I compare it with the official release using two real examples below.
h2. Example 1:

In theory, this job can be allocated completely evenly.  
// job description

StreamExecutionEnvironment env;
env.addSource(...).name("a").setParallelism(10).
 map(...).name("b").setParallelism(30)
 .map(...).name("c").setParallelism(40)
 .addSink(...).name("d").setParallelism(20);

// We specify -ys 4, so the job requests 10 TaskExecutors (40 slots, 4 per TaskExecutor).
flink run -yjm 2048 -ytm 2048 -ys 4 -m yarn-cluster -c 
com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar
!image-2021-07-16-10-34-30-700.png|width=1042,height=219!

 

*flink-1.13.1-official*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|0|1|2|0|0|1|2|2|2|0|
|b|4|1|3|3|4|4|3|3|3|2|
|c|4|4|4|4|4|4|4|4|4|4|
|d|3|1|3|1|4|2|2|2|0|2|

 

*flink-1.13.1-official*(with cluster.evenly-spread-out-slots: true) operator's 
subtask distribution:

 
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|0|2|0|3|1|0|0|2|
|b|4|1|3|4|4|3|4|2|3|2|
|c|4|4|4|4|4|4|4|4|4|4|
|d|3|1|3|2|3|2|2|1|2|1|

 

*flink-1.13.1-patch*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|1|1|1|1|1|1|1|1|
|b|3|3|3|3|3|3|3|3|3|3|
|c|4|4|4|4|4|4|4|4|4|4|
|d|2|2|2|2|2|2|2|2|2|2|

 

*flink-1.13.1-patch*(with jobmanager.scheduler: adaptive) operator's subtask 
distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||
|a|1|1|1|1|1|1|1|1|1|1|
|b|3|3|3|3|3|3|3|3|3|3|
|c|4|4|4|4|4|4|4|4|4|4|
|d|2|2|2|2|2|2|2|2|2|2|

 

 
h2. Example 2: 

We now construct a job that cannot achieve a perfectly even allocation.
// The job description is the same as above.

StreamExecutionEnvironment env;
env.addSource(...).name("a").setParallelism(10).
 map(...).name("b").setParallelism(30)
 .map(...).name("c").setParallelism(40)
 .addSink(...).name("d").setParallelism(20);

// We specify -ys 3, so the job requests 14 TaskExecutors (40 slots, 3 per TaskExecutor, rounded up).
flink run -yjm 2048 -ytm 2048 -ys 3 -m yarn-cluster -c 
com.loyi.flink.demo.Bootstrap -d /home/deploy/flink-demo.jar
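
The TaskExecutor counts quoted for both examples follow from a ceiling division. The sketch below assumes that all operators share the default slot sharing group, so the job needs as many slots as its largest parallelism (40 here). The class and method names are illustrative only and are not part of Flink.

```java
class TaskExecutorCount {
    // Slots needed when every operator shares one slot sharing group:
    // the largest operator parallelism (assumption stated in the lead-in).
    static int requiredSlots(int... parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    // Ceiling division: a partially used TaskExecutor still has to be started.
    static int taskExecutors(int requiredSlots, int slotsPerTaskExecutor) {
        return (requiredSlots + slotsPerTaskExecutor - 1) / slotsPerTaskExecutor;
    }

    public static void main(String[] args) {
        int slots = requiredSlots(10, 30, 40, 20); // operators a, b, c, d
        System.out.println(taskExecutors(slots, 4)); // -ys 4 -> 10
        System.out.println(taskExecutors(slots, 3)); // -ys 3 -> 14
    }
}
```

With -ys 3, the 14 TaskExecutors provide 42 slots for 40 requests, which is why a perfectly even allocation is impossible in this example.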
 

*flink-1.13.1-official*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|3|1|0|3|1|1|1|0|0|0|0|0|0|0|
|b|3|3|3|3|2|2|2|1|3|2|3|2|1|0|
|c|3|3|3|3|3|3|3|3|3|1|3|3|3|3|
|d|2|2|1|2|3|1|1|1|1|1|2|2|1|0|

 

*flink-1.13.1-official*(with cluster.evenly-spread-out-slots: true) operator's 
subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|2|1|1|1|2|2|1|0|0|0|0|0|0|0|
|b|3|3|2|1|1|3|3|2|1|3|2|3|3|0|
|c|3|3|3|1|3|3|3|3|3|3|3|3|3|3|
|d|1|2|2|1|2|3|1|2|2|3|1|0|0|0|

 

*flink-1.13.1-patch*(default config) operator's subtask distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|1|1|1|1|1|0|1|1|0|1|0|1|0|1|
|b|3|2|2|2|2|2|3|2|2|2|2|2|2|2|
|c|3|3|3|3|3|3|3|3|3|3|3|3|1|3|
|d|2|1|2|1|1|1|2|2|1|2|1|2|2|0|

 

*flink-1.13.1-patch*(with jobmanager.scheduler: adaptive) operator's subtask 
distribution:
||Operator||TM-1||TM-2||TM-3||TM-4||TM-5||TM-6||TM-7||TM-8||TM-9||TM-10||TM-11||TM-12||TM-13||TM-14||
|a|1|0|0|1|1|1|1|0|0|1|1|1|1|1|
|b|2|2|2|2|3|3|3|2|1|2|2|2|2|2|
|c|3|3|3|3|3|3|3|3|1|3|3|3|3|3|
|d|1|1|1|1|2|2|2|1|1|2|1|2|1|2|

 

We can see that the patch yields a better allocation: it spreads each 
operator's subtasks across all TaskExecutors as evenly as possible.
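
One way to put a number on "evenly" is the difference between the most and least loaded TaskExecutor for a single operator. The sketch below is only an illustration of that metric (it is not part of the patch); the two arrays are the rows for operator "b" in Example 2, copied from the official default-config table and the patch default-config table above.

```java
import java.util.Arrays;

class SpreadCheck {
    // Difference between the most and least loaded TaskExecutor for one operator.
    static int spread(int[] subtasksPerTaskExecutor) {
        int max = Arrays.stream(subtasksPerTaskExecutor).max().getAsInt();
        int min = Arrays.stream(subtasksPerTaskExecutor).min().getAsInt();
        return max - min;
    }

    public static void main(String[] args) {
        // Operator "b", Example 2, TM-1 .. TM-14.
        int[] officialDefault = {3, 3, 3, 3, 2, 2, 2, 1, 3, 2, 3, 2, 1, 0};
        int[] patchDefault    = {3, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2};
        System.out.println(spread(officialDefault)); // 3
        System.out.println(spread(patchDefault));    // 1
    }
}
```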
 

> Make task-slot allocation much more evenly
> ------------------------------------------
>
>                 Key: FLINK-23190
>                 URL: https://issues.apache.org/jira/browse/FLINK-23190
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.12.3
>            Reporter: loyi
>            Priority: Major
>         Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in 
> active YARN mode, and a job with a smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4     means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering its slots with the SlotManager, the current 
> logic chooses the first pendingSlot that meets its resource 
> requirements.  This effectively "random" strategy usually causes uneven task allocation 
> when the source operator's parallelism is significantly below that of the processing operators. 
>   A simple, feasible idea is to {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, by letting the 
> AllocationID carry that detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexIds{color}
> [BCD]: 10
> [BC]: 10
> [C]: 10
>  
> Every TaskManager provides 4 slots at a time, and each group gets 1 
> slot according to its proportion (1/4). The final allocation result is:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [BC]: deployed on 10 different TaskManagers
> [C]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>   based on Flink-1.12.3. The patched version achieves a {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there 
> other points that have not been considered, or does it conflict with future 
> plans?      Sorry for my poor English.
>  
>  
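
The proportional idea in the quoted description can be sketched roughly as follows. This is a simplified illustration and not the code from the linked commit: groups are reduced to plain counters of pending slots, and each slot offered by a registering TaskManager goes to the group with the most pending slots left. The class and method names are hypothetical.

```java
import java.util.Arrays;

class ProportionalSlotAssignment {
    // Assigns the slots offered by one TaskManager to pending-slot groups,
    // always serving the group with the most pending slots remaining.
    // Mutates pendingPerGroup and returns how many slots each group received.
    static int[] assign(int[] pendingPerGroup, int offeredSlots) {
        int[] received = new int[pendingPerGroup.length];
        for (int s = 0; s < offeredSlots; s++) {
            int best = -1;
            for (int g = 0; g < pendingPerGroup.length; g++) {
                if (pendingPerGroup[g] > 0
                        && (best < 0 || pendingPerGroup[g] > pendingPerGroup[best])) {
                    best = g;
                }
            }
            if (best < 0) {
                break; // no pending slots left to fulfil
            }
            pendingPerGroup[best]--;
            received[best]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] pending = {10, 10, 10, 10}; // four groups of 10 pending slots
        for (int tm = 1; tm <= 10; tm++) {
            // Each TaskManager offers 4 slots (-ys 4).
            System.out.println("TM-" + tm + " " + Arrays.toString(assign(pending, 4)));
        }
    }
}
```

With four equally sized groups and 4 slots per TaskManager, every TaskManager ends up with exactly one slot from each group, which matches the even distribution described in the issue.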



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
