zhangyue19921010 commented on code in PR #12884:
URL: https://github.com/apache/hudi/pull/12884#discussion_r1975169121


##########
rfc/rfc-89/rfc-89.md:
##########
@@ -0,0 +1,297 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-89: Partition Level Bucket Index
+
+## Proposers
+- @zhangyue19921010
+
+## Approvers
+- @danny0405
+- @codope
+- @xiarixiaoyao
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-8990
+
+## Abstract
+
+As we know, Hudi proposed and introduced the Bucket Index in RFC-29. The Bucket Index can unify the indexes of Flink and
+Spark; that is, Spark and Flink can upsert the same Hudi table using the bucket index.
+
+However, the Bucket Index is limited to a fixed number of buckets. To solve this problem, RFC-42 introduced
+consistent hashing, which achieves bucket resizing by dynamically splitting or merging local buckets.
+
+But production (PRD) experience shows that a Partition-Level Bucket Index plus an offline way to rescale buckets is often
+good enough, without introducing additional machinery (multiple writers, clustering, automatic resizing, etc.): the more
+complex the architecture, the more error-prone it is and the greater the operation and maintenance pressure.
+
+In this regard, we could upgrade the traditional Bucket Index to a Partition-Level Bucket Index, so that users
+can set a specific number of buckets for different partitions through a rule engine (such as regular expression matching).
+In addition, for certain existing partitions, an offline command is provided to reorganize the data using insert
+overwrite (data writing to the current partition must be stopped first).
+
+More importantly, an existing Bucket Index table can be upgraded to the Partition-Level Bucket Index smoothly and seamlessly.
+
+## Background
+The following is the core read-write process of the Flink and Spark engines based on the Simple Bucket Index.
+### Flink Write Using Simple Bucket Index
+**Step 1**: re-partition input records with `BucketIndexPartitioner`, which uses **a fixed bucketNumber** for all partition paths.
+For each record key it computes a fixed data partition number and re-partitions the records accordingly.
+
+```java
+/**
+ * Bucket index input partitioner.
+ * The fields to hash can be a subset of the primary key fields.
+ *
+ * @param <T> The type of obj to hash
+ */
+public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+
+  private final int bucketNum;
+  private final String indexKeyFields;
+
+  private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
+  public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
+    this.bucketNum = bucketNum;
+    this.indexKeyFields = indexKeyFields;
+  }
+
+  @Override
+  public int partition(HoodieKey key, int numPartitions) {
+    if (this.partitionIndexFunc == null) {
+      this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum, numPartitions);
+    }
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, bucketNum);
+    return this.partitionIndexFunc.apply(key.getPartitionPath(), curBucket);
+  }
+}
+```
+**Step 2**: use `BucketStreamWriteFunction` to upsert records into Hudi:
+- Bootstrap and cache the `partition_bucket -> fileID` mapping from the existing Hudi table.
+- Tagging: compute the bucket number and tag the `fileID` based on the record key and the bucketNumber config through `BucketIdentifier`.
+- Buffer and write the records.
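For intuition, the tagging step boils down to a stable hash of the index key mapped into a fixed bucket range. A minimal sketch (a simplification with an illustrative class name; Hudi's actual `BucketIdentifier` hashes each index-key field and supports composite keys):

```java
import java.nio.charset.StandardCharsets;

class BucketIdSketch {

  // Simplified stand-in for BucketIdentifier.getBucketId: a deterministic
  // hash of the index key value, mapped into [0, bucketNum).
  static int getBucketId(String indexKeyValue, int bucketNum) {
    int hash = 0;
    for (byte b : indexKeyValue.getBytes(StandardCharsets.UTF_8)) {
      hash = 31 * hash + b;
    }
    // Mask off the sign bit so the modulo result is non-negative.
    return (hash & Integer.MAX_VALUE) % bucketNum;
  }
}
```

Because the bucket number is fixed per table, the same record key always maps to the same bucket, which is what lets Flink and Spark upserts line up on the same file groups.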
+
+### Flink Read Pruning Using Simple Bucket Index
+**Step 1**: compute `dataBucket`
+```java
+  private int getDataBucket(List<ResolvedExpression> dataFilters) {
+    if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
+      return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+    }
+    Set<String> indexKeyFields = Arrays.stream(OptionsResolver.getIndexKeys(conf)).collect(Collectors.toSet());
+    List<ResolvedExpression> indexKeyFilters = dataFilters.stream().filter(expr -> ExpressionUtils.isEqualsLitExpr(expr, indexKeyFields)).collect(Collectors.toList());
+    if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, indexKeyFields)) {
+      return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+    }
+    return PrimaryKeyPruners.getBucketId(indexKeyFilters, conf);
+  }
+```
+**Step 2**: do partition pruning and get all files in the given partitions.
+**Step 3**: do bucket pruning on the files from Step 2.
+```java
+  /**
+   * Returns all the file statuses under the table base path.
+   */
+  public List<StoragePathInfo> getFilesInPartitions() {
+    ...
+    // Partition pruning
+    String[] partitions =
+        getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
+    if (partitions.length < 1) {
+      return Collections.emptyList();
+    }
+    List<StoragePathInfo> allFiles = ...
+    
+    // bucket pruning
+    if (this.dataBucket >= 0) {
+      String bucketIdStr = BucketIdentifier.bucketIdStr(this.dataBucket);
+      List<StoragePathInfo> filesAfterBucketPruning = allFiles.stream()
+          .filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr))
+          .collect(Collectors.toList());
+      logPruningMsg(allFiles.size(), filesAfterBucketPruning.size(), "bucket pruning");
+      allFiles = filesAfterBucketPruning;
+    }
+    ...
+  }
+
+```
+
+### Spark Write/Read Using Simple Bucket Index
+The read-write process of Spark based on the Bucket Index is similar:
+- Use `HoodieSimpleBucketIndex` to tag locations.
+- Use `SparkBucketIndexPartitioner` to pack incoming records into buckets (1 bucket = 1 RDD partition).
+- Use `BucketIndexSupport` for Bucket Index pruning during reading.
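Since the bucket id is zero-padded into the file group name (as the Flink snippet above shows), bucket pruning in either engine reduces to a file-name filter. A simplified, engine-agnostic sketch with illustrative helper names:

```java
import java.util.List;
import java.util.stream.Collectors;

class BucketPruningSketch {

  // Mirrors the idea of BucketIdentifier.bucketIdStr: zero-pad the bucket id
  // to 8 digits, matching the prefix of the file group id.
  static String bucketIdStr(int bucketId) {
    return String.format("%08d", bucketId);
  }

  // Keep only the files whose name carries the resolved data bucket.
  static List<String> pruneByBucket(List<String> fileNames, int dataBucket) {
    String prefix = bucketIdStr(dataBucket);
    return fileNames.stream()
        .filter(name -> name.startsWith(prefix))
        .collect(Collectors.toList());
  }
}
```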
+
+## Design
+### Config
+Add a new config named `hoodie.bucket.index.partition.expressions`, defaulting to null. Users can specify the bucket
+numbers for different partitions by configuring a JSON expression. For example:
+```json
+{
+    "expressions": [

Review Comment:
   Thanks for all your attention.
   **Q: Multi-version support and how users modify and add new rules**
   We do need to support:
   1. Multiple versions of `<instant>.hashing_meta`. It records the partition-to-bucketNumber mapping. A new commit merges the hashing_meta of the last completed instant and runs a conflict check (in case of multi-writer scenarios). If there are new partitions and the conflict check passes, we write a new `<instant>.hashing_meta`, which becomes a new version.
   2. Multiple versions of `<timestamp>.hashing_config`. It is created during table DDL under `.hoodie/.hashing_meta/simple/`. Only the `alter table xxx set()` command modifies it and creates a new version of this hashing_config. Please note that the `alter table` command won't block reads and writes, but the new config only takes effect after the job is restarted.
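   The versioned-metadata idea above can be sketched as follows (class and method names are illustrative, not the proposed Hudi API): each completed instant carries a full partition-to-bucketNumber snapshot, and a new commit merges its partitions on top of the latest one.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;

   class HashingMetaSketch {

     // instant time -> full partition-to-bucketNumber snapshot, i.e. one
     // <instant>.hashing_meta version per completed commit.
     private final TreeMap<String, Map<String, Integer>> versions = new TreeMap<>();

     // Merge the new partitions on top of the last completed version and
     // persist the result as a new version.
     void commit(String instant, Map<String, Integer> newPartitions) {
       Map<String, Integer> merged = new HashMap<>(latest());
       merged.putAll(newPartitions);
       versions.put(instant, merged);
     }

     Map<String, Integer> latest() {
       return versions.isEmpty() ? Map.of() : versions.lastEntry().getValue();
     }
   }
   ```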
   
   As for multi-writer scenarios, IMO, we need to do a partition-bucketNumber conflict check before updating the meta. For example:
   T1: Job1 writes records into partition1 with 10 buckets and partition2 with **20** buckets.
   T2: `alter table` sets partition2 to 30 buckets.
   T3: Job2 is launched, writing partition1 with 10 buckets and partition2 with **30** buckets.
   T4: Job1 finishes writing partition2 with 20 buckets and updates T1.hashing_meta.
   T5: Job2 starts to commit, merges its writeStatus with T1.hashing_meta, and finds that partition2 conflicts, so Job2 fails to commit.
   There will be a conflict check during the `pre-commit` step that aborts the current write if necessary.
   
   But I believe this is a corner case. When users need to modify or add rules, they should follow one of these approaches:
   1. Call the `alter table` command to modify the configuration, and then restart all jobs.
   2. Alternatively, stop all jobs first, then modify the configuration, and finally restart the jobs.
   
   Users should not modify the configuration while jobs are starting or stopping. Otherwise, the jobs will fail because of the internal conflict detection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
