This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit aa9ed6ce12c49d9c79034453a9bba3bfaf16a9d2
Author: YueZhang <[email protected]>
AuthorDate: Fri Mar 21 16:03:50 2025 +0800

    finish Spark-related coding and tests
---
 .../hudi/index/bucket/HoodieSimpleBucketIndex.java |  10 +-
 .../index/bucket/PartitionBucketIndexUtils.java    |  29 ++-
 .../RDDSimpleBucketBulkInsertPartitioner.java      |  10 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  15 +-
 .../hudi/common/model/WriteOperationType.java      |   2 +
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  16 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  19 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   6 +-
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedures/PartitionBucketIndexManager.scala   | 153 ++++++++------
 .../spark/sql/hudi/dml/TestInsertTable.scala       |  88 --------
 .../TestInsertTableWithPartitionBucketIndex.scala  | 223 +++++++++++++++++++++
 .../TestPartitionBucketIndexManager.scala          |   2 +-
 13 files changed, 400 insertions(+), 174 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index e19e80b82a4..e3bae439956 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
@@ -156,14 +157,21 @@ public class HoodieSimpleBucketIndex extends 
HoodieBucketIndex {
 
   private class SimpleBucketIndexLocationFunction implements 
Function<HoodieRecord, Option<HoodieRecordLocation>> {
     private final Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping;
+    private final boolean isPartitionBucketIndexEnable;
+    private PartitionBucketIndexCalculator calc;
 
     public SimpleBucketIndexLocationFunction(HoodieTable table, String 
partitionPath) {
       this.bucketIdToFileIdMapping = 
loadBucketIdToFileIdMappingForPartition(table, partitionPath);
+      String hashingInstantToLoad = 
table.getConfig().getHashingConfigInstantToLoad();
+      this.isPartitionBucketIndexEnable = 
StringUtils.nonEmpty(hashingInstantToLoad);
+      if (isPartitionBucketIndexEnable) {
+        calc = 
PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, 
table.getMetaClient());
+      }
     }
 
     @Override
     public Option<HoodieRecordLocation> apply(HoodieRecord record) {
-      int bucketId = getBucketID(record.getKey());
+      int bucketId = isPartitionBucketIndexEnable ? 
getBucketID(record.getKey(), calc.computeNumBuckets(record.getPartitionPath())) 
: getBucketID(record.getKey());
       return Option.ofNullable(bucketIdToFileIdMapping.get(bucketId));
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
index ad1e1d0b3bf..1fbd16c28da 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -30,6 +31,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieInstantWriter;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class PartitionBucketIndexUtils {
   public static final String INITIAL_HASHING_CONFIG_INSTANT = 
HoodieTimeline.INIT_INSTANT_TS;
@@ -51,8 +54,12 @@ public class PartitionBucketIndexUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(PartitionBucketIndexUtils.class);
 
   public static boolean isPartitionSimpleBucketIndex(Configuration conf, 
String basePath) {
+    return isPartitionSimpleBucketIndex(HadoopFSUtils.getStorageConf(conf), 
basePath);
+  }
+
+  public static boolean isPartitionSimpleBucketIndex(StorageConfiguration 
conf, String basePath) {
     StoragePath storagePath = getHashingConfigStorageFolder(basePath);
-    try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, 
HadoopFSUtils.getStorageConf(conf))) {
+    try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, 
conf)) {
       return storage.exists(storagePath);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to list 
PARTITION_BUCKET_INDEX_HASHING_FOLDER folder ", e);
@@ -164,4 +171,24 @@ public class PartitionBucketIndexUtils {
     }
     return calc.getPartitionToBucket();
   }
+
+  /**
+   * Used for test
+   * @return all File id in current table using `partitionPath + FileId` format
+   */
+  public static List<String> getAllFileIDWithPartition(HoodieTableMetaClient 
metaClient) throws IOException {
+    List<StoragePathInfo> allFiles = 
metaClient.getStorage().listDirectEntries(metaClient.getBasePath()).stream().flatMap(path
 -> {
+      try {
+        return 
metaClient.getStorage().listDirectEntries(path.getPath()).stream();
+      } catch (IOException e) {
+        return Stream.empty();
+      }
+    }).collect(Collectors.toList());
+
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
+        
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), 
allFiles);
+    return fsView.getAllFileGroups().map(group -> {
+      return group.getPartitionPath() + 
group.getFileGroupId().getFileId().split("-")[0];
+    }).collect(Collectors.toList());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
index 393f0054398..638abe95ce9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
@@ -66,7 +66,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends 
HoodieRecordPayload>
     return doPartition(records, new Partitioner() {
       @Override
       public int numPartitions() {
-        return index.getNumBuckets() * partitionMapper.size();
+        return computeNumPartitions();
       }
 
       @Override
@@ -77,6 +77,14 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends 
HoodieRecordPayload>
         String fileID = partitionMapper.get(partitionPath).get(bucketID);
         return fileIdPrefixToBucketIndex.get(fileID);
       }
+
+      private int computeNumPartitions () {
+        if (isPartitionBucketIndexEnable) {
+          return partitionMapper.values().stream().mapToInt(Map::size).sum();
+        } else {
+          return index.getNumBuckets() * partitionMapper.size();
+        }
+      }
     });
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index f4bb5b81e85..ab5cd537cfc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
 import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, 
