This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 97bf409a5ad2cb939bc941f2b94432960c17a945
Author: YueZhang <[email protected]>
AuthorDate: Wed Mar 19 20:28:17 2025 +0800

    finish Flink related coding and UT && Spark partitioner and index without test
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../hudi/index/bucket/HoodieSimpleBucketIndex.java |  4 ++
 .../bucket/PartitionBucketIndexCalculator.java     |  7 +-
 .../index/bucket/PartitionBucketIndexUtils.java    | 34 +++++----
 .../BucketIndexBulkInsertPartitionerWithRows.java  | 14 +++-
 .../RDDSimpleBucketBulkInsertPartitioner.java      | 14 +++-
 .../BucketBulkInsertDataInternalWriterHelper.java  | 11 ++-
 .../action/commit/SparkBucketIndexPartitioner.java | 82 +++++++++++++++++++---
 .../apache/spark/sql/BucketPartitionUtils.scala    | 21 ++++--
 .../hudi/configuration/OptionsInference.java       |  2 +-
 .../sink/bucket/BucketBulkInsertWriterHelper.java  | 11 +--
 .../sink/bucket/BucketStreamWriteFunction.java     |  2 -
 .../sink/partitioner/BucketIndexPartitioner.java   | 12 ++--
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 16 +++--
 .../hudi/sink/utils/BulkInsertFunctionWrapper.java |  3 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 78 ++++++++++++++++++++
 .../org/apache/hudi/utils/TestConfigurations.java  |  3 +
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  2 +-
 18 files changed, 259 insertions(+), 61 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 959c94242b1..557f8672e19 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1955,6 +1955,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return HoodieIndex.IndexType.valueOf(getString(HoodieIndexConfig.INDEX_TYPE));
   }
 
+  public String getHashingConfigInstantToLoad() {
+    return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT);
+  }
+
   public String getIndexClass() {
     return getString(HoodieIndexConfig.INDEX_CLASS_NAME);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index 9901ea0b257..e19e80b82a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -140,6 +140,10 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
     return BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets);
   }
 
+  public int getBucketID(HoodieKey key, int numBuckets) {
+    return BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets);
+  }
+
   @Override
   public boolean canIndexLogFiles() {
     return false;
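
Note on the new overload above: it lets a caller supply a per-partition bucket count instead of the index-wide numBuckets, which is what the Spark call sites below do. A minimal usage sketch, assuming an already-initialized HoodieSimpleBucketIndex named `index` (key and count here are illustrative):

    import org.apache.hudi.common.model.HoodieKey;

    // The same record key can land in a different bucket once the modulus differs:
    HoodieKey key = new HoodieKey("id6", "partition=par1");
    int fixedBucket = index.getBucketID(key);     // index-wide numBuckets
    int perPartition = index.getBucketID(key, 2); // caller-supplied count, e.g. from the calculator
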
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
index ff8af59a73d..65a08363402 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
@@ -69,12 +69,7 @@ public class PartitionBucketIndexCalculator implements Serializable {
    */
   private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) {
     this.instantToLoad = instantToLoad;
-    StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
-    StoragePath hashingBase = new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER);
-    StoragePath hashingConfigPath =
-        new StoragePath(hashingBase,
-            instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
-
+    StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad);
     try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) {
       init(storage, hashingConfigPath);
     } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
index 82b61acd7c7..69e29bd92e3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
@@ -49,8 +49,8 @@ public class PartitionBucketIndexUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexUtils.class);
 
-  public static boolean isPartitionSimpleBucketIndex(Configuration conf) {
-    StoragePath storagePath = new StoragePath(HoodieTableMetaClient.PARTITION_BUCKET_INDEX_HASHING_FOLDER);
+  public static boolean isPartitionSimpleBucketIndex(Configuration conf, String basePath) {
+    StoragePath storagePath = getHashingConfigStorageFolder(basePath);
     try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, HadoopFSUtils.getStorageConf(conf))) {
       return storage.exists(storagePath);
     } catch (IOException e) {
@@ -58,6 +58,16 @@ public class PartitionBucketIndexUtils {
     }
   }
 
+  public static StoragePath getHashingConfigStorageFolder(String basePath) {
+    StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER);
+  }
+
+  public static StoragePath getHashingConfigPath(String basePath, String instantToLoad) {
+    StoragePath hashingBase = getHashingConfigStorageFolder(basePath);
+    return new StoragePath(hashingBase, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
+  }
+
   public static boolean initHashingConfig(HoodieTableMetaClient metaClient,
                                           String expressions,
                                           String rule,
@@ -104,7 +114,6 @@ public class PartitionBucketIndexUtils {
     Option<HoodieInstant> earliestInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().firstInstant();
     String instantToLoad = "";
     try {
-      // sort by time in descending order
      List<String> hashingConfigInstants = metaClient.getStorage()
           .listDirectEntries(new StoragePath(metaClient.getHashingMetadataConfigPath())).stream().map(info -> {
             String instant = getHashingConfigInstant(info.getPath().getName());
@@ -115,11 +124,15 @@ public class PartitionBucketIndexUtils {
           }).sorted().collect(Collectors.toList());
 
       for (String instant : hashingConfigInstants) {
-        if (instants.contains(instant)) {
+        if (!earliestInstant.isPresent()) {
           instantToLoad = instant;
           break;
-        } else if (earliestInstant.isPresent() && instant.compareTo(earliestInstant.get().requestedTime()) < 0){
+        } else if (instants.contains(instant)) {
           instantToLoad = instant;
+          break;
+        } else if (instant.compareTo(earliestInstant.get().requestedTime()) < 0) {
+          instantToLoad = instant;
+          break;
         }
       }
 
@@ -133,16 +146,11 @@ public class PartitionBucketIndexUtils {
     }
   }
 
-  public static String getHashingConfigInstant(String hashingConfig) {
-    int lastIndex = hashingConfig.lastIndexOf('/');
-    if (lastIndex == -1) {
-      return null;
-    }
-    String fileName = hashingConfig.substring(lastIndex + 1);
-    int dotIndex = fileName.indexOf('.');
+  public static String getHashingConfigInstant(String hashingConfigName) {
+    int dotIndex = hashingConfigName.indexOf('.');
     if (dotIndex == -1) {
       return null;
     }
-    return fileName.substring(0, dotIndex);
+    return hashingConfigName.substring(0, dotIndex);
   }
 }
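
For reference, getHashingConfigInstant now expects a bare file name rather than a full path, so the '/'-stripping is gone. A minimal sketch of the expected behavior; the ".hashing_config" suffix here is an assumption, the real value comes from PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX:

    import org.apache.hudi.index.bucket.PartitionBucketIndexUtils;

    public class HashingConfigInstantSketch {
      public static void main(String[] args) {
        // The instant is everything before the first '.' of the bare file name:
        System.out.println(PartitionBucketIndexUtils.getHashingConfigInstant("20250319202817.hashing_config")); // 20250319202817
        System.out.println(PartitionBucketIndexUtils.getHashingConfigInstant("no-dot-in-name"));                // null
      }
    }
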
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index ef32d24deb8..e80c8bfac0e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -18,7 +18,10 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.sql.BucketPartitionUtils$;
 import org.apache.spark.sql.Dataset;
@@ -31,15 +34,22 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti
 
   private final String indexKeyFields;
   private final int bucketNum;
+  private boolean isPartitionBucketIndexEnable = false;
+  private PartitionBucketIndexCalculator calc;
 
-  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum) {
+  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, HoodieTable table) {
     this.indexKeyFields = indexKeyFields;
     this.bucketNum = bucketNum;
+    String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad();
+    this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    if (isPartitionBucketIndexEnable) {
+      calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient());
+    }
   }
 
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputPartitions) {
-    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, bucketNum, outputPartitions);
+    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, bucketNum, outputPartitions, calc, isPartitionBucketIndexEnable);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
index 8304c1031cb..d651342c368 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
@@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.Partitioner;
@@ -39,11 +41,18 @@ import java.util.stream.Collectors;
 public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> {
 
   private final boolean isNonBlockingConcurrencyControl;
+  private boolean isPartitionBucketIndexEnable = false;
+  private PartitionBucketIndexCalculator calc;
 
   public RDDSimpleBucketBulkInsertPartitioner(HoodieTable table) {
     super(table, null, false);
     ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSimpleBucketIndex);
     this.isNonBlockingConcurrencyControl = table.getConfig().isNonBlockingConcurrencyControl();
+    String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad();
+    this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    if (isPartitionBucketIndexEnable) {
+      calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient());
+    }
   }
 
   @Override
@@ -64,7 +73,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
       public int getPartition(Object key) {
         HoodieKey hoodieKey = (HoodieKey) key;
         String partitionPath = hoodieKey.getPartitionPath();
-        int bucketID = index.getBucketID(hoodieKey);
+        int bucketID = isPartitionBucketIndexEnable ? index.getBucketID(hoodieKey, calc.computeNumBuckets(partitionPath)) : index.getBucketID(hoodieKey);
         String fileID = partitionMapper.get(partitionPath).get(bucketID);
         return fileIdPrefixToBucketIndex.get(fileID);
       }
@@ -75,11 +84,11 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
                                                        Map<String, Integer> fileIdPrefixToBucketIndex) {
 
     HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex();
-    int numBuckets = index.getNumBuckets();
     return records
         .map(HoodieRecord::getPartitionPath)
         .distinct().collect().stream()
         .collect(Collectors.toMap(p -> p, p -> {
+          // p ==> partition path
           Map<Integer, HoodieRecordLocation> locationMap = index.loadBucketIdToFileIdMappingForPartition(table, p);
           Map<Integer, String> bucketIdToFileIdPrefixMap = new HashMap<>();
           HashSet<Integer> existsBucketID = new HashSet<>();
@@ -94,6 +103,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload>
             doAppend.add(true);
           });
 
+          int numBuckets = isPartitionBucketIndexEnable ? calc.computeNumBuckets(p) : index.getNumBuckets();
           // Generate a file that does not exist
           for (int i = 0; i < numBuckets; i++) {
             if (!existsBucketID.contains(i)) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index 68123c02b79..d2b6a04c2e1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -19,10 +19,12 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieTable;
@@ -44,6 +46,8 @@ import java.util.Objects;
 public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInternalWriterHelper {
 
   private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertDataInternalWriterHelper.class);
+  private final boolean isPartitionBucketIndexEnable;
+  private PartitionBucketIndexCalculator calc;
 
   private Pair<UTF8String, Integer> lastFileId; // for efficient code path
   // p -> (fileId -> handle)
@@ -66,13 +70,18 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
     this.bucketNum = writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
     this.handles = new HashMap<>();
     this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl();
+    String hashingInstantToLoad = writeConfig.getHashingConfigInstantToLoad();
+    this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    if (isPartitionBucketIndexEnable) {
+      calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, hoodieTable.getMetaClient());
+    }
   }
 
   public void write(InternalRow row) throws IOException {
     try {
       UTF8String partitionPath = extractPartitionPath(row);
       UTF8String recordKey = extractRecordKey(row);
-      int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, bucketNum);
+      int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, isPartitionBucketIndexEnable ? calc.computeNumBuckets(partitionPath.toString()) : bucketNum);
       if (lastFileId == null || !Objects.equals(lastFileId.getKey(), partitionPath) || !Objects.equals(lastFileId.getValue(), bucketId)) {
         // NOTE: It's crucial to make a copy here, since [[UTF8String]] could be pointing into
         //       a mutable underlying buffer
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index b7cdef72ab1..c760d2aabdd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -23,12 +23,14 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.index.bucket.HoodieBucketIndex;
+import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
@@ -49,11 +51,17 @@ import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_T
 
 /**
  * Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition).
+ *
+ * <p>For the partition-level bucket index, the partitioner builds direct lookup arrays during
+ * initialization, giving O(1) resolution from a partition number to its partition path and
+ * local bucket id. The trade-off is a small amount of additional memory for the lookup arrays.
  */
-public class SparkBucketIndexPartitioner<T> extends
-    SparkHoodiePartitioner<T> {
+public class SparkBucketIndexPartitioner<T> extends SparkHoodiePartitioner<T> {
 
-  private final int numBuckets;
+  private final int totalPartitions;
+  private boolean isPartitionBucketIndexEnable = false;
+  private PartitionBucketIndexCalculator calc;
+  private int numBuckets;
   private final String indexKeyField;
   private final int totalPartitionPaths;
   private final List<String> partitionPaths;
@@ -70,6 +78,15 @@ public class SparkBucketIndexPartitioner<T> extends
   private Map<String, Set<String>> updatePartitionPathFileIds;
 
   private final boolean isNonBlockingConcurrencyControl;
+  /**
+   * Direct mapping from partition number to partition path.
+   */
+  private String[] partitionNumberToPath;
+
+  /**
+   * Direct mapping from partition number to local bucket ID.
+   */
+  private Integer[] partitionNumberToLocalBucketId;
 
   public SparkBucketIndexPartitioner(WorkloadProfile profile,
                                      HoodieEngineContext context,
@@ -81,6 +98,11 @@ public class SparkBucketIndexPartitioner<T> extends
           " Bucket index partitioner should only be used by BucketIndex other than "
               + table.getIndex().getClass().getSimpleName());
     }
+    String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad();
+    this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    if (isPartitionBucketIndexEnable) {
+      calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient());
+    }
     this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets();
     this.indexKeyField = config.getBucketIndexHashField();
     this.totalPartitionPaths = profile.getPartitionPaths().size();
@@ -89,12 +111,33 @@ public class SparkBucketIndexPartitioner<T> extends
     int i = 0;
     for (Object partitionPath : profile.getPartitionPaths()) {
       partitionPathOffset.put(partitionPath.toString(), i);
-      i += numBuckets;
+      if (isPartitionBucketIndexEnable) {
+        i += calc.computeNumBuckets(partitionPath.toString());
+      } else {
+        i += numBuckets;
+      }
     }
+    this.totalPartitions = i;
     assignUpdates(profile);
     WriteOperationType operationType = profile.getOperationType();
     this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
     this.isNonBlockingConcurrencyControl = config.isNonBlockingConcurrencyControl();
+
+    if (isPartitionBucketIndexEnable) {
+      this.partitionNumberToPath = new String[totalPartitions];
+      this.partitionNumberToLocalBucketId = new Integer[totalPartitions];
+
+      for (String partitionPath : partitionPaths) {
+        int offset = partitionPathOffset.get(partitionPath);
+        int numBuckets = calc.computeNumBuckets(partitionPath);
+
+        for (int j = 0; j < numBuckets; j++) {
+          int partitionNumber = offset + j;
+          partitionNumberToPath[partitionNumber] = partitionPath;
+          partitionNumberToLocalBucketId[partitionNumber] = j;
+        }
+      }
+    }
   }
 
   private void assignUpdates(WorkloadProfile profile) {
@@ -115,33 +158,49 @@ public class SparkBucketIndexPartitioner<T> extends
 
   @Override
   public BucketInfo getBucketInfo(int bucketNumber) {
-    String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
+    Pair<Integer, String> res = computeBucketAndPartitionPath(bucketNumber);
+    int bucket = res.getLeft();
+    String partitionPath = res.getRight();
     // Insert overwrite always generates new bucket file id
     if (isOverwrite) {
       ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl,
           "Insert overwrite is not supported with non-blocking concurrency control");
-      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketNumber % numBuckets), partitionPath);
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucket), partitionPath);
     }
     Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
         .getOrDefault(partitionPath, Collections.emptySet()).stream()
-        .filter(e -> e.startsWith(BucketIdentifier.bucketIdStr(bucketNumber % numBuckets)))
+        .filter(e -> e.startsWith(BucketIdentifier.bucketIdStr(bucket)))
         .findFirst());
     if (fileIdOption.isPresent()) {
       return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
     } else {
       // Always write into log file instead of base file if using NB-CC
       if (isNonBlockingConcurrencyControl) {
-        String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucketNumber % numBuckets);
+        String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucket);
         return new BucketInfo(BucketType.UPDATE, fileId, partitionPath);
       }
-      String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketNumber % numBuckets);
+      String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucket);
       return new BucketInfo(BucketType.INSERT, fileIdPrefix, partitionPath);
     }
   }
 
+  private Pair<Integer, String> computeBucketAndPartitionPath(int bucketNumber) {
+    Integer bucket;
+    String partitionPath;
+    if (isPartitionBucketIndexEnable) {
+      bucket = partitionNumberToLocalBucketId[bucketNumber];
+      partitionPath = partitionNumberToPath[bucketNumber];
+      ValidationUtils.checkArgument(bucket != null && partitionPath != null);
+    } else {
+      bucket = bucketNumber % numBuckets;
+      partitionPath = partitionPaths.get(bucketNumber / numBuckets);
+    }
+    return Pair.of(bucket, partitionPath);
+  }
+
   @Override
   public int numPartitions() {
-    return totalPartitionPaths * numBuckets;
+    return totalPartitions;
   }
 
   @Override
@@ -151,7 +210,8 @@ public class SparkBucketIndexPartitioner<T> extends
     Option<HoodieRecordLocation> location = keyLocation._2;
     int bucketId = location.isPresent()
         ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId())
-        : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), indexKeyField, numBuckets);
+        : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), indexKeyField,
+        isPartitionBucketIndexEnable ? calc.computeNumBuckets(partitionPath) : numBuckets);
     return partitionPathOffset.get(partitionPath) + bucketId;
   }
 }
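
The lookup-array construction above can be read in isolation: prefix offsets per partition path, then two flat arrays mapping a global Spark partition number back to (partition path, local bucket id). A self-contained sketch of the same idea; the per-partition bucket counts below are hypothetical stand-ins for calc.computeNumBuckets:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LookupArraySketch {
      public static void main(String[] args) {
        Map<String, Integer> bucketsPerPartition = new LinkedHashMap<>();
        bucketsPerPartition.put("partition=par1", 2); // hypothetical counts
        bucketsPerPartition.put("partition=par3", 1);

        int total = bucketsPerPartition.values().stream().mapToInt(Integer::intValue).sum();
        String[] numberToPath = new String[total];
        int[] numberToLocalBucket = new int[total];

        int offset = 0;
        for (Map.Entry<String, Integer> e : bucketsPerPartition.entrySet()) {
          for (int j = 0; j < e.getValue(); j++) {
            numberToPath[offset + j] = e.getKey(); // global partition number -> partition path
            numberToLocalBucket[offset + j] = j;   // global partition number -> local bucket id
          }
          offset += e.getValue();
        }

        // O(1) per lookup, instead of a division that assumes a uniform bucket count:
        System.out.println(numberToPath[1] + " / bucket " + numberToLocalBucket[1]); // partition=par1 / bucket 1
        System.out.println(numberToPath[2] + " / bucket " + numberToLocalBucket[2]); // partition=par3 / bucket 0
      }
    }
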
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
index 740e6799348..464438671c1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -21,16 +21,22 @@ package org.apache.spark.sql
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.Functions
 import org.apache.hudi.common.util.hash.BucketIndexUtil
-import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.index.bucket.{BucketIdentifier, PartitionBucketIndexCalculator}
 import org.apache.spark.Partitioner
 import org.apache.spark.sql.catalyst.InternalRow
 
 object BucketPartitionUtils {
-  def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, partitionNum: Int): DataFrame = {
+  def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, partitionNum: Int,
+                      calc: PartitionBucketIndexCalculator, isPartitionBucketIndexEnable: Boolean): DataFrame = {
+
+    // Resolve the bucket count for a partition path; fall back to the static bucketNum.
+    def computeBucketNumber(partition: String): Int =
+      if (isPartitionBucketIndexEnable) calc.computeNumBuckets(partition) else bucketNum
+
     def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
-      val kb = BucketIdentifier
-        .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), indexKeyFields, bucketNum)
       val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
+      val kb = BucketIdentifier
+        .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), indexKeyFields, computeBucketNumber(partition))
       if (partition == null || partition.trim.isEmpty) {
         ("", kb)
       } else {
@@ -48,7 +54,7 @@ object BucketPartitionUtils {
 
       override def getPartition(value: Any): Int = {
         val partitionKeyPair = value.asInstanceOf[(String, Int)]
-        partitionIndexFunc.apply(bucketNum, partitionKeyPair._1, partitionKeyPair._2)
+        partitionIndexFunc.apply(computeBucketNumber(partitionKeyPair._1), partitionKeyPair._1, partitionKeyPair._2)
       }
     }
     // use internalRow to avoid extra convert.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
index 66d2e08417e..e2e2e6c2d55 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -99,7 +99,7 @@ public class OptionsInference {
   public static void setupIndexConfigs(Configuration conf) {
     HoodieIndex.BucketIndexEngineType engineType = OptionsResolver.getBucketEngineType(conf);
     if (engineType.equals(HoodieIndex.BucketIndexEngineType.SIMPLE) &&
-        PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(HadoopConfigurations.getHadoopConf(conf))) {
+        PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(HadoopConfigurations.getHadoopConf(conf), conf.get(FlinkOptions.PATH))) {
       try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClientV2(conf)) {
         HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient();
         String hashingConfigToLoad = PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 8369a0b768e..c3fc9a2e1af 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.Lazy;
 
@@ -97,13 +98,13 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
   }
 
   private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf, boolean needFixedFileIdSuffix,
-                                  Lazy<org.apache.hadoop.conf.Configuration> hadoopConf) {
+                                  StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) {
     String recordKey = keyGen.getRecordKey(record);
     String partition = keyGen.getPartitionPath(record);
     final int numBuckets;
     if (!StringUtils.isNullOrEmpty(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT))) {
-      numBuckets = PartitionBucketIndexCalculator.getInstance(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT), hadoopConf.get(), conf.get(FlinkOptions.PATH))
-          .computeNumBuckets(partition);
+      numBuckets = PartitionBucketIndexCalculator.getInstance(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT),
+              storageConf.unwrapAs(org.apache.hadoop.conf.Configuration.class), conf.get(FlinkOptions.PATH)).computeNumBuckets(partition);
     } else {
       numBuckets = conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
     }
@@ -113,8 +114,8 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
   }
 
   public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf, boolean needFixedFileIdSuffix,
-                                      Lazy<org.apache.hadoop.conf.Configuration> hadoopConf) {
-    final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, hadoopConf);
+                                      StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) {
+    final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, storageConf);
     return GenericRowData.of(StringData.fromString(fileId), record);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 2a3e1e154dd..9cc23e1d0e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -139,9 +139,7 @@ public class BucketStreamWriteFunction extends StreamWriteFunction {
       bootstrapIndexIfNeed(partition);
     }
     Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
-
     int numBuckets = getOrComputeNumBuckets(partition);
-
     final int bucketNum = BucketIdentifier.getBucketId(record.getRecordKey(), indexKeyFields, numBuckets);
     final String bucketId = partition + "/" + bucketNum;
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index ecf4656d7ee..ca7cf4023c5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.util.hash.BucketIndexUtil;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
-import org.apache.hudi.util.Lazy;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
@@ -41,17 +41,16 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
   private final Configuration conf;
   private final String indexKeyFields;
   private final String hashingInstantToLoad;
-  private org.apache.hadoop.conf.Configuration hadoopConf;
+  private StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf;
 
   private Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc;
 
-  public BucketIndexPartitioner(Configuration conf, String indexKeyFields, Lazy<org.apache.hadoop.conf.Configuration> hadoopConf) {
+  public BucketIndexPartitioner(Configuration conf, String indexKeyFields,
+                                StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) {
     this.conf = conf;
     this.indexKeyFields = indexKeyFields;
     this.hashingInstantToLoad = conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT);
-    if (!StringUtils.isNullOrEmpty(hashingInstantToLoad)) {
-      this.hadoopConf = hadoopConf.get();
-    }
+    this.storageConf = storageConf;
   }
 
   @Override
@@ -62,6 +61,7 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
 
     int bucketNum;
     if (!StringUtils.isNullOrEmpty(hashingInstantToLoad)) {
+      org.apache.hadoop.conf.Configuration hadoopConf = storageConf.unwrapAs(org.apache.hadoop.conf.Configuration.class);
       PartitionBucketIndexCalculator calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, hadoopConf, conf.get(FlinkOptions.PATH));
       bucketNum = calc.computeNumBuckets(key.getPartitionPath());
     } else {
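
The Lazy<Configuration> to StorageConfiguration switch above keeps the raw Hadoop Configuration out of the partitioner's eagerly-resolved state; it is unwrapped only at partition() time, and only when a hashing instant is configured. A minimal sketch of the intended pattern, assuming HadoopStorageConfiguration is the serializable carrier, as the Pipelines call sites below suggest:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

    public class StorageConfSketch {
      public static void main(String[] args) {
        // Built once on the client and shipped with the Flink partitioner:
        StorageConfiguration<Configuration> storageConf =
            new HadoopStorageConfiguration(new Configuration());
        // Unwrapped lazily on the task side, where the calculator needs it:
        Configuration hadoopConf = storageConf.unwrapAs(Configuration.class);
        System.out.println(hadoopConf != null);
      }
    }
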
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index d604567246d..d278d807064 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -53,8 +53,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.format.FilePathUtils;
-import org.apache.hudi.util.Lazy;
 
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -122,8 +123,9 @@ public class Pipelines {
             "Consistent hashing bucket index does not work with bulk insert using FLINK engine. Use simple bucket index or Spark engine.");
       }
       String indexKeys = OptionsResolver.getIndexKeyField(conf);
-      Lazy<org.apache.hadoop.conf.Configuration> hadoopConf = Lazy.lazily(() -> HadoopConfigurations.getHadoopConf(conf));
-      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys, hadoopConf);
+      StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf = StringUtils.isNullOrEmpty(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT))
+          ? null : new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys, storageConf);
       RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
       RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
@@ -131,7 +133,7 @@ public class Pipelines {
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
       dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, hadoopConf), typeInfo)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, storageConf), typeInfo)
           .setParallelism(PARALLELISM_VALUE);
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
@@ -341,7 +343,9 @@ public class Pipelines {
           String indexKeyFields = OptionsResolver.getIndexKeyField(conf);
           // [HUDI-9036] BucketIndexPartitioner is also used in bulk insert mode,
           // keep use of HoodieKey here in partitionCustom for now
-          BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeyFields, Lazy.lazily(() -> HadoopConfigurations.getHadoopConf(conf)));
+          StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf = StringUtils.isNullOrEmpty(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT))
+              ? null : new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+          BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeyFields, storageConf);
           return dataStream
               .partitionCustom(
                   partitioner,
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index a4da9049dce..eb6a8e3c25d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -32,6 +32,7 @@ import org.apache.hudi.sink.bulk.sort.SortOperator;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.Lazy;
 import org.apache.hudi.util.StreamerUtil;
@@ -221,7 +222,7 @@ public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
     this.bucketIdToFileId = new HashMap<>();
     this.mapFunction = r -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, indexKeys, conf, needFixedFileIdSuffix,
-        Lazy.lazily(() -> HadoopConfigurations.getHadoopConf(conf)));
+        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)));
   }
 
   private void setupSortOperator() throws Exception {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index a2a31b492e9..17ff21c3938 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -23,13 +23,16 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
 import org.apache.hudi.table.catalog.HoodieHiveCatalog;
 import org.apache.hudi.util.StreamerUtil;
@@ -66,10 +69,12 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -89,6 +94,7 @@ import static org.apache.hudi.utils.TestData.assertRowsEquals;
 import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
 import static org.apache.hudi.utils.TestData.map;
 import static org.apache.hudi.utils.TestData.row;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1258,6 +1264,78 @@ public class ITTestHoodieDataSource {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"bulk_insert", "upsert"})
+  void testBulkInsertWithPartitionBucketIndex(String operationType) throws IOException {
+    TableEnvironment tableEnv = batchTableEnv;
+    // csv source
+    String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
+    tableEnv.executeSql(csvSourceDDL);
+    String catalogName = "hudi_" + operationType;
+    String hudiCatalogDDL = catalog(catalogName)
+        .catalogPath(tempFile.getAbsolutePath())
+        .end();
+
+    tableEnv.executeSql(hudiCatalogDDL);
+    String dbName = "hudi";
+    tableEnv.executeSql("create database " + catalogName + "." + dbName);
+    String basePath = tempFile.getAbsolutePath() + "/hudi/hoodie_sink";
+
+    String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink")
+        .option(FlinkOptions.PATH, basePath)
+        .option(FlinkOptions.OPERATION, operationType)
+        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
+        .option(FlinkOptions.INDEX_TYPE, "BUCKET")
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, "true")
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, "1")
+        .option(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, "regex")
+        .option(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, "partition=(par1|par2),2")
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String insertInto = "insert into " + catalogName + ".hudi.hoodie_sink select * from csv_source";
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from " + catalogName + ".hudi.hoodie_sink").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+    // apply filters
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from " + catalogName + ".hudi.hoodie_sink where uuid > 'id5'").execute().collect());
+    assertRowsEquals(result2, "["
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, new org.apache.hadoop.conf.Configuration());
+
+    List<StoragePathInfo> allFiles = metaClient.getStorage().listDirectEntries(new StoragePath(basePath)).stream().flatMap(path -> {
+      try {
+        return metaClient.getStorage().listDirectEntries(path.getPath()).stream();
+      } catch (IOException e) {
+        return Stream.empty();
+      }
+    }).collect(Collectors.toList());
+
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), allFiles);
+    List<String> actual = fsView.getAllFileGroups().map(group ->
+        group.getPartitionPath() + group.getFileGroupId().getFileId().split("-")[0])
+        .collect(Collectors.toList());
+
+    // Based on the expression partition=(par1|par2),2 and the default bucket number 1:
+    // par1 and par2 get two buckets each;
+    // par3 and par4 get one bucket each.
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add("partition=par1" + "00000000");
+    expected.add("partition=par1" + "00000001");
+    expected.add("partition=par2" + "00000000");
+    expected.add("partition=par2" + "00000001");
+    expected.add("partition=par3" + "00000000");
+    expected.add("partition=par4" + "00000000");
+
+    assertEquals(expected.stream().sorted().collect(Collectors.toList()), actual.stream().sorted().collect(Collectors.toList()));
+  }
+
   @Test
   void testBulkInsertWithSortByRecordKey() {
     TableEnvironment tableEnv = batchTableEnv;
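
The expected file groups in the test above encode the rule semantics: with rule "regex", expressions "partition=(par1|par2),2" and a default bucket number of 1, matching partitions get two file groups and the rest fall back to the default. A self-contained sketch of that resolution; the real logic lives in PartitionBucketIndexCalculator, and the expression separator and first-match precedence here are assumptions:

    import java.util.regex.Pattern;

    public class RegexRuleSketch {
      // Hypothetical resolver for "<regex>,<bucketNumber>" entries; first match wins.
      static int numBuckets(String partitionPath, String expressions, int defaultBuckets) {
        for (String expr : expressions.split(";")) {
          int idx = expr.lastIndexOf(',');
          String regex = expr.substring(0, idx);
          int buckets = Integer.parseInt(expr.substring(idx + 1).trim());
          if (Pattern.matches(regex, partitionPath)) {
            return buckets;
          }
        }
        return defaultBuckets;
      }

      public static void main(String[] args) {
        String expressions = "partition=(par1|par2),2";
        System.out.println(numBuckets("partition=par1", expressions, 1)); // 2
        System.out.println(numBuckets("partition=par3", expressions, 1)); // 1
      }
    }
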
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 92d44a8e7ef..ad07f95c3b6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -400,6 +400,9 @@ public class TestConfigurations {
       return TestConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options,
           this.withPartition, this.pkField, this.partitionField);
     }
     }
+    public Map<String, String> getOptions() {
+      return this.options;
+    }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 9d643211c92..3bb0c2d585a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -120,7 +120,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
       if (writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET) {
         if (writeConfig.getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.SIMPLE) {
           return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
-              writeConfig.getBucketIndexNumBuckets());
+              writeConfig.getBucketIndexNumBuckets(), table);
         } else {
           return new ConsistentBucketIndexBulkInsertPartitionerWithRows(table, Collections.emptyMap(), true);
         }
