[ https://issues.apache.org/jira/browse/FLINK-21695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298572#comment-17298572 ]

Yu Li commented on FLINK-21695:
-------------------------------

Thanks for the note [~sewen]. I also noticed the blog via Twitter, and I fully 
agree that we should make a software-level (rather than operational) 
improvement.

One input for the fix: in our internal version (Blink) we chose the first 
approach (setting the default value to 32768), and the change was applied very 
early on (before I even started working on Blink). Maybe [~maguowei] could 
give some more information about why we didn't choose the second approach.

How to keep backward compatibility (restoring from an old savepoint or 
retained checkpoint) is indeed a problem, and I don't have a better idea 
yet...

> Increase default value for number of KeyGroups
> ----------------------------------------------
>
>                 Key: FLINK-21695
>                 URL: https://issues.apache.org/jira/browse/FLINK-21695
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Stephan Ewen
>            Priority: Major
>             Fix For: 1.13.0
>
>
> The current calculation of the number of Key Groups (max parallelism) leads 
> in many cases to data skew and to confusion among users.
> Specifically, for maxParallelisms above 128 the default value is set to 
> {{roundToPowerOfTwo(1.5 x parallelism)}}, so frequently half of the tasks get 
> one key group while the other half get two, which is very skewed.
> See section (1) in this "lessons learned" blog post. 
> https://engineering.contentsquare.com/2021/ten-flink-gotchas/
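> For illustration, here is a minimal, self-contained sketch (plain Java that 
> mirrors the arithmetic of the key-group range assignment rather than calling 
> Flink's {{KeyGroupRangeAssignment}} directly; the parallelism of 170 is just 
> an example) showing how the skew arises:
> {code:java}
> public class KeyGroupSkewDemo {
>
>     // roundUpToPowerOfTwo(1.5 * parallelism), clamped to [128, 32768],
>     // mirroring the current default derivation of the max parallelism
>     static int defaultMaxParallelism(int parallelism) {
>         int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
>         return Math.min(Math.max(candidate, 128), 1 << 15);
>     }
>
>     static int roundUpToPowerOfTwo(int x) {
>         int highest = Integer.highestOneBit(x);
>         return (highest == x) ? x : highest << 1;
>     }
>
>     public static void main(String[] args) {
>         int parallelism = 170;
>         int maxParallelism = defaultMaxParallelism(parallelism); // 256
>         int withOneGroup = 0, withTwoGroups = 0;
>         for (int i = 0; i < parallelism; i++) {
>             // contiguous key-group range assigned to subtask i
>             int start = (i * maxParallelism + parallelism - 1) / parallelism;
>             int end = ((i + 1) * maxParallelism - 1) / parallelism;
>             if (end - start + 1 == 1) withOneGroup++; else withTwoGroups++;
>         }
>         // prints: maxParallelism=256 -> 84 subtasks with 1 key group, 86 with 2
>         System.out.println("maxParallelism=" + maxParallelism + " -> "
>                 + withOneGroup + " subtasks with 1 key group, "
>                 + withTwoGroups + " with 2");
>     }
> }
> {code}
> The 86 subtasks holding two key groups process roughly twice as much keyed 
> data as the other 84, which is exactly the skew described above.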
> We can fix this by
>   - either setting the default maxParallelism to something pretty high (2048, 
> for example). The cost is that the default key group overhead per state entry 
> grows from one byte to two bytes.
>   - or keeping similar logic, but using a higher multiplier than {{1.5 x 
> operatorParallelism}}, like {{4 x operatorParallelism}}. The price is again 
> that we more quickly reach the point where the key group encoding overhead is 
> two bytes instead of one. (Both options are compared in the sketch below.)
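> To make the trade-off concrete, here is a small sketch of the two multipliers 
> side by side (the example parallelisms 100, 170, 500 and 1000 are arbitrary, 
> and the {{4 x operatorParallelism}} factor is taken from the second option 
> above):
> {code:java}
> public class MaxParallelismOptionsDemo {
>
>     static int roundUpToPowerOfTwo(int x) {
>         int highest = Integer.highestOneBit(x);
>         return (highest == x) ? x : highest << 1;
>     }
>
>     // roundToPowerOfTwo(multiplier * parallelism), clamped to [128, 32768]
>     static int derive(int parallelism, double multiplier) {
>         int candidate = roundUpToPowerOfTwo((int) Math.ceil(parallelism * multiplier));
>         return Math.min(Math.max(candidate, 128), 1 << 15);
>     }
>
>     public static void main(String[] args) {
>         for (int p : new int[] {100, 170, 500, 1000}) {
>             for (double multiplier : new double[] {1.5, 4.0}) {
>                 int maxP = derive(p, multiplier);
>                 int min = maxP / p;                               // fewest key groups per subtask
>                 int max = (maxP % p == 0) ? min : min + 1;        // most key groups per subtask
>                 System.out.printf("parallelism=%d, multiplier=%.1f -> "
>                         + "%d key groups, %d-%d per subtask%n",
>                         p, multiplier, maxP, min, max);
>             }
>         }
>     }
> }
> {code}
> With the 4x multiplier the worst case is roughly 5 vs. 4 key groups per 
> subtask (about a 25% imbalance) instead of 2 vs. 1 (a 100% imbalance), but, 
> as noted above, the key-group count stops fitting into a single prefix byte 
> at lower parallelisms.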
> Implementation-wise, there is an unfortunate complication: if the 
> maxParallelism is not configured, it is not stored anywhere in the JobGraph, 
> but is re-derived on the JobManager each time it loads a JobGraph vertex 
> (ExecutionJobVertex) without a configured max parallelism. This relies on the 
> implicit contract that the derivation logic never changes.
> Changing this logic will instantly break all jobs that have not explicitly 
> configured the max parallelism. That seems like a pretty heavy design 
> shortcoming, unfortunately :-(
> A way to partially work around this is to move the logic that derives the 
> maximum parallelism into the {{StreamGraphGenerator}}, so we never create 
> JobGraphs whose vertices have no configured max parallelism (and we keep the 
> re-derivation logic for backwards compatibility with persisted JobGraphs). 
> The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" so 
> that existing un-configured applications have a way to keep restoring from 
> old savepoints.
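> As a side note, an application that wants to be independent of the default 
> derivation (and of any future change to it) can already pin the value 
> explicitly, for example (the value 2048 is only an illustration):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class ExplicitMaxParallelism {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Pin the max parallelism (number of key groups) explicitly, so the
>         // default derivation described above never applies to this job.
>         env.setMaxParallelism(2048);
>
>         // ... build the pipeline, then env.execute("...");
>     }
> }
> {code}
> The same setting is also available per operator via {{setMaxParallelism(...)}} 
> on the individual transformation. For new jobs this sidesteps the problem 
> entirely; existing un-configured jobs would have to pick the value that was 
> originally derived for them, since the max parallelism cannot change across a 
> restore.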



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