BuiltinKeyGenerator, KeyGenUtils}
-import 
org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, 
ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
+import 
org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, 
BulkInsertDataInternalWriterHelper, 
ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
 import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
 import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
 import org.apache.spark.TaskContext
@@ -179,6 +179,19 @@ object HoodieDatasetBulkInsertHelper
               writeConfig.populateMetaFields,
               arePartitionRecordsSorted,
               shouldPreserveHoodieMetadata)
+          case HoodieIndex.IndexType.BUCKET if 
writeConfig.getBucketIndexEngineType
+            == BucketIndexEngineType.SIMPLE =>
+            new BucketBulkInsertDataInternalWriterHelper(
+              table,
+              writeConfig,
+              instantTime,
+              taskPartitionId,
+              taskId,
+              taskEpochId,
+              schema,
+              writeConfig.populateMetaFields,
+              arePartitionRecordsSorted,
+              shouldPreserveHoodieMetadata)
           case _ =>
             new BulkInsertDataInternalWriterHelper(
               table,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index bc82a04023c..033edc822c0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -101,6 +101,8 @@ public enum WriteOperationType {
         return INDEX;
       case "alter_schema":
         return ALTER_SCHEMA;
+      case "bucket_rescale":
+        return BUCKET_RESCALE;
       case "unknown":
         return UNKNOWN;
       default:
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 17ff21c3938..39fabe0cbde 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -30,6 +30,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.PartitionBucketIndexUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -1307,20 +1308,7 @@ public class ITTestHoodieDataSource {
         + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, 
new org.apache.hadoop.conf.Configuration());
-
-    List<StoragePathInfo> allFiles = 
metaClient.getStorage().listDirectEntries(new 
StoragePath(basePath)).stream().flatMap(path -> {
-      try {
-        return 
metaClient.getStorage().listDirectEntries(path.getPath()).stream();
-      } catch (IOException e) {
-        return Stream.empty();
-      }
-    }).collect(Collectors.toList());
-
-    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
-        
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), 
allFiles);
-    List<String> actual = fsView.getAllFileGroups().map(group -> {
-      return group.getPartitionPath() + 
group.getFileGroupId().getFileId().split("-")[0];
-    }).collect(Collectors.toList());
+    List<String> actual = 
PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient);
 
     // based on expression partition=(par1|par2),2 and default bucket number 1
     // par1 and par2 have two buckets.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7c59e433c8a..b6a72a73e7e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -42,7 +42,7 @@ import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => 
HOption}
 import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
 import 
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
WRITE_TABLE_VERSION}
 import org.apache.hudi.exception.{HoodieException, 
HoodieRecordCreationException, HoodieWriteConflictException}
@@ -68,6 +68,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.bucket.PartitionBucketIndexUtils
 import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql._
