zhangyue19921010 opened a new pull request, #13265:
URL: https://github.com/apache/hudi/pull/13265
### Change Logs
**Background**
When Spark bulk inserts into a bucket index table, it uses the following function to map each record to a Spark partition:
```java
public static Functions.Function3<Integer, String, Integer, Integer> getPartitionIndexFunc(int parallelism) {
  return (bucketNum, partition, curBucket) -> {
    // Left-to-right evaluation: (hashCode % parallelism) * bucketNum, so every
    // data partition whose hash collides modulo `parallelism` gets the same base index.
    long partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism * (long) bucketNum;
    long globalIndex = partitionIndex + curBucket;
    int partitionId = (int) (globalIndex % parallelism);
    ValidationUtils.checkArgument(partitionId >= 0 && partitionId < parallelism,
        () -> "Partition id should be in range [0, " + parallelism + "), but got " + partitionId);
    return partitionId;
  };
}
```
For a given partition and bucket number, computing the Spark partition id with this hash-and-modulo scheme during the repartition step has the advantage of high performance and simple logic. However, it inevitably skews data across Spark partitions and cannot fully utilize all task resources, which hurts performance in large-scale data initialization scenarios. For example, in our full product table scenario with 1.2 trillion existing records and 20,000 buckets, initialization takes 8 hours, and because of the skew, scaling up resources cannot accelerate the process any further.
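To see where the skew comes from, here is a minimal, self-contained sketch (the class name and partition values are hypothetical, not part of the PR) that replays the hash-and-modulo formula above and counts how many buckets land on each Spark partition:
```java
import java.util.Map;
import java.util.TreeMap;

public class SkewDemo {
  public static void main(String[] args) {
    int parallelism = 8;   // number of Spark tasks
    int bucketNum = 4;     // buckets per data partition
    String[] tablePartitions = {"2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"};

    Map<Integer, Integer> load = new TreeMap<>();
    for (String partition : tablePartitions) {
      // Same arithmetic as getPartitionIndexFunc above.
      long partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism * (long) bucketNum;
      for (int curBucket = 0; curBucket < bucketNum; curBucket++) {
        int sparkPartition = (int) ((partitionIndex + curBucket) % parallelism);
        load.merge(sparkPartition, 1, Integer::sum);
      }
    }
    // Each data partition's buckets land on a contiguous block of Spark
    // partitions determined only by hashCode % parallelism; partitions whose
    // hashes collide modulo parallelism reuse exactly the same block.
    System.out.println("Spark partition -> bucket count: " + load);
  }
}
```
Because the base index depends only on `hashCode() % parallelism`, data partitions whose hashes collide send their buckets to exactly the same block of Spark partitions, overloading some tasks while leaving others idle.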
**Proposed Solution**
We propose a RemotePartitioner algorithm that leverages Hudi's built-in Javalin API service. The RemotePartitioner asks the driver for the BucketStartIndex of the current record's data partition (cached at two levels, on both the driver and the executors) and combines it with the computed bucket id to determine the target Spark partition number. The total number of requests is bounded by SparkTaskNumber × DataPartitionNumber, so the overhead stays controllable. A sketch of the idea follows.
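As a rough illustration (a hedged sketch under assumed names, not the PR's actual code): if the driver hands out a contiguous global index range of size bucketNum per data partition, the global indices of all (partition, bucket) pairs are distinct and contiguous, so taking them modulo parallelism spreads the work evenly. `fetchStartIndexFromDriver` stands in for the HTTP call to the driver's Javalin endpoint:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hedged sketch of the RemotePartitioner idea; all names here are hypothetical.
// The driver assigns each data partition a contiguous global index range of
// size bucketNum; executors cache the start index so requests stay bounded by
// SparkTaskNumber x DataPartitionNumber.
public class RemotePartitionerSketch {
  private final int parallelism;
  private final int bucketNum;
  // Executor-level cache: data partition -> start of its global index range.
  private final Map<String, Long> startIndexCache = new ConcurrentHashMap<>();
  // Stand-in for driver-side state, kept local so the sketch runs standalone.
  private final AtomicLong driverCounter = new AtomicLong();

  public RemotePartitionerSketch(int parallelism, int bucketNum) {
    this.parallelism = parallelism;
    this.bucketNum = bucketNum;
  }

  public int getSparkPartition(String partition, int curBucket) {
    long startIndex = startIndexCache.computeIfAbsent(partition, this::fetchStartIndexFromDriver);
    // Global indices are distinct and contiguous across all (partition, bucket)
    // pairs, so the modulo distributes them evenly over all Spark partitions.
    long globalIndex = startIndex + curBucket;
    return (int) (globalIndex % parallelism);
  }

  // Placeholder for the request to the driver's Javalin endpoint; here the
  // "driver" simply hands out start indices in increments of bucketNum.
  private long fetchStartIndexFromDriver(String partition) {
    return driverCounter.getAndIncrement() * (long) bucketNum;
  }
}
```
Because every (partition, bucket) pair receives a distinct, contiguous global index, the modulo assigns work round-robin across all Spark partitions, independent of the partition values' hash codes.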
**Results**
This approach completely resolves the data skew. The task described above was cut from 8 hours to 2 hours, and performance now scales continuously with additional resources.

### Impact
Spark writes to bucket index tables using the bulk insert operation.
### Risk level (write none, low medium or high below)
low
### Documentation Update
_Describe any necessary documentation update if there is any new feature,
config, or user-facing change. If not, put "none"._
- _The config description must be updated if new configs are added or the default values of the configs are changed_
- _Any new feature or user-facing change requires updating the Hudi website.
Please create a Jira ticket, attach the
ticket number here and follow the
[instruction](https://hudi.apache.org/contribute/developer-setup#website) to
make
changes to the website._
### Contributor's checklist
- [ ] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Change Logs and Impact were stated clearly
- [ ] Adequate tests were added if applicable
- [ ] CI passed