[ 
https://issues.apache.org/jira/browse/HUDI-5671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen resolved HUDI-5671.
------------------------------

> BucketIndexPartitioner partition algorithm skew
> -----------------------------------------------
>
>                 Key: HUDI-5671
>                 URL: https://issues.apache.org/jira/browse/HUDI-5671
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: flink, index
>            Reporter: loukey_j
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-02-01-14-45-33-116.png, 
> image-2023-02-01-14-50-29-703.png, image-2023-02-01-15-00-14-889.png, 
> image-2023-02-01-15-05-15-491.png
>
>
> After running for 13 days, an online job was found to have subtasks that 
> process no data, as shown in the figure below. The job uses the update time 
> as the partition, uses the bucket index with 128 buckets, and has a write 
> parallelism of 128. The keys are uniformly distributed, since the file sizes 
> of the buckets in storage differ very little. Investigation traced the 
> problem to skew in the shuffle algorithm.
> !image-2023-02-01-14-45-33-116.png!
> Potential disadvantages of the skew:
>   1. Memory usage is uneven: some nodes may come under high JVM pressure, 
> and TM nodes are prone to timeouts.
>   2. Checkpoints may time out, because data is flushed to HDFS while the 
> state is snapshotted. If the skew is severe, some nodes take too long and 
> the checkpoint times out.
> Current algorithm:
> !image-2023-02-01-14-50-29-703.png!
> Algorithm flaws:
>   1. curBucket ∈ [0, numBuckets - 1]
>   2. Within one partition there are at most numBuckets distinct globalHash 
> values. These values are scattered, so mod(globalHash, numPartitions) 
> collides easily.
>   3. When numBuckets is relatively large, shuffleIndex is prone to 
> collisions, resulting in skew.
> Algorithm optimization:
> !image-2023-02-01-15-00-14-889.png!
> kb = key % b;                  kb ∈ [0, b-1]
> pw = pt % w;                   pw ∈ [0, w-1]
> shuffleIndex = (pw + kb) % w;  shuffleIndex ∈ [0, w-1]
>  
> In effect, pw is first computed from the partition. pw can be understood as 
> the slot Wn allocated to that partition; each partition gets its own 
> starting slot.
> The data of the partition is then written to the b slots that follow from 
> this starting slot, one slot per bucket, wrapping around modulo w.
> !image-2023-02-01-15-05-15-491.png!
>  
>  
>  
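
The proposed formula quoted above can be sketched in Java as follows. This is
an illustration only, not the code merged for this issue: the class name
BucketShuffleSketch, the method names, and the sample partition value are
hypothetical, b is taken to be the number of buckets and w the write
parallelism, and both hashes are assumed to be non-negative.

```java
// Illustrative sketch only; not the implementation merged for HUDI-5671.
final class BucketShuffleSketch {

    /**
     * shuffleIndex = (pw + kb) % w, with kb = key % b and pw = pt % w.
     * keyHash and partitionHash are assumed to be non-negative.
     */
    static int shuffleIndex(int keyHash, int partitionHash, int numBuckets, int parallelism) {
        int kb = keyHash % numBuckets;         // bucket id, in [0, b-1]
        int pw = partitionHash % parallelism;  // starting slot of the partition, in [0, w-1]
        return (pw + kb) % parallelism;        // target subtask, in [0, w-1]
    }

    /** Counts how many buckets of a single partition land on each subtask. */
    static int[] bucketsPerSubtask(int partitionHash, int numBuckets, int parallelism) {
        int[] counts = new int[parallelism];
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            counts[shuffleIndex(bucket, partitionHash, numBuckets, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Setup from the report: 128 buckets, write parallelism 128.
        // The partition value is a made-up example; the mask keeps the hash non-negative.
        int partitionHash = "2023-02-01".hashCode() & Integer.MAX_VALUE;
        int[] counts = bucketsPerSubtask(partitionHash, 128, 128);
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (int c : counts) {
            min = Math.min(min, c);
            max = Math.max(max, c);
        }
        System.out.println("min=" + min + " max=" + max); // prints "min=1 max=1"
    }
}
```

With b = w = 128, the b buckets of one partition map to b consecutive slots
modulo w, so every subtask receives exactly one bucket of that partition
regardless of the value of pw. Different partitions only shift the starting
slot.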



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