@@ -247,7 +249,7 @@ class HoodieSparkSqlWriterInternal {
     asyncCompactionTriggerFnDefined = 
streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
     asyncClusteringTriggerFnDefined = 
streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
     // re-use table configs and inject defaults.
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, 
tableConfig, mode, streamingWritesParamsOpt.isDefined)
+    var (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, 
tableConfig, mode, streamingWritesParamsOpt.isDefined)
     val databaseName = 
hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -331,6 +333,19 @@ class HoodieSparkSqlWriterInternal {
           
.initTable(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration),
 path)
       }
 
+      // take care of partition level bucket index which is simple bucket index
+      if (parameters.contains(HoodieIndexConfig.INDEX_TYPE.key)
+        && parameters(HoodieIndexConfig.INDEX_TYPE.key) == 
HoodieIndex.IndexType.BUCKET.name
+        && 
(!parameters.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key)
+        || (parameters.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key)
+        && parameters(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key) == 
HoodieIndex.IndexType.SIMPLE.name))
+        && 
!parameters.contains(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key)
+        && 
PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(tableMetaClient.getStorageConf,
 basePath.toString)) {
+        val hashingConfigToLoad: String = 
PartitionBucketIndexUtils.getHashingConfigInstantToLoad(tableMetaClient)
+        
hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key,
 hashingConfigToLoad)
+        parameters = parameters ++ 
Map(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key -> 
hashingConfigToLoad)
+      }
+
       var instantTime: String = null
       tableConfig = tableMetaClient.getTableConfig
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 6cf5053bdce..3908e51a5db 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -30,6 +30,8 @@ import org.apache.hudi.config.{HoodieIndexConfig, 
HoodieInternalConfig, HoodieWr
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, 
MultiPartKeysValueExtractor}
 import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator}
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -91,9 +93,9 @@ trait ProvidesHoodieConfig extends Logging {
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
 
     val defaultOpts = Map[String, String](
-      OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
-      HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> 
WriteOperationType.BUCKET_RESCALE.value(),
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      HoodieIndexConfig.INDEX_TYPE.key() -> 
HoodieIndex.IndexType.BUCKET.name(),
+      HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key() -> 
BucketIndexEngineType.SIMPLE.name(),
       SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> 
tableConfig.getKeyGeneratorClassName,
       SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> 
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 0a6afd4e00a..7bea2d114a4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -96,6 +96,7 @@ object HoodieProcedures {
       ,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
       ,(DropPartitionProcedure.NAME, DropPartitionProcedure.builder)
       ,(TruncateTableProcedure.NAME, TruncateTableProcedure.builder)
+      ,(PartitionBucketIndexManager.NAME, PartitionBucketIndexManager.builder)
     )
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 4aaeec80a1a..d949bbcf9a0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -18,44 +18,38 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, 
ENABLE_ROW_WRITER, OPERATION}
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, 
HoodieSparkSqlWriter}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.commit.DatasetBucketRescaleCommitActionExecutor
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieReaderConfig, SerializableSchema}
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, 
PartitionBucketIndexHashingConfig, WriteOperationType}
+import org.apache.hudi.common.model.{PartitionBucketIndexHashingConfig, 
WriteOperationType}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.common.util.{Option, ValidationUtils}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.bucket.{PartitionBucketIndexCalculator, 
PartitionBucketIndexUtils}
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.SerDeHelper
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
-import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.table.SparkBroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
 
 import java.util
 import java.util.function.Supplier
-import java.util.stream.Collectors
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-class PartitionBucketIndexManager() extends BaseProcedure
+class PartitionBucketIndexManager extends BaseProcedure
   with ProcedureBuilder
   with PredicateHelper
   with ProvidesHoodieConfig
