[
https://issues.apache.org/jira/browse/FLINK-21695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Konstantin Knauf updated FLINK-21695:
-------------------------------------
Fix Version/s: (was: 1.13.0)
> Increase default value for number of KeyGroups
> ----------------------------------------------
>
> Key: FLINK-21695
> URL: https://issues.apache.org/jira/browse/FLINK-21695
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Stephan Ewen
> Priority: Major
>
> The current calculation of the number of key groups (max parallelism) leads
> to data skew and confusion among users in many cases.
> Specifically, the fact that for max parallelisms above 128 the default value
> is set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently
> half of the tasks get one key group and the other half get two key groups,
> which is very skewed.
> See section (1) in this "lessons learned" blog post.
> https://engineering.contentsquare.com/2021/ten-flink-gotchas/
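> To make the skew concrete, here is a small self-contained sketch (not the
> actual Flink source; the rounding, the clamping bounds 128 / 32768 and the
> contiguous range assignment are modeled after the current behavior) that
> derives the default max parallelism for parallelism 150 and counts how many
> subtasks end up with one vs. two key groups:
> {code:java}
> // Illustrative sketch of the default max-parallelism derivation and the
> // contiguous key-group range assignment; not the actual Flink classes.
> public class KeyGroupSkewDemo {
>
>     // Default derivation: roundUpToPowerOfTwo(1.5 x parallelism), clamped to [128, 32768].
>     static int computeDefaultMaxParallelism(int parallelism) {
>         int candidate = parallelism + parallelism / 2; // 1.5 x parallelism
>         int powerOfTwo = candidate <= 1 ? 1 : Integer.highestOneBit(candidate - 1) << 1;
>         return Math.min(Math.max(powerOfTwo, 128), 1 << 15);
>     }
>
>     // Number of key groups assigned to one subtask under contiguous range assignment.
>     static int keyGroupsForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
>         int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
>         int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
>         return end - start + 1;
>     }
>
>     public static void main(String[] args) {
>         int parallelism = 150;
>         int maxParallelism = computeDefaultMaxParallelism(parallelism); // 256
>         int oneGroup = 0, twoGroups = 0;
>         for (int i = 0; i < parallelism; i++) {
>             if (keyGroupsForSubtask(maxParallelism, parallelism, i) == 1) {
>                 oneGroup++;
>             } else {
>                 twoGroups++;
>             }
>         }
>         // With these numbers: 44 subtasks own one key group, 106 own two,
>         // i.e. most subtasks carry twice the keyed state and load of the rest.
>         System.out.printf("maxParallelism=%d, one key group=%d, two key groups=%d%n",
>                 maxParallelism, oneGroup, twoGroups);
>     }
> }
> {code}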
> We can fix this by
> - either setting the default maxParallelism to something pretty high (2048
> for example). The cost is that the default key group overhead per state
> entry grows from one byte to two bytes.
> - or keeping similar logic, but using a higher multiplier such as {{4 x
> operatorParallelism}} instead of {{1.5 x operatorParallelism}}. The price is
> again that we more quickly reach the point where we need two bytes of key
> group encoding overhead instead of one; see the sketch below.
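> To illustrate that trade-off, here is a rough sketch of where the two
> multipliers cross the one-byte boundary (one unsigned byte can address at
> most 256 key groups; the exact prefix-width rule in the state backends may
> differ, and the helper names here are illustrative, not Flink API):
> {code:java}
> // Compares the derived default max parallelism for the current 1.5x rule and a
> // hypothetical 4x rule, and the fixed-width key-group prefix each would need.
> public class KeyGroupOverheadDemo {
>
>     static int roundUpToPowerOfTwo(int x) {
>         return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
>     }
>
>     static int defaultMaxParallelism(int parallelism, double multiplier) {
>         int candidate = (int) (parallelism * multiplier);
>         return Math.min(Math.max(roundUpToPowerOfTwo(candidate), 128), 1 << 15);
>     }
>
>     // Bytes needed to address key groups 0 .. maxParallelism-1 with a fixed-width prefix.
>     static int keyGroupPrefixBytes(int maxParallelism) {
>         return maxParallelism <= 256 ? 1 : 2;
>     }
>
>     public static void main(String[] args) {
>         for (int p : new int[] {32, 100, 170, 500}) {
>             int mp15 = defaultMaxParallelism(p, 1.5);
>             int mp4 = defaultMaxParallelism(p, 4.0);
>             // e.g. p=100: 1.5x -> 256 (1-byte prefix), 4x -> 512 (2-byte prefix)
>             System.out.printf("p=%d  1.5x -> maxP=%d (%d-byte prefix)  4x -> maxP=%d (%d-byte prefix)%n",
>                     p, mp15, keyGroupPrefixBytes(mp15), mp4, keyGroupPrefixBytes(mp4));
>         }
>     }
> }
> {code}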
> Implementation-wise, there is an unfortunate complication: if the
> maxParallelism is not configured, it is not stored anywhere in the job graph
> but re-derived on the JobManager each time it loads a JobGraph vertex
> (ExecutionJobVertex) that does not have a maxParallelism configured. This
> relies on the implicit contract that the derivation logic never changes.
> Changing this logic will instantly break all jobs which have not explicitly
> configured the max parallelism. That seems like a pretty heavy design
> shortcoming, unfortunately :-(
> A way to partially work around this is to move the logic that derives the
> maximum parallelism into the {{StreamGraphGenerator}}, so we never create
> JobGraphs whose vertices have no configured max parallelism (while keeping
> the re-derivation logic for backwards compatibility with persisted
> JobGraphs). The {{StreamExecutionEnvironment}} will need a flag to use the
> "old mode" so that existing unconfigured applications have a way to keep
> restoring from old savepoints.
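> Purely as a sketch of that proposed direction (all names below are
> hypothetical, not existing Flink API): the max parallelism would be resolved
> and pinned when the StreamGraph is generated, with a legacy flag that keeps
> the old "derive on the JobManager" behavior for applications that still need
> to restore from old savepoints:
> {code:java}
> // Hypothetical sketch only; class, method and flag names are not actual Flink API.
> public final class MaxParallelismResolution {
>
>     static final int NOT_SET = -1;
>
>     static int resolveAtGraphGeneration(
>             int configuredMaxParallelism, int parallelism, boolean legacyDeriveOnJobManager) {
>         if (configuredMaxParallelism != NOT_SET) {
>             return configuredMaxParallelism;   // explicit user setting always wins
>         }
>         if (legacyDeriveOnJobManager) {
>             return NOT_SET;                    // old mode: leave unset, JobManager re-derives it
>         }
>         // New mode: derive once at StreamGraph generation time and pin it into the JobGraph,
>         // so future changes to the derivation formula cannot silently change existing jobs.
>         return deriveDefault(parallelism);
>     }
>
>     static int deriveDefault(int parallelism) {
>         int candidate = parallelism + parallelism / 2;   // today's 1.5x rule (could change later)
>         int powerOfTwo = candidate <= 1 ? 1 : Integer.highestOneBit(candidate - 1) << 1;
>         return Math.min(Math.max(powerOfTwo, 128), 1 << 15);
>     }
> }
> {code}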
--
This message was sent by Atlassian Jira
(v8.3.4#803005)