[ https://issues.apache.org/jira/browse/HUDI-5671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Chen resolved HUDI-5671.
------------------------------
> BucketIndexPartitioner partition algorithm skew
> -----------------------------------------------
>
> Key: HUDI-5671
> URL: https://issues.apache.org/jira/browse/HUDI-5671
> Project: Apache Hudi
> Issue Type: Improvement
> Components: flink, index
> Reporter: loukey_j
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2023-02-01-14-45-33-116.png,
> image-2023-02-01-14-50-29-703.png, image-2023-02-01-15-00-14-889.png,
> image-2023-02-01-15-05-15-491.png
>
>
> After the job had been running in production for 13 days, we found subtasks
> that processed no data at all, as shown in the figure below. The job uses the
> update time as the partition field, and it uses the bucket index with 128
> buckets and a write parallelism of 128. The record keys are uniformly
> distributed: from the storage point of view, the file sizes of the buckets
> differ very little. Investigation traced the problem to skew in the shuffle
> algorithm.
> !image-2023-02-01-14-45-33-116.png!
> Potential drawbacks of this algorithmic skew:
> 1. Memory usage is uneven: some nodes put heavy pressure on the JVM, and
> TaskManager (TM) nodes are prone to timeouts.
> 2. Checkpoints may time out, because buffered data is flushed to HDFS while
> the state snapshot is taken. If the skew is severe, the overloaded nodes take
> too long and the checkpoint times out.
> Current algorithm:
> !image-2023-02-01-14-50-29-703.png!
> Algorithm flaws:
> 1. curBucket ∈ [0, numBuckets - 1].
> 2. Within one partition there are at most numBuckets distinct globalHash
> values. These values are widely scattered, so mod(globalHash, numPartitions)
> easily collides.
> 3. When numBuckets is relatively large, shuffleIndex collisions become
> likely, resulting in skew.
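The figure with the current algorithm is not reproduced in this message. As a minimal sketch, assuming the current partitioner hashes the partition path concatenated with curBucket into globalHash and then takes mod(globalHash, numPartitions) (inferred from the names in the list above, not copied from the Hudi source), the class below counts how many buckets of a single partition land on each write subtask. The class name, method names and the partition value "2023-02-01" are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch of the pre-fix shuffle; the exact hash input is an assumption.
public class LegacyBucketShuffle {

  // Assumed form: globalHash = hash(partition + curBucket), shuffleIndex = globalHash mod numPartitions.
  static int shuffleIndex(String partitionPath, int curBucket, int numPartitions) {
    int globalHash = (partitionPath + curBucket).hashCode() & Integer.MAX_VALUE; // clear the sign bit
    return globalHash % numPartitions;
  }

  // Number of buckets of one partition routed to each of the numPartitions write subtasks.
  static int[] loadPerSubtask(String partitionPath, int numBuckets, int numPartitions) {
    int[] load = new int[numPartitions];
    for (int curBucket = 0; curBucket < numBuckets; curBucket++) {
      load[shuffleIndex(partitionPath, curBucket, numPartitions)]++;
    }
    return load;
  }

  public static void main(String[] args) {
    // 128 buckets and write parallelism 128, as in the job described above.
    int[] load = loadPerSubtask("2023-02-01", 128, 128);
    long idle = Arrays.stream(load).filter(c -> c == 0).count();
    int busiest = Arrays.stream(load).max().getAsInt();
    System.out.println("idle subtasks: " + idle + ", most buckets on one subtask: " + busiest);
  }
}
```

Under this assumed form, buckets 14 and 58 always land on the same subtask when numPartitions is 128, whatever the partition value: the hash codes of partitionPath + "14" and partitionPath + "58" differ by ('5' - '1') * 31 + ('8' - '4') = 128. Since 128 buckets share 128 subtasks, any such collision leaves at least one subtask idle.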
> Algorithm optimization:
> !image-2023-02-01-15-00-14-889.png!
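> kb = key % b, kb ∈ [0, b-1]
> pw = pt % w, pw ∈ [0, w-1]
> shuffleIndex = (pw + kb) % w, shuffleIndex ∈ [0, w-1]
> where b is the number of buckets, w is the write parallelism (the number of
> slots Wn), and pt is derived from the partition value.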
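A minimal Java sketch of the proposed formula follows. It assumes that key is an integer hash of the record key and that pt is the String hashCode of the partition value; Math.floorMod keeps kb and pw non-negative even for negative hashes. The class and method names are hypothetical and are not Hudi's API.

```java
import java.util.Arrays;

// Hypothetical sketch of the proposed shuffle: shuffleIndex = (pw + kb) % w.
public class SlotBucketShuffle {

  // kb = key % b, in [0, b-1]; floorMod avoids negative results for negative hashes.
  static int bucketOf(int keyHash, int b) {
    return Math.floorMod(keyHash, b);
  }

  // pw = pt % w, in [0, w-1]; pt is assumed to be the partition value's hashCode.
  static int slotOf(String partitionPath, int w) {
    return Math.floorMod(partitionPath.hashCode(), w);
  }

  // shuffleIndex = (pw + kb) % w, in [0, w-1].
  static int shuffleIndex(String partitionPath, int kb, int w) {
    return (slotOf(partitionPath, w) + kb) % w;
  }

  // Number of buckets of one partition routed to each of the w write subtasks.
  static int[] loadPerSubtask(String partitionPath, int b, int w) {
    int[] load = new int[w];
    for (int kb = 0; kb < b; kb++) {
      load[shuffleIndex(partitionPath, kb, w)]++;
    }
    return load;
  }

  public static void main(String[] args) {
    int kb = bucketOf("some-record-key".hashCode(), 128);
    System.out.println("bucket " + kb + " -> subtask " + shuffleIndex("2023-02-01", kb, 128));
    // With b == w == 128, every subtask receives exactly one bucket of the partition.
    System.out.println(Arrays.toString(loadPerSubtask("2023-02-01", 128, 128)));
  }
}
```

Because kb takes b consecutive values, the b buckets of one partition map to b distinct subtasks whenever b <= w, so a single partition cannot leave subtasks idle through hash collisions.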
>
> In effect, pw is computed from the partition first. pw can be understood as
> the slot Wn assigned to the partition, so each partition gets its own
> starting slot. The data of this partition is then written to the b
> consecutive slots starting from that slot, wrapping around modulo w.
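The slot view can be checked with a small worked example. The sketch below takes pw directly as input (how it is derived from the partition is left out) and lists the b slots a partition writes to, wrapping around modulo w; the class name PartitionSlots, its methods, and the values w = 8, b = 4 are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: the slots a partition writes to, starting at its slot pw.
public class PartitionSlots {

  // Returns the b consecutive slots pw, pw + 1, ..., wrapping around modulo w.
  static List<Integer> slotsFor(int pw, int b, int w) {
    List<Integer> slots = new ArrayList<>();
    for (int kb = 0; kb < b; kb++) {
      slots.add((pw + kb) % w);
    }
    return slots;
  }

  // Per-slot load when several partitions, given by their starting slots, are written together.
  static int[] combinedLoad(int[] startSlots, int b, int w) {
    int[] load = new int[w];
    for (int pw : startSlots) {
      for (int slot : slotsFor(pw, b, w)) {
        load[slot]++;
      }
    }
    return load;
  }

  public static void main(String[] args) {
    int w = 8, b = 4;
    System.out.println(slotsFor(6, b, w)); // prints [6, 7, 0, 1]
    System.out.println(Arrays.toString(combinedLoad(new int[] {0, 4}, b, w))); // prints [1, 1, 1, 1, 1, 1, 1, 1]
  }
}
```

Partitions whose starting slots are b apart, such as pw = 0 and pw = 4 here, cover disjoint windows; two partitions that share a starting slot would write to the same b slots.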
> !image-2023-02-01-15-05-15-491.png!
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)