@@ -64,12 +58,14 @@ class PartitionBucketIndexManager() extends BaseProcedure
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "overwrite", DataTypes.StringType),
-    ProcedureParameter.optional(2, "bucket-number", DataTypes.IntegerType, -1),
+    ProcedureParameter.optional(2, "bucketNumber", DataTypes.IntegerType, -1),
     ProcedureParameter.optional(3, "add", DataTypes.StringType),
     ProcedureParameter.optional(4, "dry-run", DataTypes.BooleanType, true),
     ProcedureParameter.optional(5, "rollback", DataTypes.StringType),
     ProcedureParameter.optional(6, "show-config", DataTypes.BooleanType, 
false),
-    ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex")
+    ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex"),
+    // params => key=value, key2=value2
+    ProcedureParameter.optional(8, "options", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -93,6 +89,7 @@ class PartitionBucketIndexManager() extends BaseProcedure
     val rollback = getArgValueOrDefault(args, 
PARAMETERS(5)).orNull.asInstanceOf[String]
     val showConfig = getArgValueOrDefault(args, 
PARAMETERS(6)).get.asInstanceOf[Boolean]
     val rule = getArgValueOrDefault(args, 
PARAMETERS(7)).orNull.asInstanceOf[String]
+    val options = getArgValueOrDefault(args, PARAMETERS(8))
 
     try {
       // Get table metadata
@@ -102,6 +99,18 @@ class PartitionBucketIndexManager() extends BaseProcedure
       val writeClient = HoodieCLIUtils.createHoodieWriteClient(sparkSession, 
basePath, Map.empty,
         scala.Option.apply(table))
       val context = writeClient.getEngineContext
+      var config = buildBucketRescaleHoodieConfig(hoodieCatalogTable)
+
+      options match {
+        case Some(p) =>
+          config = config ++ 
HoodieCLIUtils.extractOptions(p.asInstanceOf[String])
+        case _ =>
+          logInfo("No options")
+      }
+
+      config = config ++ Map(OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
+        HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> 
WriteOperationType.BUCKET_RESCALE.value(),
+        ENABLE_ROW_WRITER.key() -> "true")
 
       // Determine which operation to perform
       if (showConfig) {
@@ -109,21 +118,21 @@ class PartitionBucketIndexManager() extends BaseProcedure
       } else if (rollback != null) {
         handleRollback(metaClient, rollback)
       } else if (overwrite != null) {
-        handleOverwrite(hoodieCatalogTable, writeClient, context, metaClient, 
overwrite, bucketNumber, rule, dryRun)
+        handleOverwrite(config, writeClient, context, metaClient, overwrite, 
bucketNumber, rule, dryRun)
       } else if (add != null) {
         handleAdd(metaClient, add, dryRun)
       } else {
         Seq(Row("ERROR", "INVALID_OPERATION", "No valid operation specified"))
       }
     } catch {
-      case e: Exception => Seq(Row("ERROR", "EXCEPTION", e.getMessage))
+      case e: Exception => throw new HoodieException(e.getMessage, e)
     }
   }
 
   /**
    * Handle the overwrite operation.
    */
