[
https://issues.apache.org/jira/browse/FLINK-21695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299546#comment-17299546
]
Yun Tang commented on FLINK-21695:
----------------------------------
I used flink-benchmarks to compare the performance of a state backend with 128
key groups (the default case if there is only one task) versus a state backend
with 32768 key groups (if we decide to increase the number of key groups, one
subtask could have at most 32768 key groups):
|operation|state backend type|OPS with 128 key groups|OPS with 32768 key groups|change|
|ListStateBenchmark.listAdd | ROCKSDB|682.33|684.367|0.30%|
|ListStateBenchmark.listAppend | ROCKSDB|674.482|666.848|-1.13%|
|ListStateBenchmark.listGet | ROCKSDB|185.277|208.806|12.70%|
|ListStateBenchmark.listGetAndIterate | ROCKSDB|185.208|205.86|11.15%|
|ListStateBenchmark.listUpdate | ROCKSDB|681.437|673.118|-1.22%|
|MapStateBenchmark.mapAdd | ROCKSDB|567.921|568.897|0.17%|
|MapStateBenchmark.mapContains | ROCKSDB|72.011|71.098|-1.27%|
|MapStateBenchmark.mapEntries | ROCKSDB|494.355|501.153|1.38%|
|MapStateBenchmark.mapGet | ROCKSDB|71.242|70.704|-0.76%|
|MapStateBenchmark.mapIsEmpty | ROCKSDB|63.857|63.905|0.08%|
|MapStateBenchmark.mapIterator | ROCKSDB|493.626|496.126|0.51%|
|MapStateBenchmark.mapKeys | ROCKSDB|502.111|504.131|0.40%|
|MapStateBenchmark.mapPutAll | ROCKSDB|167.855|167.857|0.00%|
|MapStateBenchmark.mapRemove | ROCKSDB|581.158|593.621|2.14%|
|MapStateBenchmark.mapUpdate | ROCKSDB|565.662|562.162|-0.62%|
|MapStateBenchmark.mapValues | ROCKSDB|491.329|500.291|1.82%|
|ValueStateBenchmark.valueAdd | ROCKSDB|578.754|583.654|0.85%|
|ValueStateBenchmark.valueGet | ROCKSDB|738.245|721.947|-2.21%|
|ValueStateBenchmark.valueUpdate | ROCKSDB|579.904|580.896|0.17%|
|operation|state backend type|OPS with 128 key groups|OPS with 32768 key groups|change|
|ListStateBenchmark.listAdd | HEAP|5003.16|2373.527|-52.56%|
|ListStateBenchmark.listAddAll | HEAP|1068.359|900.153|-15.74%|
|ListStateBenchmark.listAppend | HEAP|3675.166|2218.354|-39.64%|
|ListStateBenchmark.listGet | HEAP|3713.117|2225.929|-40.05%|
|ListStateBenchmark.listGetAndIterate | HEAP|3625.819|2195.043|-39.46%|
|ListStateBenchmark.listUpdate | HEAP|3583.648|2186.204|-39.00%|
|MapStateBenchmark.mapAdd | HEAP|3358.554|2187.515|-34.87%|
|MapStateBenchmark.mapContains | HEAP|3203.165|2322.414|-27.50%|
|MapStateBenchmark.mapEntries | HEAP|20060.221|17020.261|-15.15%|
|MapStateBenchmark.mapGet | HEAP|3065.08|2245.538|-26.74%|
|MapStateBenchmark.mapIsEmpty | HEAP|4106.847|2724.34|-33.66%|
|MapStateBenchmark.mapIterator | HEAP|19450.228|15944.573|-18.02%|
|MapStateBenchmark.mapKeys | HEAP|22503.507|17767.405|-21.05%|
|MapStateBenchmark.mapPutAll | HEAP|2183.723|1756.404|-19.57%|
|MapStateBenchmark.mapRemove | HEAP|3911.577|2601.356|-33.50%|
|MapStateBenchmark.mapUpdate | HEAP|3070.39|2226.015|-27.50%|
|MapStateBenchmark.mapValues | HEAP|22187.989|17667.486|-20.37%|
|ValueStateBenchmark.valueAdd | HEAP|4207.135|3467.953|-17.57%|
|ValueStateBenchmark.valueGet | HEAP|4564.774|3802.617|-16.70%|
|ValueStateBenchmark.valueUpdate | HEAP|4597.11|3819.481|-16.92%|
As we can see, the RocksDB state backend does not show much performance
regression (some merge-based operations even behave better, which still needs
investigation), while the heap state backend can regress by up to roughly 50%.
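For reference, here is a minimal, self-contained sketch of where the 128 /
32768 bounds above come from and how the skew described in the issue arises.
It paraphrases the derivation and assignment logic from
{{KeyGroupRangeAssignment}}; the class, constants and helpers below are
re-implemented for illustration, not copied from Flink:
{code:java}
// Standalone sketch (not the actual Flink class) of:
//  1) how the default max parallelism, i.e. the number of key groups, is derived, and
//  2) how key groups are distributed over subtasks, which is where the skew comes from.
public class KeyGroupSkewSketch {

    static final int DEFAULT_LOWER_BOUND = 128;   // current default lower bound
    static final int UPPER_BOUND = 32768;         // current upper bound (1 << 15)

    // default maxParallelism = roundUpToPowerOfTwo(1.5 * parallelism), clamped to [128, 32768]
    static int computeDefaultMaxParallelism(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2));
        return Math.min(Math.max(candidate, DEFAULT_LOWER_BOUND), UPPER_BOUND);
    }

    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // key group -> subtask index, same proportional formula the range assignment uses
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 170;                                          // example job parallelism
        int maxParallelism = computeDefaultMaxParallelism(parallelism); // 170 + 85 = 255 -> 256

        int[] keyGroupsPerSubtask = new int[parallelism];
        for (int kg = 0; kg < maxParallelism; kg++) {
            keyGroupsPerSubtask[operatorIndexForKeyGroup(maxParallelism, parallelism, kg)]++;
        }
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (int count : keyGroupsPerSubtask) {
            min = Math.min(min, count);
            max = Math.max(max, count);
        }
        // 256 key groups over 170 subtasks: some subtasks own 1 key group, others own 2,
        // i.e. a 2x difference in expected key load between subtasks.
        System.out.printf("maxParallelism=%d, key groups per subtask: min=%d, max=%d%n",
                maxParallelism, min, max);
    }
}
{code}
With parallelism 170 this prints min=1 / max=2 key groups per subtask, i.e.
the 2x skew the issue description below refers to; raising the number of key
groups (e.g. to 32768 as in the benchmark above) narrows that gap, at the cost
this benchmark tries to quantify.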
> Increase default value for number of KeyGroups
> ----------------------------------------------
>
> Key: FLINK-21695
> URL: https://issues.apache.org/jira/browse/FLINK-21695
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Stephan Ewen
> Priority: Major
> Fix For: 1.13.0
>
>
> The current calculation for the number of Key Groups (max parallelism) leads
> in many cases to data skew and to confusion among users.
> Specifically, the fact that for maxParallelisms above 128, the default value
> is set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently,
> half of the tasks get one keygroup and the other half gets two keygroups,
> which is very skewed.
> See section (1) in this "lessons learned" blog post.
> https://engineering.contentsquare.com/2021/ten-flink-gotchas/
> We can fix this by
> - either setting the default maxParallelism to something pretty high (2048
> for example). The cost is that the default key-group overhead per state
> entry grows from one byte to two bytes.
> - or keeping similar logic, but going with a higher multiplier instead of
> {{1.5 x operatorParallelism}}, like {{4 x operatorParallelism}}. The price
> is again that we more quickly reach the point where the key-group encoding
> overhead is two bytes instead of one.
> Implementation-wise, there is an unfortunate situation: the maxParallelism,
> if not configured, is not stored anywhere in the job graph, but is re-derived
> on the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex)
> that does not have a maxParallelism configured. This relies on the implicit
> contract that this logic never changes.
> Changing this logic will instantly break all jobs which have not explicitly
> configured the max parallelism. That seems like a pretty heavy design
> shortcoming, unfortunately :-(
> A way to partially work around this is to move the logic that derives the
> maximum parallelism to the {{StreamGraphGenerator}}, so that we never create
> JobGraphs where vertices have no configured max parallelism (and we keep the
> re-derivation logic for backwards compatibility with persisted JobGraphs).
> The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to
> give existing un-configured applications a way to keep restoring from old
> savepoints.
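To make the one-byte vs. two-byte cost mentioned in the two options above
concrete: for the RocksDB state backend, every serialized state key carries a
fixed-length key-group prefix, so the prefix width is a per-entry overhead
determined only by the number of key groups. A rough sketch of that trade-off
follows; {{requiredPrefixBytes}} is a simplified stand-in for Flink's
serialization utilities, which may switch to two bytes at a slightly lower
cut-off:
{code:java}
// Sketch of the per-entry overhead discussed above. Assumption: the key-group prefix
// needs just enough unsigned bytes to encode maxParallelism - 1; the exact cut-off in
// Flink's composite key serialization may be more conservative.
public class KeyGroupPrefixOverheadSketch {

    static int requiredPrefixBytes(int maxParallelism) {
        int bytes = 1;
        long representable = 256; // one unsigned byte covers key groups 0..255
        while (representable < maxParallelism) {
            bytes++;
            representable <<= 8;
        }
        return bytes;
    }

    public static void main(String[] args) {
        // Values discussed in this ticket: the current default lower bound, the proposed
        // higher default, and the upper bound used in the benchmark above.
        int[] candidates = {128, 2048, 32768};
        for (int maxParallelism : candidates) {
            System.out.printf("maxParallelism=%5d -> %d key-group prefix byte(s) per state entry%n",
                    maxParallelism, requiredPrefixBytes(maxParallelism));
        }
    }
}
{code}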
--
This message was sent by Atlassian Jira
(v8.3.4#803005)