This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4680e4856d [HUDI-8383] Cached `OptionsResolver::isInsertOverwrite` in 
`BucketStreamWriteFunction::processElement` (#12113)
c4680e4856d is described below

commit c4680e4856d5f6ab724b182b903182c4f00abda1
Author: Geser Dugarov <[email protected]>
AuthorDate: Thu Oct 17 12:19:49 2024 +0700

    [HUDI-8383] Cached `OptionsResolver::isInsertOverwrite` in 
`BucketStreamWriteFunction::processElement` (#12113)
---
 .../hudi/sink/bucket/BucketStreamWriteFunction.java     | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 1475cf043b3..6c31f37cc91 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -80,6 +80,12 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
    */
   private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
 
+
+  /**
+   * Computed once during open() to avoid a string comparison for each record.
+   */
+  private boolean isInsertOverwrite;
+
   /**
    * Constructs a BucketStreamWriteFunction.
    *
@@ -100,6 +106,7 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
     this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum, 
parallelism);
+    this.isInsertOverwrite = OptionsResolver.isInsertOverwrite(config);
   }
 
   @Override
@@ -120,7 +127,11 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
     final String partition = hoodieKey.getPartitionPath();
     final HoodieRecordLocation location;
 
-    bootstrapIndexIfNeed(partition);
+    // skip the index loading for the insert overwrite operation
+    if (!isInsertOverwrite) {
+      bootstrapIndexIfNeed(partition);
+    }
+
     Map<Integer, String> bucketToFileId = 
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
     final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, 
indexKeyFields, this.bucketNum);
     final String bucketId = partition + "/" + bucketNum;
@@ -154,10 +165,6 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
    * This is a required operation for each restart to avoid having duplicate 
file ids for one bucket.
    */
   private void bootstrapIndexIfNeed(String partition) {
-    if (OptionsResolver.isInsertOverwrite(config)) {
-      // skips the index loading for insert overwrite operation.
-      return;
-    }
     if (bucketIndex.containsKey(partition)) {
       return;
     }

Reply via email to