-  private def handleOverwrite(catalog: HoodieCatalogTable,
+  private def handleOverwrite(config: Map[String, String],
                               writeClient: SparkRDDWriteClient[_],
                               context : HoodieEngineContext,
                               metaClient: HoodieTableMetaClient,
@@ -131,30 +140,42 @@ class PartitionBucketIndexManager() extends BaseProcedure
                               bucketNumber: Int,
                               rule: String,
                               dryRun: Boolean): Seq[Row] = {
-    val config = buildBucketRescaleHoodieConfig(catalog).asJava
     val basePath = metaClient.getBasePath
     val mdtEnable = metaClient.getStorage().exists(new 
StoragePath(metaClient.getBasePath, 
HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH))
 
     // get all partition paths
     val allPartitions = FSUtils.getAllPartitionPaths(context, 
metaClient.getStorage, metaClient.getBasePath, mdtEnable)
+    val usePartitionBucketIndexBefore = 
PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(context.getStorageConf, 
basePath.toString)
 
-    // get Map<partitionPath, bucketNumber> based on latest hashing_config
-    val latestHashingConfigInstant = 
PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient)
-    val calcWithLatestInstant = 
PartitionBucketIndexCalculator.getInstance(latestHashingConfigInstant, 
metaClient)
-    val partition2BucketWithLatestHashingConfig = 
PartitionBucketIndexUtils.getAllBucketNumbers(calcWithLatestInstant, 
allPartitions)
+    var partition2BucketWithLatestHashingConfig: util.Map[String, Integer] = 
null
+    var calcWithLatestInstant: PartitionBucketIndexCalculator = null
 
+    if (usePartitionBucketIndexBefore) {
+      // get Map<partitionPath, bucketNumber> based on latest hashing_config
+      val latestHashingConfigInstant = 
PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient)
+      calcWithLatestInstant = 
PartitionBucketIndexCalculator.getInstance(latestHashingConfigInstant, 
metaClient)
+      partition2BucketWithLatestHashingConfig = 
PartitionBucketIndexUtils.getAllBucketNumbers(calcWithLatestInstant, 
allPartitions)
+    } else {
+      ValidationUtils.checkArgument(bucketNumber != -1)
+      partition2BucketWithLatestHashingConfig = 
allPartitions.asScala.map(partition => (partition, new 
Integer(bucketNumber))).toMap.asJava
+    }
     // get Map<partitionPath, bucketNumber> based on new given expression
     val defaultBucketNumber = if (bucketNumber != -1) {
       bucketNumber
-    } else {
+    } else if (calcWithLatestInstant != null ) {
       // reuse latest default bucket number
       calcWithLatestInstant.getHashingConfig.getDefaultBucketNumber
+    } else {
+      throw new HoodieException("Please set original bucket number before 
upgrade to partition bucket level bucket index")
     }
+
     val instantTime = writeClient.createNewInstantTime()
-    config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), 
expression)
-    config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key(), rule)
-    config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key(), 
instantTime)
-    config.put(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 
bucketNumber.toString)
+
+    val finalConfig = config ++ 
Map(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key()->  expression,
+      HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key() -> rule,
+      HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key() -> 
instantTime,
+      HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key() -> bucketNumber.toString
+    )
 
     val newConfig = new PartitionBucketIndexHashingConfig(expression, 
defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
instantTime)
     val calcWithNewExpression = 
PartitionBucketIndexCalculator.getInstance(instantTime, newConfig)
@@ -166,7 +187,9 @@ class PartitionBucketIndexManager() extends BaseProcedure
 
     // get all fileSlices need to read
     val allFilesMap = FSUtils.getFilesInPartitions(context, 
metaClient.getStorage(), 
HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
-      metaClient.getBasePath.toString, partitionsToRescale.toArray)
+      metaClient.getBasePath.toString, partitionsToRescale.map(relative => {
+        new StoragePath(basePath, relative)
+      }).map(storagePath => storagePath.toString).toArray)
     val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
     val view = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline, files.asJava)
     val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
@@ -185,42 +208,46 @@ class PartitionBucketIndexManager() extends BaseProcedure
     val broadcastManager = new SparkBroadcastManager(context, metaClient)
     broadcastManager.prepareAndBroadcast()
     val sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
-    val serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields)
-    val latestInstantTime = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
-
-    val res: RDD[InternalRow] = spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
-      // instantiate other supporting cast
-      val readerSchema = serializableTableSchemaWithMetaFields.get
-      val readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(basePath)
-      val internalSchemaOption: Option[InternalSchema] = Option.empty()
-      // instantiate FG reader
-      val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
-        metaClient.getStorage,
-        basePath.toString,
-        latestInstantTime.requestedTime(),
-        fileSlice,
-        readerSchema,
-        readerSchema,
-        internalSchemaOption, // not support evolution of schema for now
-        metaClient,
-        metaClient.getTableConfig.getProps,
-        0,
-        java.lang.Long.MAX_VALUE,
-        HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue())
-      fileGroupReader.initRecordIterators()
-      val iterator = 
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
-      iterator.asScala
-    })
-    val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, 
res, sparkSchemaWithMetaFields)
 
