[
https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Piotr Nowojski closed FLINK-21548.
----------------------------------
Resolution: Not A Bug
> keyBy operation produces skewed record distribution for low-cardinality keys
> ----------------------------------------------------------------------------
>
> Key: FLINK-21548
> URL: https://issues.apache.org/jira/browse/FLINK-21548
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Runtime / Task
> Affects Versions: 1.11.0, 1.12.1
> Reporter: Iaroslav Zeigerman
> Priority: Minor
> Labels: auto-deprioritized-major
> Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot
> 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
>
>
> When the cardinality of the keys matches the operator parallelism, not all
> downstream tasks of the keyed operator are utilized, and even those that are
> utilized are not utilized evenly.
> For example, if I have 500 unique keys in [0, 500), only 313 downstream tasks
> (out of 500) will receive any records at all.
> *NOTE*: for all examples below, 1 million record instances were used.
> This behavior can easily be reproduced with the following test case:
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
>
> object Test {
>   val parallelism = 500
>   val recordsNum = 1000000
>
>   def run(): Unit = {
>     val recordIds = (0 to recordsNum).map(_ % parallelism)
>     val tasks = recordIds.map(selectTask)
>     println(s"Total unique keys: ${recordIds.toSet.size}")
>     println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>     println("=======================")
>     println(s"Tasks involved: ${tasks.toSet.size}")
>     println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>   }
>
>   def selectTask(key: Int): Int =
>     KeyGroupRangeAssignment.assignToKeyGroup(
>       key,
>       parallelism
>     )
> }
> {code}
> This produces the following results:
> {noformat}
> Total unique keys: 500
> Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
> =======================
> Tasks involved: 313
> Record distribution by task: Vector((147,10000), (248,10000), ..., (232,2000), (100,2000))
> {noformat}
> Record distribution visualized:
> !Screen Shot 2021-03-01 at 10.52.31 AM.png!
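> For context, 313 utilized tasks is roughly what uniform random hashing would
> predict: hashing n distinct keys into m buckets leaves about
> m * (1 - (1 - 1/m)^n) buckets non-empty. A rough back-of-the-envelope check in
> plain Scala (a sketch, not part of the test above):
> {code:scala}
> // Expected number of non-empty buckets when n distinct keys are hashed
> // uniformly into m buckets: m * (1 - (1 - 1/m)^n).
> val m = 500.0
> val n = 500.0
> val expectedNonEmpty = m * (1 - math.pow(1 - 1 / m, n))
> println(f"$expectedNonEmpty%.1f") // ~316.2, close to the observed 313
> {code}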
> I have determined that in order to utilize all tasks, the number of unique
> keys should be at least 5 times the parallelism value.
> The relation between the number of unique keys and the fraction of utilized
> tasks appears to be exponential:
> !Screen Shot 2021-03-01 at 10.54.42 AM.png!
> But even with 5x the number of keys the skew is still quite significant:
> !Screen Shot 2021-03-01 at 10.57.33 AM.png!
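> To quantify the relation above, the test can be extended to sweep the key
> cardinality and print the fraction of key groups (which equal subtask indices
> here, since the test passes the parallelism as the max parallelism) that
> receive at least one key. A minimal sketch reusing selectTask from the snippet
> above:
> {code:scala}
> // Sketch: fraction of key groups hit as the key cardinality grows.
> def utilization(uniqueKeys: Int): Double = {
>   val groups = (0 until uniqueKeys).map(Test.selectTask).toSet
>   groups.size.toDouble / Test.parallelism
> }
>
> (1 to 10).foreach { multiplier =>
>   val keys = multiplier * Test.parallelism
>   println(f"keys=$keys%5d utilized=${utilization(keys) * 100}%5.1f%%")
> }
> {code}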
> Given that the keys used in my test are integer values, for which `hashCode`
> returns the value itself, I tend to believe that the skew is caused by Flink's
> murmur hash implementation, which is used
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].
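> For reference, the full key-to-subtask path in Flink goes through the key
> group machinery: murmur-hash the key's hashCode, map the hash to a key group
> in [0, maxParallelism), then map the key group to a subtask in
> [0, parallelism). A minimal sketch of that path (here the default max
> parallelism is used, rather than passing the operator parallelism directly as
> the test above does):
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
> import org.apache.flink.util.MathUtils
>
> val parallelism = 500
> // Max parallelism Flink would derive by default for parallelism = 500
> val maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism)
>
> val key = 42
> // Step 1: murmur-hash the key's hashCode (for an Int, hashCode is the value itself)
> val hash = MathUtils.murmurHash(key.hashCode)
> // Step 2: map the hash to a key group in [0, maxParallelism)
> val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
> // Step 3: map the key group to a subtask index in [0, parallelism)
> val subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)
> println(s"key=$key hash=$hash keyGroup=$keyGroup subtask=$subtask")
> {code}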
--
This message was sent by Atlassian Jira
(v8.3.4#803005)