Stephan Ewen created FLINK-21695:
------------------------------------
Summary: Increase default value for number of KeyGroups
Key: FLINK-21695
URL: https://issues.apache.org/jira/browse/FLINK-21695
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Stephan Ewen
Fix For: 1.13.0
The current calculation for the number of key groups (max parallelism) leads in
many cases to data skew and to confusion among users.
Specifically, for max parallelism values above 128 (the lower bound of the
default), the default is set to {{roundUpToPowerOfTwo(1.5 x parallelism)}}.
Frequently this means that half of the tasks get one key group and the other
half get two key groups, which is very skewed.
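
To make the skew concrete, here is a minimal Java sketch of the derivation described above. It is not the Flink source: the class name, the helper names, the bounds (128 and 32768) and the range formula that splits key groups across subtasks are assumptions modeled on the behavior described in this ticket.

```java
import java.util.Arrays;

final class KeyGroupSkewSketch {

    // Assumed bounds of the default derivation (not taken from this ticket's text).
    static final int LOWER_BOUND = 128;
    static final int UPPER_BOUND = 1 << 15;

    /** Smallest power of two that is >= x, for x >= 1. */
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    /** Default as described above: 1.5 x parallelism, rounded up, then clamped. */
    static int defaultMaxParallelism(int parallelism) {
        int derived = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(derived, LOWER_BOUND), UPPER_BOUND);
    }

    /** Number of key groups owned by each subtask, using contiguous ranges. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < parallelism; i++) {
            int start = (i * maxParallelism + parallelism - 1) / parallelism;
            int end = ((i + 1) * maxParallelism - 1) / parallelism;
            counts[i] = end - start + 1;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 170;
        int maxParallelism = defaultMaxParallelism(parallelism);
        int[] counts = keyGroupsPerSubtask(maxParallelism, parallelism);
        long withOne = Arrays.stream(counts).filter(c -> c == 1).count();
        long withTwo = Arrays.stream(counts).filter(c -> c == 2).count();
        System.out.printf("maxParallelism=%d, one key group: %d subtasks, two key groups: %d subtasks%n",
                maxParallelism, withOne, withTwo);
    }
}
```

For a parallelism of 170 the sketch derives 256 key groups, so 86 subtasks own two key groups and 84 own one: roughly half of the subtasks carry twice the key space of the others.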
See section (1) in this "lessons learned" blog post.
https://engineering.contentsquare.com/2021/ten-flink-gotchas/
We can fix this by
- either setting the default maxParallelism to something fairly high (2048, for
example). The cost is that the default key group overhead per state entry
grows from one byte to two bytes.
- or staying with similar logic, but instead of {{1.5 x operatorParallelism}}
going with a higher multiplier, like {{4 x operatorParallelism}}. The price is
again that we reach the point of two bytes of key group encoding overhead,
instead of one, more quickly.
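
The trade-off between the two options can be sketched as follows. This is an illustration only: it assumes that the key group prefix takes one byte for up to 128 key groups and two bytes beyond that, and that the derived value is clamped to [128, 32768]. The class and method names are hypothetical.

```java
final class KeyGroupOverheadSketch {

    /** Smallest power of two that is >= x, for x >= 1. */
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    /** Derived max parallelism for a multiplier given as the fraction num/den. */
    static int derive(int parallelism, int num, int den) {
        int derived = roundUpToPowerOfTwo(parallelism * num / den);
        return Math.min(Math.max(derived, 128), 1 << 15);
    }

    /** Assumed encoding overhead: one byte up to 128 key groups, two bytes above. */
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    /** Lowest operator parallelism at which the derived value needs two bytes. */
    static int firstParallelismWithTwoBytes(int num, int den) {
        for (int p = 1; p <= 1 << 15; p++) {
            if (prefixBytes(derive(p, num, den)) == 2) {
                return p;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("1.5x multiplier: two bytes from parallelism "
                + firstParallelismWithTwoBytes(3, 2));
        System.out.println("4x multiplier:   two bytes from parallelism "
                + firstParallelismWithTwoBytes(4, 1));
        System.out.println("fixed 2048:      always " + prefixBytes(2048) + " bytes");
    }
}
```

Under these assumptions the 1.5x multiplier switches to two bytes at a parallelism of 86, the 4x multiplier already at 33, and a fixed default of 2048 pays the two bytes for every job.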
Implementation-wise, there is an unfortunate situation: the maxParallelism, if
not configured, is not stored anywhere in the job graph, but is re-derived on
the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex) which
does not have a maxParallelism configured. This relies on the implicit contract
that this logic never changes.
Changing this logic will instantly break all jobs which have not explicitly
configured the maxParallelism. That seems like a pretty heavy design
shortcoming, unfortunately :-(
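
As a rough illustration of why a changed derivation is incompatible with existing state: the key group of a key depends on the max parallelism, so a job that silently re-derives a different value would look for its state in different key groups. The sketch below is simplified and hypothetical. It uses a plain modulo on {{hashCode()}} and omits any additional hash mixing the runtime may apply.

```java
final class KeyGroupAssignmentSketch {

    /** Simplified key group assignment: hash of the key modulo the number of key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        Integer key = 200;
        int oldMaxParallelism = 128; // value re-derived by the old logic (example)
        int newMaxParallelism = 256; // value a changed logic might derive (example)

        System.out.println("key group under old derivation: "
                + keyGroupFor(key, oldMaxParallelism));
        System.out.println("key group under new derivation: "
                + keyGroupFor(key, newMaxParallelism));
    }
}
```

In this example the same key lands in key group 72 with 128 key groups and in key group 200 with 256, which is why the number of key groups has to stay fixed for the lifetime of the persisted state.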
A way to partially work around that is by moving the logic that derives the
maximum parallelism to the {{StreamGraphGenerator}}, so we never create
JobGraphs where vertices have no configured Max Parallelism (and we keep the
re-derivation logic for backwards compatibility for persisted JobGraphs).
The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to
give existing unconfigured applications a way to keep restoring from old
savepoints.
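
The proposed workaround could look roughly like the sketch below: resolve the max parallelism once, at graph generation time, and fall back to the old derivation only when a legacy flag is set. Everything here is hypothetical and only illustrates the idea; {{Vertex}}, {{resolve}}, the {{legacyMode}} flag and the new default of 2048 are not existing Flink APIs or decided values.

```java
import java.util.List;

final class MaxParallelismResolutionSketch {

    static final int UNSET = -1;
    static final int NEW_DEFAULT = 2048; // hypothetical new default (first option above)

    /** Hypothetical stand-in for a vertex of the generated graph. */
    static final class Vertex {
        final int parallelism;
        int maxParallelism;

        Vertex(int parallelism, int maxParallelism) {
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    /** Old derivation as described in this ticket: 1.5 x parallelism, rounded up, clamped. */
    static int legacyDefault(int parallelism) {
        int x = parallelism + parallelism / 2;
        int highest = Integer.highestOneBit(x);
        int rounded = highest == x ? x : highest << 1;
        return Math.min(Math.max(rounded, 128), 1 << 15);
    }

    /** Makes sure no vertex leaves graph generation without a max parallelism. */
    static void resolve(List<Vertex> vertices, boolean legacyMode) {
        for (Vertex v : vertices) {
            if (v.maxParallelism == UNSET) {
                v.maxParallelism = legacyMode
                        ? legacyDefault(v.parallelism)
                        : Math.max(NEW_DEFAULT, v.parallelism);
            }
            // Explicitly configured values are never touched.
        }
    }

    public static void main(String[] args) {
        List<Vertex> vertices = List.of(new Vertex(170, UNSET), new Vertex(4, 512));
        resolve(vertices, false);
        for (Vertex v : vertices) {
            System.out.println("parallelism=" + v.parallelism
                    + " maxParallelism=" + v.maxParallelism);
        }
    }
}
```

With {{legacyMode}} set to true, an unconfigured vertex with parallelism 170 keeps the old value of 256, so it can still restore from savepoints taken before the change. With the flag off it gets the new default, and explicitly configured vertices are unaffected either way.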