Stephan Ewen created FLINK-21695:
------------------------------------

             Summary: Increase default value for number of KeyGroups
                 Key: FLINK-21695
                 URL: https://issues.apache.org/jira/browse/FLINK-21695
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing, Runtime / State Backends
            Reporter: Stephan Ewen
             Fix For: 1.13.0


The current calculation of the default number of key groups (max parallelism) 
leads to data skew in many cases, and it confuses users.

Specifically, once the derived max parallelism exceeds 128, the default is set 
to {{roundToPowerOfTwo(1.5 x parallelism)}}. This frequently means that half of 
the tasks get one key group and the other half get two, which is very skewed: 
with evenly distributed keys, the tasks with two key groups hold twice the state 
of the others.
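To make the skew concrete, here is a minimal Java sketch of the default derivation as described above (1.5 x parallelism, rounded up to a power of two, never below 128). The upper bound of 32768, the helper names, and the contiguous key-group split are assumptions modelled on, not copied from, Flink's {{KeyGroupRangeAssignment}}.

```java
// Hedged sketch of the default derivation described in this issue:
// 1.5 x parallelism, rounded up to a power of two, never below 128.
// The upper bound of 1 << 15 is an assumption; all names are illustrative.
final class DefaultMaxParallelismSketch {
    static final int LOWER_BOUND = 128;
    static final int UPPER_BOUND = 1 << 15;

    // Smallest power of two >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    static int defaultMaxParallelism(int parallelism) {
        int derived = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(derived, LOWER_BOUND), UPPER_BOUND);
    }

    // Assumed contiguous split: subtask i owns key groups [start, end], so every
    // subtask gets either floor(maxP / p) or ceil(maxP / p) key groups.
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < parallelism; i++) {
            int start = (i * maxParallelism + parallelism - 1) / parallelism;
            int end = ((i + 1) * maxParallelism - 1) / parallelism;
            counts[i] = end - start + 1;
        }
        return counts;
    }

    public static void main(String[] args) {
        int p = 170;
        int maxP = defaultMaxParallelism(p);
        int withOne = 0;
        int withTwo = 0;
        for (int c : keyGroupsPerSubtask(maxP, p)) {
            if (c == 1) withOne++;
            else if (c == 2) withTwo++;
        }
        System.out.printf("p=%d, maxP=%d: %d subtasks own 1 key group, %d own 2%n",
                p, maxP, withOne, withTwo);
    }
}
```

Running it prints {{p=170, maxP=256: 84 subtasks own 1 key group, 86 own 2}}: with evenly distributed keys, 86 subtasks carry about twice the state of the other 84.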

See section (1) in this "lessons learned" blog post. 
https://engineering.contentsquare.com/2021/ten-flink-gotchas/

We can fix this in one of two ways:
  - Set the default maxParallelism to something fairly high (2048, for 
example). The cost is that the default key-group overhead per state entry grows 
from one byte to two bytes.
  - Keep similar logic, but use a higher multiplier than {{1.5 x 
operatorParallelism}}, such as {{4 x operatorParallelism}}. The price, again, is 
that we reach two bytes of key-group encoding overhead (instead of one) at a 
lower parallelism.
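The trade-off between the two options can be tabulated with a small, hypothetical Java sketch. It assumes that up to 128 key groups fit into a one-byte prefix and that larger values need two bytes, ignores the upper bound, and measures skew as the ratio between the most and the least loaded subtask, counted in key groups.

```java
// Hypothetical comparison of the current rule and the two proposed options.
final class KeyGroupOptionsSketch {
    static final int FIXED_DEFAULT = 2048; // option 1, the example value from the issue

    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Current rule: 1.5 x parallelism, rounded up, at least 128 (upper bound ignored).
    static int currentRule(int p) {
        return Math.max(roundUpToPowerOfTwo(p + p / 2), 128);
    }

    // Option 2: same shape, but with a 4 x multiplier.
    static int fourTimesRule(int p) {
        return Math.max(roundUpToPowerOfTwo(4 * p), 128);
    }

    // Assumption: up to 128 key groups fit in a one-byte prefix; more need two bytes.
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    // Most loaded / least loaded subtask, in key groups (requires maxParallelism >= p).
    static double skew(int maxParallelism, int p) {
        int least = maxParallelism / p;
        int most = (maxParallelism + p - 1) / p;
        return (double) most / least;
    }

    public static void main(String[] args) {
        for (int p : new int[] {32, 33, 85, 86, 170, 500}) {
            int cur = currentRule(p);
            int four = fourTimesRule(p);
            System.out.printf(
                    "p=%3d | current %4d (%d B, skew %.2f) | fixed %d (%d B, skew %.2f) | 4x %4d (%d B, skew %.2f)%n",
                    p, cur, prefixBytes(cur), skew(cur, p),
                    FIXED_DEFAULT, prefixBytes(FIXED_DEFAULT), skew(FIXED_DEFAULT, p),
                    four, prefixBytes(four), skew(four, p));
        }
    }
}
```

Under these assumptions the 4 x rule needs the two-byte prefix from parallelism 33 on, the current rule from 86 on, and the fixed 2048 default always; in exchange, at parallelism 170 the skew drops from 2.00 (current) to about 1.17 (4 x) or 1.08 (2048).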

Implementation-wise, there is an unfortunate situation: if the maxParallelism is 
not configured, it is not stored anywhere in the JobGraph. Instead, it is 
re-derived on the JobManager each time the JobManager loads a JobGraph vertex 
(ExecutionJobVertex) that has no max parallelism configured. This relies on the 
implicit contract that this logic never changes.
Changing this logic will instantly break all jobs that have not explicitly 
configured the max parallelism. That seems like a pretty heavy design 
shortcoming, unfortunately :-(
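The implicit contract can be illustrated with a hypothetical Java model (these are not Flink classes): a vertex without a configured max parallelism gets whatever value the rule currently running on the JobManager derives, so the same persisted vertex resolves to a different number of key groups once the rule changes. The 4 x rule here merely stands in for any changed derivation.

```java
import java.util.OptionalInt;
import java.util.function.IntUnaryOperator;

// Hypothetical model of the implicit contract; these are not Flink classes.
final class RederivationSketch {
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // The rule in effect today, and a changed rule (4 x, as an example).
    static final IntUnaryOperator OLD_RULE = p -> Math.max(roundUpToPowerOfTwo(p + p / 2), 128);
    static final IntUnaryOperator NEW_RULE = p -> Math.max(roundUpToPowerOfTwo(4 * p), 128);

    // Stand-in for a JobGraph vertex: the max parallelism is optional.
    static final class Vertex {
        final int parallelism;
        final OptionalInt configuredMaxParallelism;

        Vertex(int parallelism, OptionalInt configuredMaxParallelism) {
            this.parallelism = parallelism;
            this.configuredMaxParallelism = configuredMaxParallelism;
        }
    }

    // Evaluated on every load: nothing is persisted, so the current rule decides.
    static int effectiveMaxParallelism(Vertex v, IntUnaryOperator rule) {
        return v.configuredMaxParallelism.orElseGet(() -> rule.applyAsInt(v.parallelism));
    }

    public static void main(String[] args) {
        Vertex unconfigured = new Vertex(170, OptionalInt.empty());
        Vertex configured = new Vertex(170, OptionalInt.of(256));
        System.out.println("unconfigured: old=" + effectiveMaxParallelism(unconfigured, OLD_RULE)
                + ", new=" + effectiveMaxParallelism(unconfigured, NEW_RULE));
        System.out.println("configured:   old=" + effectiveMaxParallelism(configured, OLD_RULE)
                + ", new=" + effectiveMaxParallelism(configured, NEW_RULE));
    }
}
```

For parallelism 170 the un-configured vertex resolves to 256 under the old rule and to 1024 under the new one, while the configured vertex stays at 256 either way.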

One way to partially work around this is to move the logic that derives the 
maximum parallelism into the {{StreamGraphGenerator}}, so that we never create 
JobGraphs whose vertices have no configured max parallelism (while keeping the 
re-derivation logic for backwards compatibility with persisted JobGraphs).
The {{StreamExecutionEnvironment}} will need a flag for the "old mode", so that 
existing un-configured applications can keep restoring from old savepoints.
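The proposed workaround might look roughly like the following Java sketch. All names, including the {{legacyMode}} flag, are hypothetical stand-ins for the {{StreamGraphGenerator}} and {{StreamExecutionEnvironment}} changes described above, and the 4 x rule is just one of the two candidate defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical sketch of the workaround; none of these names are Flink APIs.
final class GraphGenerationSketch {
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Legacy rule, still needed for "old mode" and for persisted JobGraphs.
    static int legacyDefault(int p) {
        return Math.max(roundUpToPowerOfTwo(p + p / 2), 128);
    }

    // Example new rule; the 4 x multiplier is only one of the proposed options.
    static int newDefault(int p) {
        return Math.max(roundUpToPowerOfTwo(4 * p), 128);
    }

    // What the user declared for an operator.
    static final class OperatorSpec {
        final String name;
        final int parallelism;
        final OptionalInt userMaxParallelism;

        OperatorSpec(String name, int parallelism, OptionalInt userMaxParallelism) {
            this.name = name;
            this.parallelism = parallelism;
            this.userMaxParallelism = userMaxParallelism;
        }
    }

    // A generated vertex always carries an explicit max parallelism.
    static final class JobVertex {
        final String name;
        final int parallelism;
        final int maxParallelism;

        JobVertex(String name, int parallelism, int maxParallelism) {
            this.name = name;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    // Graph generation: resolve the default once and store it in the vertex.
    static List<JobVertex> generate(List<OperatorSpec> ops, boolean legacyMode) {
        List<JobVertex> vertices = new ArrayList<>();
        for (OperatorSpec op : ops) {
            int maxP = op.userMaxParallelism.orElseGet(
                    () -> legacyMode ? legacyDefault(op.parallelism) : newDefault(op.parallelism));
            vertices.add(new JobVertex(op.name, op.parallelism, maxP));
        }
        return vertices;
    }

    // JobManager: only JobGraphs persisted before the change can lack a value.
    static int maxParallelismOnLoad(OptionalInt stored, int parallelism) {
        return stored.orElseGet(() -> legacyDefault(parallelism));
    }

    public static void main(String[] args) {
        List<OperatorSpec> ops = List.of(
                new OperatorSpec("window", 170, OptionalInt.empty()),
                new OperatorSpec("sink", 170, OptionalInt.of(512)));
        for (boolean legacyMode : new boolean[] {true, false}) {
            for (JobVertex v : generate(ops, legacyMode)) {
                System.out.printf("legacyMode=%b %s: maxParallelism=%d%n",
                        legacyMode, v.name, v.maxParallelism);
            }
        }
    }
}
```

The design point is that the default is resolved exactly once, at graph-generation time, and written into every vertex; the legacy rule survives only as the fallback for JobGraphs persisted before the change and for applications that opt into the old mode.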



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