+    val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
+      spark.sparkContext.emptyRDD
+    } else {
+      val serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields)
+      val latestInstantTime = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+
+      spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
+        // instantiate other supporting cast
+        val readerSchema = serializableTableSchemaWithMetaFields.get
+        val readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(basePath)
+        val internalSchemaOption: Option[InternalSchema] = Option.empty()
+        // instantiate FG reader
+        val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
+          metaClient.getStorage,
+          basePath.toString,
+          latestInstantTime.requestedTime(),
+          fileSlice,
+          readerSchema,
+          readerSchema,
+          internalSchemaOption, // not support evolution of schema for now
+          metaClient,
+          metaClient.getTableConfig.getProps,
+          0,
+          java.lang.Long.MAX_VALUE,
+          HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue())
+        fileGroupReader.initRecordIterators()
+        val iterator = 
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+        iterator.asScala
+      })
+    }
+    val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, 
res, sparkSchemaWithMetaFields)
     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
-      SaveMode.Overwrite,
-      config.asScala.toMap,
+      SaveMode.Append,
+      finalConfig,
       dataFrame)
 
-    val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry 
Run: $dryRun"
-    Seq(Row("SUCCESS", "OVERWRITE", success))
+    val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry 
Run: $dryRun, Instant: $instantTime"
+    Seq(Row("SUCCESS", "OVERWRITE", details))
   }
 
   /**
@@ -277,13 +304,13 @@ class PartitionBucketIndexManager() extends BaseProcedure
     Seq(Row("SUCCESS", "SHOW_CONFIG", null))
   }
 
-  override def build: Procedure = new PartitionBucketIndexManager()
+  override def build: Procedure = new PartitionBucketIndexManager
 }
 
 object PartitionBucketIndexManager {
-  val NAME = "PartitionBucketIndexManager"
+  val NAME = "partition_bucket_index_manager"
 
-  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new PartitionBucketIndexManager()
+  def builder : Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new PartitionBucketIndexManager
   }
 }
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 532a000f0f6..7c1bc0aa2a2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1847,94 +1847,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test Bulk Insert Into Partition Bucket Index Table") {
-    withSQLConf(
-      "hoodie.datasource.write.operation" -> "bulk_insert",
-      "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
-      Seq("mor", "cow").foreach { tableType =>
-        Seq("true", "false").foreach { bulkInsertAsRow =>
-          withTempDir { tmp =>
-            val tableName = generateTableName
-            // Create a partitioned table
-            spark.sql(
-              s"""
-                 |create table $tableName (
-                 |  id int,
-                 |  dt string,
-                 |  name string,
-                 |  price double,
-                 |  ts long
-                 |) using hudi
-                 | tblproperties (
-                 | primaryKey = 'id,name',
-                 | type = '$tableType',
-                 | preCombineField = 'ts',
-                 | hoodie.index.type = 'BUCKET',
-                 | hoodie.bucket.index.hash.field = 'id,name',
-                 | hoodie.datasource.write.row.writer.enable = 
'$bulkInsertAsRow')
-                 | partitioned by (dt)
-                 | location '${tmp.getCanonicalPath}'
-                 """.stripMargin)
-
-            // Note: Do not write the field alias, the partition field must be 
placed last.
-            spark.sql(
-              s"""
-                 | insert into $tableName values
-                 | (1, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (2, 'a2', 20, 2000, "2021-01-06"),
-                 | (3, 'a3,3', 30, 3000, "2021-01-07")
-                 """.stripMargin)
-
-            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-              Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
-            )
-
-            // for COW with disabled Spark native row writer, multiple bulk 
inserts are restricted
-            if (tableType != "cow" && bulkInsertAsRow != "false") {
-              spark.sql(
-                s"""
-                   | insert into $tableName values
-                   | (1, 'a1', 10, 1000, "2021-01-05"),
-                   | (3, "a3", 30, 3000, "2021-01-07")
-                 """.stripMargin)
-
-              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-                Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-                Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-                Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-                Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
-                Seq(3, "a3", 30.0, 3000, "2021-01-07")
-              )
-
-              // there are two files in partition(dt = '2021-01-05')
-              checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
-                Seq(2)
-              )
-
-              // would generate 6 other files in partition(dt = '2021-01-05')
-              spark.sql(
-                s"""
-                   | insert into $tableName values
-                   | (4, 'a1,1', 10, 1000, "2021-01-05"),
-                   | (5, 'a1,1', 10, 1000, "2021-01-05"),
-                   | (6, 'a1,1', 10, 1000, "2021-01-05"),
-                   | (7, 'a1,1', 10, 1000, "2021-01-05"),
-                   | (8, 'a1,1', 10, 1000, "2021-01-05"),
-                   | (10, 'a3,3', 30, 3000, "2021-01-05")
-                 """.stripMargin)
-
-              checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
-                Seq(8)
-              )
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET 
and disabled Spark native row writer") {
     withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert",
       "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
new file mode 100644
index 00000000000..d437f4c9679
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.hudi.index.bucket.PartitionBucketIndexUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import scala.collection.JavaConverters._
+
+class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
+
+  /**
+   * Write operations include:
+   * Case1: Mor + bulk_insert + asRow enable + simple bucket index
+   * Case2: Mor + bulk_insert + asRow disable + simple bucket index
+   * Case3: Cow + bulk_insert + asRow enable + simple bucket index
+   * Case4: Cow + bulk_insert + asRow disable + simple bucket index
+   * Case5: Mor + bulk_insert + asRow enable + partition level bucket index
+   * Case6: Mor + bulk_insert + asRow disable + partition level bucket index
+   * Case7: Cow + bulk_insert + asRow enable + partition level bucket index
+   * Case8: Cow + bulk_insert + asRow disable + partition level bucket index
+   *
+   * The overall workflow is as follows:
+   * Step1: Create a new table using simple bucket index
+   * Step2: Upgrade table to partition level bucket index using `call 
partition_bucket_index_manager`
+   * Step3: Trigger another write operation through partition level bucket 
index
+   *
+   */
+  test("Test Bulk Insert Into Partition Bucket Index Table") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2") {
+      Seq("mor", "cow").foreach { tableType =>
+        Seq("true", "false").foreach { bulkInsertAsRow =>
+          withTempDir { tmp =>
+            val tableName = generateTableName
+            // Create a partitioned table
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  dt string,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 |) using hudi
+                 | tblproperties (
+                 | primaryKey = 'id,name',
+                 | type = '$tableType',
+                 | preCombineField = 'ts',
+                 | hoodie.index.type = 'BUCKET',
+                 | hoodie.bucket.index.hash.field = 'id,name',
+                 | hoodie.bucket.index.num.buckets = 1,
+                 | hoodie.datasource.write.row.writer.enable = 
'$bulkInsertAsRow')
+                 | partitioned by (dt)
+                 | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+            // Note: Do not write the field alias, the partition field must be 
placed last.
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (1, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (11, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (2, 'a2', 20, 2000, "2021-01-06"),
+                 | (22, 'a2', 20, 2000, "2021-01-06")
+           """.stripMargin)
+
+            // upgrade to partition level bucket index and rescale 
dt=2021-01-05 from 1 to 2
+            val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2"
+            val rule = "regex"
+            val defaultBucketNumber = 1
+            spark.sql(s"call partition_bucket_index_manager(table => 
'$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => 
$defaultBucketNumber)")
+
+            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(11, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(22, "a2", 20.0, 2000, "2021-01-06")
+            )
+
+            // insert into new partition 2021-01-07 with partition level 
bucket index
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (3, 'a3,3', 30, 3000, "2021-01-07"),
+                 | (33, 'a3,3', 30, 3000, "2021-01-07")
+           """.stripMargin)
+
+            checkAnswer(s"select id, name, price, ts, dt from $tableName") (
+              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(11, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(22, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+              Seq(33, "a3,3", 30.0, 3000, "2021-01-07")
+            )
+            val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+            val actual: List[String] = 
PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList
+            val expected: List[String] = List("dt=2021-01-05" + "00000000",
+              "dt=2021-01-05" + "00000000",
+              "dt=2021-01-05" + "00000001",
+              "dt=2021-01-06" + "00000000",
+              "dt=2021-01-07" + "00000000",
+              "dt=2021-01-07" + "00000001")
+            assert(actual.sorted == expected.sorted)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Write operations include:
+   * Case1: Mor + upsert + simple bucket index
+   * Case2: Cow + upsert + simple bucket index
+   * Case3: Mor + upsert + partition level bucket index
+   * Case4: Cow + upsert + partition level bucket index
+   *
+   * The overall workflow is as follows:
+   * Step1: Create a new table using simple bucket index
+   * Step2: Upgrade table to partition level bucket index using `call 
partition_bucket_index_manager`
+   * Step3: Trigger another write operation through partition level bucket 
index
+   *
+   */
+  test("Test Upsert Into Partition Bucket Index Table") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "upsert") {
+      Seq("cow", "mor").foreach { tableType =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  dt string,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               | primaryKey = 'id,name',
+               | type = '$tableType',
+               | preCombineField = 'ts',
+               | hoodie.index.type = 'BUCKET',
+               | hoodie.bucket.index.hash.field = 'id,name',
+               | hoodie.bucket.index.num.buckets = 1)
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}'
+               | """.stripMargin)
+
+          // Note: Do not write the field alias, the partition field must be 
placed last.
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1,1', 10, 1000, "2021-01-05"),
+               | (11, 'a1,1', 10, 1000, "2021-01-05"),
+               | (2, 'a2', 20, 2000, "2021-01-06"),
+               | (22, 'a2', 20, 2000, "2021-01-06")
+               | """.stripMargin)
+
+          // upgrade to partition level bucket index and rescale dt=2021-01-05 
from 1 to 2
+          val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2"
+          val rule = "regex"
+          val defaultBucketNumber = 1
+          spark.sql(s"call partition_bucket_index_manager(table => 
'$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => 
$defaultBucketNumber)")
+
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+            Seq(11, "a1,1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(22, "a2", 20.0, 2000, "2021-01-06")
+          )
+
+          // insert into new partition 2021-01-07 with partition level bucket 
index
+          // update (1, 'a1,1', 10, 1000, "2021-01-05") to (1, 'a1,1', 100, 
1000, "2021-01-05")
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1,1', 100, 1000, "2021-01-05"),
+               | (3, 'a3,3', 30, 3000, "2021-01-07"),
+               | (33, 'a3,3', 30, 3000, "2021-01-07")
+               | """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1,1", 100.0, 1000, "2021-01-05"),
+            Seq(11, "a1,1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(22, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+            Seq(33, "a3,3", 30.0, 3000, "2021-01-07")
+          )
+          val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+          val actual: List[String] = 
PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList
+          val expected: List[String] = List("dt=2021-01-05" + "00000000",
+            "dt=2021-01-05" + "00000000",
+            "dt=2021-01-05" + "00000001",
+            "dt=2021-01-06" + "00000000",
+            "dt=2021-01-07" + "00000000",
+            "dt=2021-01-07" + "00000001")
+          // compare file group as expected
+          assert(actual.sorted == expected.sorted)
+        }
+      }
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala
index 93d2a557afe..0bf5aaaa3a4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala
@@ -16,7 +16,7 @@ class TestPartitionBucketIndexManager extends 
HoodieSparkProcedureTestBase {
       val defaultBucketNumber = 10
       PartitionBucketIndexUtils.initHashingConfig(metaClient, expressions, 
rule, defaultBucketNumber, null)
 
-      spark.sql(s"""call PartitionBucketIndexManager(table => '$tableName', 
show-config => 'true')""").collect()
+      spark.sql(s"call PartitionBucketIndexManager(table => '$tableName', 
show-config => 'true')").collect()
 
     }
   }

Reply via email to