This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new efa8ddfe2f1 [HUDI-8990][RFC-89] Partition Level Bucket Index (#12884)
efa8ddfe2f1 is described below
commit efa8ddfe2f15a022d39f4fa2fd51e548473d985f
Author: YueZhang <[email protected]>
AuthorDate: Thu Mar 13 09:51:14 2025 +0800
[HUDI-8990][RFC-89] Partition Level Bucket Index (#12884)
---
rfc/rfc-89/archive.jpg | Bin 0 -> 278228 bytes
rfc/rfc-89/dry-run.jpg | Bin 0 -> 276824 bytes
rfc/rfc-89/rescale.jpg | Bin 0 -> 410531 bytes
rfc/rfc-89/rfc-89.md | 397 ++++++++++++++++++++++++++++++++++++++++++++++++
rfc/rfc-89/workflow.jpg | Bin 0 -> 570534 bytes
5 files changed, 397 insertions(+)
diff --git a/rfc/rfc-89/archive.jpg b/rfc/rfc-89/archive.jpg
new file mode 100644
index 00000000000..cbc3380548e
Binary files /dev/null and b/rfc/rfc-89/archive.jpg differ
diff --git a/rfc/rfc-89/dry-run.jpg b/rfc/rfc-89/dry-run.jpg
new file mode 100644
index 00000000000..057443aa97d
Binary files /dev/null and b/rfc/rfc-89/dry-run.jpg differ
diff --git a/rfc/rfc-89/rescale.jpg b/rfc/rfc-89/rescale.jpg
new file mode 100644
index 00000000000..097002fca1e
Binary files /dev/null and b/rfc/rfc-89/rescale.jpg differ
diff --git a/rfc/rfc-89/rfc-89.md b/rfc/rfc-89/rfc-89.md
new file mode 100644
index 00000000000..485d9583743
--- /dev/null
+++ b/rfc/rfc-89/rfc-89.md
@@ -0,0 +1,397 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# RFC-89: Partition Level Bucket Index
+
+## Proposers
+- @zhangyue19921010
+
+## Approvers
+- @danny0405
+- @xiarixiaoyao
+- @LinMingQiang
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-8990
+
+## Abstract
+
+As we know, Hudi introduced the Bucket Index in RFC-29. The Bucket Index can well unify the indexes of Flink and
+Spark, i.e., Spark and Flink can upsert the same Hudi table using the bucket index.
+
+However, the Bucket Index is limited to a fixed number of buckets. To solve this problem, RFC-42 proposed
+consistent hashing, which achieves bucket resizing by splitting or merging several local buckets dynamically.
+
+But from production experience, a Partition-Level Bucket Index together with an offline way to rescale buckets is often good enough,
+without introducing additional machinery (multiple writes, clustering, automatic resizing, etc.), because the more complex
+the architecture, the more error-prone it is and the greater the operation and maintenance pressure.
+
+In this regard, we can upgrade the traditional Bucket Index to a Partition-Level Bucket Index, so that users
+can set a specific number of buckets for different partitions through a rule engine (such as regular expression matching).
+On the other hand, for existing partitions, an offline command is provided to reorganize the data using insert
+overwrite (this requires stopping data writes to the current partition).
+
+More importantly, an existing Bucket Index table can be upgraded to a Partition-Level Bucket Index smoothly and seamlessly.
+
+## Background
+The following is the core read-write process of the Flink/Spark engines based on the Simple Bucket Index.
+### Flink Write Using Simple Bucket Index
+**Step 1**: repartition input records based on `BucketIndexPartitioner`, which has **a fixed bucketNumber** for all partition paths.
+For each record key, compute a fixed data partition number and dispatch the record to its corresponding partition.
+
+```java
+/**
+ * Bucket index input partitioner.
+ * The fields to hash can be a subset of the primary key fields.
+ *
+ * @param <T> The type of obj to hash
+ */
+public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+
+  private final int bucketNum;
+  private final String indexKeyFields;
+
+  private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
+  public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
+    this.bucketNum = bucketNum;
+    this.indexKeyFields = indexKeyFields;
+  }
+
+  @Override
+  public int partition(HoodieKey key, int numPartitions) {
+    if (this.partitionIndexFunc == null) {
+      this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum, numPartitions);
+    }
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, bucketNum);
+    return this.partitionIndexFunc.apply(key.getPartitionPath(), curBucket);
+  }
+}
+```
+**Step 2**: use `BucketStreamWriteFunction` to upsert records into Hudi (a sketch of the tagging step follows this list):
+- Bootstrap and cache the `partition_bucket -> fileID` mapping from the existing Hudi table
+- Tagging: compute the bucket number and tag the `fileID` based on the record key and the bucketNumber config through `BucketIdentifier`
+- Buffer and write records
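+
+Below is a minimal sketch (with assumed names, not the actual Hudi code) of the tagging step: the record key is hashed into a bucket, and the bootstrapped `partition_bucket -> fileID` mapping decides whether the record goes to an existing file group or a new one.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+public class TaggingSketch {
+  // bootstrapped from the existing table: "partitionPath_bucketId" -> fileID
+  private final Map<String, String> bucketToFileId = new HashMap<>();
+  private final int bucketNum = 4;
+
+  String tag(String recordKey, String partitionPath) {
+    // stands in for BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum)
+    int bucketId = ((recordKey.hashCode() % bucketNum) + bucketNum) % bucketNum;
+    String key = partitionPath + "_" + bucketId;
+    // hit -> UPDATE into the existing file group; miss -> a new fileID for INSERT
+    return bucketToFileId.computeIfAbsent(key, k -> "new-file-id-for-" + k);
+  }
+}
+```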
+
+### Flink Read Pruning Using Simple Bucket Index
+**Step 1**: compute `dataBucket`
+```java
+  private int getDataBucket(List<ResolvedExpression> dataFilters) {
+    if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
+      return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+    }
+    Set<String> indexKeyFields = Arrays.stream(OptionsResolver.getIndexKeys(conf)).collect(Collectors.toSet());
+    List<ResolvedExpression> indexKeyFilters = dataFilters.stream()
+        .filter(expr -> ExpressionUtils.isEqualsLitExpr(expr, indexKeyFields))
+        .collect(Collectors.toList());
+    if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, indexKeyFields)) {
+      return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+    }
+    return PrimaryKeyPruners.getBucketId(indexKeyFilters, conf);
+  }
+```
+**Step 2**: do partition pruning and get all files in the given partitions.
+**Step 3**: do bucket pruning on all files from Step 2.
+```java
+  /**
+   * Returns all the file statuses under the table base path.
+   */
+  public List<StoragePathInfo> getFilesInPartitions() {
+    ...
+    // Partition pruning
+    String[] partitions = getOrBuildPartitionPaths().stream()
+        .map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
+    if (partitions.length < 1) {
+      return Collections.emptyList();
+    }
+    List<StoragePathInfo> allFiles = ...
+
+    // bucket pruning
+    if (this.dataBucket >= 0) {
+      String bucketIdStr = BucketIdentifier.bucketIdStr(this.dataBucket);
+      List<StoragePathInfo> filesAfterBucketPruning = allFiles.stream()
+          .filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr))
+          .collect(Collectors.toList());
+      logPruningMsg(allFiles.size(), filesAfterBucketPruning.size(), "bucket pruning");
+      allFiles = filesAfterBucketPruning;
+    }
+    ...
+  }
+
+```
+
+### Spark Write/Read Using Simple Bucket Index
+The read-write process of Spark based on the Bucket Index is similar:
+- Use `HoodieSimpleBucketIndex` to tag the location.
+- Use `SparkBucketIndexPartitioner` to pack incoming records into buckets (1 bucket = 1 RDD partition).
+- Use `BucketIndexSupport` to do bucket index pruning during reading.
+
+## Design
+### Overview
+Implement a partition-level Bucket Index capability where users can set the calculation expression for the partition
+bucket number via `hoodie.bucket.index.partition.expressions`. For historical partitions, a CALL command
+`call partitionBucketIndexManager(...)` is provided to support bucket rescaling, which is executed as a replace-commit.
+
+Note: When users invoke the CALL command to modify the expressions and resize the bucket numbers of historical partitions,
+they must follow three steps:
+1. First, stop all ingestion jobs.
+2. Then, trigger the CALL command and wait for the bucket rescaling to complete.
+3. Finally, restart all ingestion jobs.
+
+These steps ensure that all writers load the same expression config, maintaining consistency between the expressions and the data.
+
+![overall workflow](workflow.jpg)
+
+The following sections introduce the implementation details.
+
+### New Config
+
+| Config | Type | Default | Description |
+|--------|------|---------|-------------|
+| hoodie.bucket.index.partition.rule.type | string | regex | Rule parser type for the expressions. |
+| hoodie.bucket.index.partition.expressions | string | null | Specifies expressions and the corresponding bucket numbers (separated by commas). Multiple rules are separated by semicolons, e.g. `hoodie.bucket.index.partition.expressions=expression1,bucket-number1;expression2,bucket-number2` |
+| hoodie.bucket.index.num.buckets | int | 4 | Default Hudi bucket number per partition. |
+
+The above parameters are table-level configs which need to be declared in the DDL. Users can't change them through
+runtime options like Flink `/*+ OPTIONS(...) */`.
+
+```sql
+CREATE TABLE hudi_table(
+ id BIGINT,
+ name STRING,
+ price DOUBLE
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_table',
+  'table.type' = 'MERGE_ON_READ',
+  'index.type' = 'BUCKET',
+  'hoodie.bucket.index.partition.expressions' = '\\d{4}-(06-(01|17|18)|11-(01|10|11)),256'
+)
+```
+
+For the dates 06-01, 06-17, 06-18 in June and 11-01, 11-10, 11-11 in November of each year (partition paths in the
+yyyy-MM-dd format), the corresponding bucket number for the partition is 256; for example, partition `2024-06-01` gets 256 buckets.
+
+All other partitions use the default bucket number 4 from `hoodie.bucket.index.num.buckets`; for example, partition `2024-07-01` gets 4 buckets.
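+
+The snippet below (a standalone illustration, not Hudi code) shows how such a regex rule resolves per-partition bucket numbers, with a matching expression winning and the default applying otherwise:
+
+```java
+import java.util.regex.Pattern;
+
+public class ExpressionDemo {
+  private static final Pattern RULE = Pattern.compile("\\d{4}-(06-(01|17|18)|11-(01|10|11))");
+
+  static int bucketNumber(String partitionPath) {
+    // 256 buckets for matching partitions, default of 4 otherwise
+    return RULE.matcher(partitionPath).matches() ? 256 : 4;
+  }
+
+  public static void main(String[] args) {
+    System.out.println(bucketNumber("2024-06-01")); // 256
+    System.out.println(bucketNumber("2024-07-01")); // 4
+  }
+}
+```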
+
+### Hashing Config And Management
+
+The expression config will be persisted as `<instant>.hashing_config` for the current table; it is stored in the
+`.hoodie/.hashing_meta` directory and contains the following information in JSON format:
+
+```json
+{
+ "rule": "rule-engine",
+ "expressions": "expression1,bucket-number1;expression2,bucket-number2",
+ "default_bucket_number": "default-bucket-number"
+}
+```
+#### Multi-version for <instant>.hashing_config
+
+Apache Hudi natively supports multi-versioning and version rollback capabilities. For the Bucket Rescale operation,
+we must also support rollbacks, even rolling back multiple completed Bucket Rescale operations at once.
+This requires tracking the instant and the corresponding config version for each Bucket Rescale action.
+During a rollback, both the data and the config must be reverted to ensure consistency between the configuration and the data state.
+
+Therefore, we need to maintain config versions alongside commit instants, i.e. multiple versions of the hashing_config.
+This necessitates mechanisms for config creation, update, rollback, cleanup and loading.
+
+**About Hashing_Config Creation**
+
+The initial version *must and can only* be created via DDL. During the DDL phase, Hudi will create a
+`00000000000000000.hashing_config` for the first time.
+
+**About Hashing_Config Update**
+
+A new version can only be created via `call partitionBucketIndexManager(overwrite => 'new-expressions', dry-run => 'false')`, which is
+a replace-commit operation.
+
+The transactional relationship between the replace-commit operation and hashing_config updates is explained in the
+Call PartitionBucketIndexManager section later.
+
+*Note:* Before updating the hashing_config, users must manually stop all ingestion jobs.
+
+```text
+Firstly, the DDL phase creates the initial version:
+ls .hoodie/.hashing_meta/
+.hoodie/.hashing_meta/00000000000000000.hashing_config
+{
+  "rule": "rule-engine",
+  "expressions": "expression1,bucket-number1;expression2,bucket-number2",
+  "default_bucket_number": "default-bucket-number"
+}
+
+Secondly, call partitionBucketIndexManager(overwrite => 'expression3,bucket-number3;expression1,bucket-number1;expression2,bucket-number2', dry-run => 'false'):
+ls .hoodie/.hashing_meta/
+.hoodie/.hashing_meta/00000000000000000.hashing_config (deprecated)
+.hoodie/.hashing_meta/20250303095546020.hashing_config (used)
+{
+  "rule": "rule-engine",
+  "expressions": "expression3,bucket-number3;expression1,bucket-number1;expression2,bucket-number2",
+  "default_bucket_number": "default-bucket-number"
+}
+```
+
+Expressions written earlier have higher priority, and any expression has a higher priority than the default_bucket_number;
+for example, with the config above, a partition matching expression3 gets bucket-number3 even if it also matches expression1.
+
+**About Hashing_Config Rollback And Cleanup**
+
+The Bucket Rescale operation is essentially an insert overwrite action and can seamlessly integrate with Hudi's existing
+rollback mechanism: when rolling back such an INSERT OVERWRITE operation, the corresponding hashing_config file should be
+deleted if it exists.
+
+
+**About Hashing_Config Archive**
+
+As users modify expressions more often, the number of hashing_config files grows accordingly.
+To address this, we need a mechanism to archive active hashing_config files while
+preserving all historical hashing_config information in an archive file `archived.hashing_config`
+under `.hoodie/.hashing_meta/archive/`, to support scenarios like time travel.
+
+A new archived hashing config file, `.hoodie/.hashing_meta/archive/archived.hashing_config`, will be created. Its schema is as follows:
+```json
+{
+ "<instant1>" : {
+ "rule": "rule-engine",
+ "expressions": "expression",
+ "default_bucket_number": "number"
+ },
+ "<instant2>" : {
+ "rule": "rule-engine",
+ "expressions": "expression",
+ "default_bucket_number": "number"
+ }
+}
+```
+
+The archive logic is triggered when invoking `call partitionBucketIndexManager(overwrite => 'new-expressions', dry-run => 'false')` to modify expressions.
+
+Once the number of active hashing configs exceeds 5, the oldest configs' metadata is automatically merged into `archived.hashing_config`,
+and we always keep at least one valid active hashing_config.
+
+The workflow is as follows:
+
+![archive workflow](archive.jpg)
+
+**We always create a new temp file and rename it to `archived.hashing_config`.**
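+
+A minimal sketch of this write-then-rename pattern (assumed file layout, using plain `java.nio` rather than Hudi's storage abstraction):
+
+```java
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+public class HashingConfigArchiver {
+  // write the merged archive JSON to a temp file first, then atomically
+  // rename it over archived.hashing_config so readers never see a partial file
+  static void writeArchived(Path archiveDir, String mergedJson) throws IOException {
+    Path tmp = archiveDir.resolve("archived.hashing_config.tmp");
+    Path target = archiveDir.resolve("archived.hashing_config");
+    Files.write(tmp, mergedJson.getBytes(StandardCharsets.UTF_8));
+    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
+  }
+}
+```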
+
+**About Loading Hashing_Config**
+
+As for loading the latest hashing_config (we always keep at least one valid active hashing_config after archiving), the rule is as follows (a sketch follows the list):
+1. Attempt to retrieve the latest `<replace-commit>.hashing_config` from the committed replace commits in the active timeline.
+2. If it does not exist, fetch the latest hashing_config before the start of the active timeline.
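+
+A minimal sketch of this fallback rule (assumed helper inputs, not the final Hudi API; instants compare lexicographically as timestamps):
+
+```java
+import java.util.List;
+import java.util.Optional;
+
+public class HashingConfigLoader {
+  static String loadLatest(List<String> configInstants,            // ascending order
+                           List<String> committedReplaceInstants,  // from the active timeline
+                           String activeTimelineStart) {
+    // 1. prefer the newest config whose instant is a committed replace-commit
+    Optional<String> committed = configInstants.stream()
+        .filter(committedReplaceInstants::contains)
+        .reduce((first, second) -> second);
+    // 2. otherwise fall back to the newest config before the active timeline starts
+    return committed.orElseGet(() -> configInstants.stream()
+        .filter(instant -> instant.compareTo(activeTimelineStart) < 0)
+        .reduce((first, second) -> second)
+        .orElseThrow(() -> new IllegalStateException("no valid hashing_config found")));
+  }
+}
+```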
+
+### SimpleBucketIdentifier
+
+```java
+public class SimpleBucketIdentifier extends BucketIdentifier {
+  // use expressionEvaluator to compute bucket number for given partition path
+  private BucketCalculator calculator;
+  // get previous partition and bucket number from meta under .hoodie folder
+  private Map<String, Integer> partition2BucketNumber;
+
+  @Override
+  public int getBucketId(String recordKey, String indexKeyFields, String partitionPath) {
+    int numBuckets = getBucketNumber(partitionPath);
+    return getBucketId(getHashKeys(recordKey, indexKeyFields), numBuckets);
+  }
+  // ...
+
+  public int getBucketNumber(String partitionPath) {
+    return partition2BucketNumber.computeIfAbsent(partitionPath, calculator::calculateBucketNumber);
+  }
+}
+```
+**Workflow for SimpleBucketIdentifier**
+
+For the partition of a given record, try to get its corresponding bucket number from the cache. If the cache misses:
+1. Calculate the expression through the BucketCalculator to obtain the corresponding bucket number.
+2. Put the calculated bucket number into the cache.
+
+We would use `org.apache.commons.collections.map.LRUMap` (max size 500,000 entries, about 200 MB at most) to cache
+partitionToBucketNumber, which takes good care of cache eviction.
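+
+A minimal instantiation sketch (LRUMap in commons-collections 3.x is not generic, hence the unchecked cast):
+
+```java
+import java.util.Map;
+import org.apache.commons.collections.map.LRUMap;
+
+public class BucketNumberCache {
+  // once 500,000 entries are cached, the least-recently-used one is evicted
+  @SuppressWarnings("unchecked")
+  private final Map<String, Integer> partition2BucketNumber =
+      (Map<String, Integer>) new LRUMap(500_000);
+}
+```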
+
+`SimpleBucketIdentifier` is used to get the bucket number or bucket ID for a given partitionPath and the corresponding expression.
+
+For the new partitioners:
+`SparkBucketIndexPartitioner` (Spark) and `BucketIndexPartitioner` (Flink) use this new `SimpleBucketIdentifier` to repartition data.
+
+For the writers:
+writers like `BucketStreamWriteFunction` in Flink also use this new `SimpleBucketIdentifier` to compute the bucket ID.
+
+NOTE: We need to resolve the hashing_config `instant` once at the beginning (in the JM or Driver) and pass it to each operator,
+to ensure consistent loading and avoid unnecessary listing actions.
+
+### BucketCalculator
+The BucketCalculator calculates bucket numbers for different partitions based on the rules set by users.
+The supported rule types are extensible; in the first phase, regex expressions are mainly supported, as sketched below.
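+
+A minimal sketch of a regex-based calculator (an assumed shape, not the final API): rules are evaluated in declaration order, so the first matching expression wins, and the default bucket number applies when nothing matches.
+
+```java
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class RegexBucketCalculator {
+  // insertion order preserved -> earlier expressions have higher priority
+  private final Map<Pattern, Integer> rules = new LinkedHashMap<>();
+  private final int defaultBucketNumber;
+
+  // expressions, e.g. "expression1,bucket-number1;expression2,bucket-number2"
+  public RegexBucketCalculator(String expressions, int defaultBucketNumber) {
+    this.defaultBucketNumber = defaultBucketNumber;
+    for (String rule : expressions.split(";")) {
+      int idx = rule.lastIndexOf(',');   // the regex itself may contain commas
+      rules.put(Pattern.compile(rule.substring(0, idx)),
+                Integer.parseInt(rule.substring(idx + 1)));
+    }
+  }
+
+  public int calculateBucketNumber(String partitionPath) {
+    return rules.entrySet().stream()
+        .filter(e -> e.getKey().matcher(partitionPath).matches())
+        .map(Map.Entry::getValue)
+        .findFirst()
+        .orElse(defaultBucketNumber);
+  }
+}
+```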
+
+### Call PartitionBucketIndexManager
+
+| Config | Type | Default | Description |
+|--------|------|---------|-------------|
+| overwrite | string | null | Create a new version of hashing_config with the new expressions; also triggers a bucket rescale action if necessary |
+| bucket-number | int | -1 | Set the default bucket number for the current expressions; if not set, the last hashing_config's default bucket number is reused |
+| add | string | null | Update the hashing_config by adding a new expression; also triggers a bucket rescale action if necessary |
+| dry-run | boolean | true | Only show which partitions would be rescaled and how many files would be rewritten, without doing any IO |
+| rollback | string | null | Roll back a specific bucket rescale action |
+| show-config | boolean | false | Show all the committed hashing_configs in order |
+
+Before proceeding, it is recommended to set dry-run to true to preview the impact of the current operation.
+
+#### overwrite
+**Note that all ingestion tasks for the current table must be stopped before performing this operation.**
+
+```sql
+call partitionBucketIndexManager(overwrite => 'new-expressions', dry-run => 'false')
+call partitionBucketIndexManager(overwrite => 'new-expressions', bucket-number => 'new-default-bucket-number', dry-run => 'false')
+```
+
+![rescale](rescale.jpg)
+
+`do insert overwrite`, `create a new version of hashing_config with the current replace-commit instant` and
+`do the replace-commit commit action` are performed within a single transaction.
+
+Avoid modifying the default bucket number casually, as it may trigger unexpected partition data reprocessing.
+
+#### dry-run
+
+![dry-run](dry-run.jpg)
+
+Only shows which partitions need to be rescaled and how many files need to be rewritten, without actually performing any IO.
+
+#### rollback
+**Note that all ingestion tasks for the current table must be stopped before performing this operation.**
+
+Rolls back a specific bucket rescale action.
+
+#### show-config
+
+Shows all the committed hashing_configs in order.
+
+## Original Bucket Index upgrade
+
+Users can use the following command to upgrade a simple bucket index table to a partition-level bucket index:
+```sql
+call partitionBucketIndexManager(overwrite => 'new-expressions', bucket-number => 'original-bucket-number', dry-run => 'false')
+```
+
+## Rollout/Adoption Plan
+- First, support writing with Flink, and perform the Bucket rescale operation with Spark.
+- Then, support writing with Spark.
+- Finally, support the push-down of Bucket Index filtering at the partition level.
+
+## Test Plan
+
+Describe in a few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?
\ No newline at end of file
diff --git a/rfc/rfc-89/workflow.jpg b/rfc/rfc-89/workflow.jpg
new file mode 100644
index 00000000000..04bbaf0297a
Binary files /dev/null and b/rfc/rfc-89/workflow.jpg differ