This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 184231de7b02e33baad0d6ee5b08f2075f0e7f9c
Author: YueZhang <[email protected]>
AuthorDate: Thu Mar 20 16:58:36 2025 +0800

    Finish Spark call procedure to update the hashing config (still needs more tests)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   8 +
 .../bucket/PartitionBucketIndexCalculator.java     |  24 +-
 .../index/bucket/PartitionBucketIndexUtils.java    |  15 +-
 .../BucketIndexBulkInsertPartitionerWithRows.java  |   9 +-
 .../RDDSimpleBucketBulkInsertPartitioner.java      |   2 +-
 .../BucketBulkInsertDataInternalWriterHelper.java  |   2 +-
 .../action/commit/SparkBucketIndexPartitioner.java |   2 +-
 .../model/PartitionBucketIndexHashingConfig.java   |   4 +
 .../hudi/common/model/WriteOperationType.java      |   1 +
 .../DatasetBucketRescaleCommitActionExecutor.java  |  77 ++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  12 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  24 +-
 .../procedures/PartitionBucketIndexManager.scala   | 289 +++++++++++++++++++++
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |   1 +
 .../spark/sql/hudi/dml/TestInsertTable.scala       |  88 +++++++
 .../TestPartitionBucketIndexManager.scala          |  31 +++
 16 files changed, 575 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 557f8672e19..623f02e623c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1959,6 +1959,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT);
   }
 
+  public String getBucketIndexPartitionExpression() {
+    return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS);
+  }
+
+  public String getBucketIndexPartitionRuleType() {
+    return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE);
+  }
+
   public String getIndexClass() {
     return getString(HoodieIndexConfig.INDEX_CLASS_NAME);
   }
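
The two new getters above surface the partition-level bucket index expression and rule type from the write config. A minimal sketch of supplying and reading back these options, assuming the builder-style HoodieWriteConfig API; the base path is a placeholder and the expression/rule values reuse the illustrative regex from the new test:

    import java.util.Properties
    import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}

    val props = new Properties()
    props.setProperty(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256")
    props.setProperty(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key(), "regex")

    val writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("/tmp/hudi_table")   // hypothetical base path
      .withProps(props)
      .build()

    // The new getters simply read those values back:
    val expression = writeConfig.getBucketIndexPartitionExpression
    val ruleType   = writeConfig.getBucketIndexPartitionRuleType
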
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
index 65a08363402..acd48bb9961 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -55,7 +56,7 @@ public class PartitionBucketIndexCalculator implements 
Serializable {
   private static final int CACHE_SIZE = 100_000;
   private PartitionBucketIndexHashingConfig hashingConfig;
   private int defaultBucketNumber;
-  private final String instantToLoad;
+  private String instantToLoad;
   // Cache for partition to bucket number mapping
   @SuppressWarnings("unchecked")
   private final Map<String, Integer> partitionToBucketCache = new 
LRUMap(CACHE_SIZE);
@@ -89,6 +90,14 @@ public class PartitionBucketIndexCalculator implements 
Serializable {
     }
   }
 
+  private PartitionBucketIndexCalculator(String instantToLoad, 
PartitionBucketIndexHashingConfig config) {
+    this.hashingConfig = config;
+    this.defaultBucketNumber = config.getDefaultBucketNumber();
+    String expressions = config.getExpressions();
+    String ruleType = config.getRule();
+    this.ruleEngine = createRuleEngine(ruleType, expressions);
+  }
+
   private void init(HoodieStorage storage, StoragePath hashingConfigPath) {
     Option<PartitionBucketIndexHashingConfig> config = 
PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath);
     ValidationUtils.checkArgument(config.isPresent());
@@ -124,6 +133,15 @@ public class PartitionBucketIndexCalculator implements 
Serializable {
         });
   }
 
+  public static PartitionBucketIndexCalculator getInstance(String 
instantToLoad, PartitionBucketIndexHashingConfig config) {
+    // Using instantToLoad as the key for the cache
+    return INSTANCES.computeIfAbsent(instantToLoad,
+        key -> {
+          LOG.info("Creating new PartitionBucketIndexCalculator instance for 
instantToLoad: {}", key);
+          return new PartitionBucketIndexCalculator(key, config);
+        });
+  }
+
   /**
    * Computes the bucket number for a given partition path
    *
@@ -182,6 +200,10 @@ public class PartitionBucketIndexCalculator implements 
Serializable {
     return partitionToBucketCache.size();
   }
 
+  public Map<String, Integer> getPartitionToBucket() {
+    return new HashMap<>(partitionToBucketCache);
+  }
+
   public void clearCache() {
     partitionToBucketCache.clear();
     LOG.info("Cleared partition to bucket number cache");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
index 69e29bd92e3..ad1e1d0b3bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class PartitionBucketIndexUtils {
@@ -77,11 +78,14 @@ public class PartitionBucketIndexUtils {
       return false;
     }
     String hashingInstant = StringUtils.isNullOrEmpty(instant) ? 
INITIAL_HASHING_CONFIG_INSTANT : instant;
-    HoodieStorage storage = metaClient.getStorage();
     PartitionBucketIndexHashingConfig hashingConfig =
         new PartitionBucketIndexHashingConfig(expressions, 
defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
hashingInstant);
-    StoragePath hashingConfigPath = new 
StoragePath(metaClient.getHashingMetadataConfigPath(), 
hashingConfig.getFilename());
+    return saveHashingConfig(hashingConfig, metaClient);
+  }
 
+  public static boolean saveHashingConfig(PartitionBucketIndexHashingConfig 
hashingConfig, HoodieTableMetaClient metaClient) {
+    StoragePath hashingConfigPath = new 
StoragePath(metaClient.getHashingMetadataConfigPath(), 
hashingConfig.getFilename());
+    HoodieStorage storage = metaClient.getStorage();
     try {
       Option<byte []> content = 
Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8));
       storage.createImmutableFileInPath(hashingConfigPath, 
content.map(HoodieInstantWriter::convertByteArrayToWriter));
@@ -153,4 +157,11 @@ public class PartitionBucketIndexUtils {
     }
     return hashingConfigName.substring(0, dotIndex);
   }
+
+  public static Map<String, Integer> 
getAllBucketNumbers(PartitionBucketIndexCalculator calc, List<String> 
partitions) {
+    for (String partition : partitions) {
+      calc.computeNumBuckets(partition);
+    }
+    return calc.getPartitionToBucket();
+  }
 }
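
getAllBucketNumbers warms the calculator's cache for the given partitions and returns a copy of the partition-to-bucket map (via the new getPartitionToBucket). A short sketch, assuming the calc instance from the snippet above and a hypothetical partition list:

    import scala.collection.JavaConverters._
    import org.apache.hudi.index.bucket.PartitionBucketIndexUtils

    val partitions = List("dt=2021-06-17", "dt=2021-07-01").asJava
    val partitionToBuckets = PartitionBucketIndexUtils.getAllBucketNumbers(calc, partitions)
    partitionToBuckets.asScala.foreach { case (partition, numBuckets) =>
      println(s"$partition -> $numBuckets buckets")
    }
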
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index e80c8bfac0e..3a94fa78cb3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -41,12 +41,19 @@ public class BucketIndexBulkInsertPartitionerWithRows 
implements BulkInsertParti
     this.indexKeyFields = indexKeyFields;
     this.bucketNum = bucketNum;
     String hashingInstantToLoad = 
table.getConfig().getHashingConfigInstantToLoad();
-    this.isPartitionBucketIndexEnable = 
StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    this.isPartitionBucketIndexEnable = 
StringUtils.nonEmpty(hashingInstantToLoad);
     if (isPartitionBucketIndexEnable) {
       calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, 
table.getMetaClient());
     }
   }
 
+  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int 
bucketNum, PartitionBucketIndexCalculator calc) {
+    this.indexKeyFields = indexKeyFields;
+    this.bucketNum = bucketNum;
+    this.isPartitionBucketIndexEnable = true;
+    this.calc = calc;
+  }
+
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputPartitions) {
     return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, 
bucketNum, outputPartitions, calc, isPartitionBucketIndexEnable);
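
The added constructor lets a caller hand in a pre-built calculator (the rescale executor later in this patch does exactly that) instead of deriving one from the table config. A small sketch, where calc is the calculator from the earlier snippet, rowsDf is an arbitrary Dataset[Row], and the key fields, bucket count and output partition count are placeholders:

    import org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows

    // Passing a pre-built calculator marks the partitioner as partition-bucket-index enabled.
    val partitioner = new BucketIndexBulkInsertPartitionerWithRows("id,name", 10, calc)
    val repartitioned = partitioner.repartitionRecords(rowsDf, 4)
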
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
index d651342c368..393f0054398 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java
@@ -49,7 +49,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends 
HoodieRecordPayload>
     ValidationUtils.checkArgument(table.getIndex() instanceof 
HoodieSimpleBucketIndex);
     this.isNonBlockingConcurrencyControl = 
table.getConfig().isNonBlockingConcurrencyControl();
     String hashingInstantToLoad = 
table.getConfig().getHashingConfigInstantToLoad();
-    this.isPartitionBucketIndexEnable = 
StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    this.isPartitionBucketIndexEnable = 
StringUtils.nonEmpty(hashingInstantToLoad);
     if (isPartitionBucketIndexEnable) {
       calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, 
table.getMetaClient());
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index d2b6a04c2e1..7f55c02b851 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -71,7 +71,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInte
     this.handles = new HashMap<>();
     this.isNonBlockingConcurrencyControl = 
writeConfig.isNonBlockingConcurrencyControl();
     String hashingInstantToLoad = writeConfig.getHashingConfigInstantToLoad();
-    this.isPartitionBucketIndexEnable = 
StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    this.isPartitionBucketIndexEnable = 
StringUtils.nonEmpty(hashingInstantToLoad);
     if (isPartitionBucketIndexEnable) {
       calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, 
hoodieTable.getMetaClient());
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index c760d2aabdd..b099392e9e1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -99,7 +99,7 @@ public class SparkBucketIndexPartitioner<T> extends 
SparkHoodiePartitioner<T> {
               + table.getIndex().getClass().getSimpleName());
     }
     String hashingInstantToLoad = 
table.getConfig().getHashingConfigInstantToLoad();
-    this.isPartitionBucketIndexEnable = 
StringUtils.isNullOrEmpty(hashingInstantToLoad);
+    this.isPartitionBucketIndexEnable = 
StringUtils.nonEmpty(hashingInstantToLoad);
     if (isPartitionBucketIndexEnable) {
       calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, 
table.getMetaClient());
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
index 46b38896925..a658003f851 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -62,6 +62,10 @@ public class PartitionBucketIndexHashingConfig implements 
Serializable {
     return 
JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
   }
 
+  public String getInstant() {
+    return this.instant;
+  }
+
   public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws 
Exception {
     if (jsonStr == null || jsonStr.isEmpty()) {
       // For empty commit file (no data or somethings bad happen).
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 1036e8fca44..bc82a04023c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -41,6 +41,7 @@ public enum WriteOperationType {
   BOOTSTRAP("bootstrap"),
   // insert overwrite with static partitioning
   INSERT_OVERWRITE("insert_overwrite"),
+  BUCKET_RESCALE("bucket_rescale"),
   // cluster
   CLUSTER("cluster"),
   // delete partition
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
new file mode 100644
index 00000000000..1c6d90eb6f7
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import 
org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows;
+import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
+import org.apache.hudi.index.bucket.PartitionBucketIndexUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+public class DatasetBucketRescaleCommitActionExecutor extends 
DatasetBulkInsertOverwriteCommitActionExecutor {
+
+  private final PartitionBucketIndexCalculator calc;
+
+  public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
+                                                  SparkRDDWriteClient 
writeClient,
+                                                  String instantTime) {
+    super(config, writeClient, instantTime);
+    String expression = config.getBucketIndexPartitionExpression();
+    String instant = config.getHashingConfigInstantToLoad();
+    String rule = config.getBucketIndexPartitionRuleType();
+    int bucketNumber = config.getBucketIndexNumBuckets();
+    PartitionBucketIndexHashingConfig hashingConfig = new 
PartitionBucketIndexHashingConfig(expression,
+        bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
instant);
+    this.calc = PartitionBucketIndexCalculator.getInstance(instantTime, 
hashingConfig);
+  }
+
+  /**
+   * Creates a BulkInsertPartitioner backed by the prepared PartitionBucketIndexCalculator.
+   * @param populateMetaFields whether Hudi meta fields are populated
+   * @param isTablePartitioned whether the target table is partitioned
+   * @return the row-based bucket index bulk insert partitioner
+   */
+  @Override
+  protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean 
populateMetaFields, boolean isTablePartitioned) {
+    return new 
BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
+        writeConfig.getBucketIndexNumBuckets(), calc);
+  }
+
+  /**
+   * Persists the new hashing_config during afterExecute, before the commit is finalized.
+   * @param result the write metadata of the bulk insert
+   */
+  @Override
+  protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
result) {
+    super.afterExecute(result);
+
+    PartitionBucketIndexHashingConfig hashingConfig = calc.getHashingConfig();
+    boolean res = PartitionBucketIndexUtils.saveHashingConfig(hashingConfig, 
table.getMetaClient());
+    ValidationUtils.checkArgument(res);
+  }
+}
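
DatasetBucketRescaleCommitActionExecutor is selected when the internal overwrite operation type is BUCKET_RESCALE: it repartitions rows with the pre-built calculator and persists the new hashing config in afterExecute. A hedged sketch of a write that would take this path; in practice these options are assembled by the PartitionBucketIndexManager procedure, and df, basePath, rescaleInstantTime and the expression are placeholders:

    import org.apache.hudi.common.model.WriteOperationType
    import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig}
    import org.apache.spark.sql.SaveMode

    df.write.format("hudi")
      .option(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key(), WriteOperationType.BUCKET_RESCALE.value())
      .option(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), "\\d{4}-06-(01|17|18),256")
      .option(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key(), "regex")
      .option(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key(), rescaleInstantTime)
      .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), "128")
      .mode(SaveMode.Append)  // BUCKET_RESCALE is matched under Append mode in HoodieSparkSqlWriter
      .save(basePath)
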
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 58cfebf596d..7c59e433c8a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -30,7 +30,7 @@ import 
org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, 
DatasetBulkInsertOverwriteCommitActionExecutor, 
DatasetBulkInsertOverwriteTableCommitActionExecutor}
+import org.apache.hudi.commit.{DatasetBucketRescaleCommitActionExecutor, 
DatasetBulkInsertCommitActionExecutor, 
DatasetBulkInsertOverwriteCommitActionExecutor, 
DatasetBulkInsertOverwriteTableCommitActionExecutor}
 import org.apache.hudi.common.config._
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
@@ -40,7 +40,7 @@ import 
org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
-import org.apache.hudi.common.util.{CommitUtils, Option => HOption, 
StringUtils}
+import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => 
HOption}
 import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
@@ -62,7 +62,6 @@ import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.util.{SparkConfigUtils, SparkKeyGenUtils}
 import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
@@ -81,7 +80,6 @@ import org.apache.spark.sql.types.StructType
 import org.slf4j.LoggerFactory
 
 import java.util.function.BiConsumer
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -815,7 +813,7 @@ class HoodieSparkSqlWriterInternal {
     val overwriteOperationType = 
Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
-    val instantTime = writeClient.createNewInstantTime()
+    var instantTime = writeClient.createNewInstantTime()
     val executor = mode match {
       case _ if overwriteOperationType == null =>
         // Don't need to overwrite
@@ -823,6 +821,10 @@ class HoodieSparkSqlWriterInternal {
       case SaveMode.Append if overwriteOperationType == 
WriteOperationType.INSERT_OVERWRITE =>
         // INSERT OVERWRITE PARTITION uses Append mode
         new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, 
writeClient, instantTime)
+      case SaveMode.Append if overwriteOperationType == 
WriteOperationType.BUCKET_RESCALE => {
+        instantTime = writeConfig.getHashingConfigInstantToLoad()
+        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient, 
instantTime)
+      }
       case SaveMode.Overwrite if overwriteOperationType == 
WriteOperationType.INSERT_OVERWRITE_TABLE =>
         new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, 
writeClient, instantTime)
       case _ =>
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index ff3261bbcd6..6cf5053bdce 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -33,7 +33,6 @@ import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator}
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -50,8 +49,8 @@ import org.apache.spark.sql.types.StructType
 import org.slf4j.LoggerFactory
 
 import java.util.Locale
-
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 trait ProvidesHoodieConfig extends Logging {
 
@@ -85,6 +84,27 @@ trait ProvidesHoodieConfig extends Logging {
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
+  def buildBucketRescaleHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): 
Map[String, String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("")
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
+
+    val defaultOpts = Map[String, String](
+      OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
+      HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> 
WriteOperationType.BUCKET_RESCALE.value(),
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> 
tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> 
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> "false"
+    )
+
+    val overridingOpts = buildOverridingOpts(hoodieCatalogTable, 
preCombineField)
+    combineOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
+  }
+
   /**
    * Deduce the sql write operation for INSERT_INTO
    */
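
buildBucketRescaleHoodieConfig assembles the write options for a rescale: bulk_insert as the operation, BUCKET_RESCALE as the internal overwrite type, plus the table's key generator and sync settings. A minimal sketch of using it outside the procedure; hoodieCatalogTable is assumed to be resolved elsewhere, for example via HoodieCLIUtils.getHoodieCatalogTable:

    import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
    import org.apache.spark.sql.hudi.ProvidesHoodieConfig

    object RescaleOptions extends ProvidesHoodieConfig {
      // Returns the option map that routes a subsequent HoodieSparkSqlWriter.write call
      // to DatasetBucketRescaleCommitActionExecutor.
      def forTable(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] =
        buildBucketRescaleHoodieConfig(hoodieCatalogTable)
    }
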
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
new file mode 100644
index 00000000000..4aaeec80a1a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, 
HoodieSparkSqlWriter}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.commit.DatasetBucketRescaleCommitActionExecutor
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieReaderConfig, SerializableSchema}
+import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, 
PartitionBucketIndexHashingConfig, WriteOperationType}
+import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util.{Option, StringUtils}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.index.bucket.{PartitionBucketIndexCalculator, 
PartitionBucketIndexUtils}
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+import org.apache.hudi.table.SparkBroadcastManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util
+import java.util.function.Supplier
+import java.util.stream.Collectors
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class PartitionBucketIndexManager() extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with ProvidesHoodieConfig
+  with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "overwrite", DataTypes.StringType),
+    ProcedureParameter.optional(2, "bucket-number", DataTypes.IntegerType, -1),
+    ProcedureParameter.optional(3, "add", DataTypes.StringType),
+    ProcedureParameter.optional(4, "dry-run", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(5, "rollback", DataTypes.StringType),
+    ProcedureParameter.optional(6, "show-config", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("operation", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("details", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String]
+    val overwrite = getArgValueOrDefault(args, 
PARAMETERS(1)).orNull.asInstanceOf[String]
+    val bucketNumber = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Int]
+    val add = getArgValueOrDefault(args, 
PARAMETERS(3)).orNull.asInstanceOf[String]
+    val dryRun = getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[Boolean]
+    val rollback = getArgValueOrDefault(args, 
PARAMETERS(5)).orNull.asInstanceOf[String]
+    val showConfig = getArgValueOrDefault(args, 
PARAMETERS(6)).get.asInstanceOf[Boolean]
+    val rule = getArgValueOrDefault(args, 
PARAMETERS(7)).orNull.asInstanceOf[String]
+
+    try {
+      // Get table metadata
+      val hoodieCatalogTable = 
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
+      val basePath = hoodieCatalogTable.tableLocation
+      val metaClient = createMetaClient(jsc, basePath)
+      val writeClient = HoodieCLIUtils.createHoodieWriteClient(sparkSession, 
basePath, Map.empty,
+        scala.Option.apply(table))
+      val context = writeClient.getEngineContext
+
+      // Determine which operation to perform
+      if (showConfig) {
+        handleShowConfig(metaClient)
+      } else if (rollback != null) {
+        handleRollback(metaClient, rollback)
+      } else if (overwrite != null) {
+        handleOverwrite(hoodieCatalogTable, writeClient, context, metaClient, 
overwrite, bucketNumber, rule, dryRun)
+      } else if (add != null) {
+        handleAdd(metaClient, add, dryRun)
+      } else {
+        Seq(Row("ERROR", "INVALID_OPERATION", "No valid operation specified"))
+      }
+    } catch {
+      case e: Exception => Seq(Row("ERROR", "EXCEPTION", e.getMessage))
+    }
+  }
+
+  /**
+   * Handle the overwrite operation.
+   */
+  private def handleOverwrite(catalog: HoodieCatalogTable,
+                              writeClient: SparkRDDWriteClient[_],
+                              context : HoodieEngineContext,
+                              metaClient: HoodieTableMetaClient,
+                              expression: String,
+                              bucketNumber: Int,
+                              rule: String,
+                              dryRun: Boolean): Seq[Row] = {
+    val config = buildBucketRescaleHoodieConfig(catalog).asJava
+    val basePath = metaClient.getBasePath
+    val mdtEnable = metaClient.getStorage().exists(new 
StoragePath(metaClient.getBasePath, 
HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH))
+
+    // get all partition paths
+    val allPartitions = FSUtils.getAllPartitionPaths(context, 
metaClient.getStorage, metaClient.getBasePath, mdtEnable)
+
+    // get Map<partitionPath, bucketNumber> based on latest hashing_config
+    val latestHashingConfigInstant = 
PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient)
+    val calcWithLatestInstant = 
PartitionBucketIndexCalculator.getInstance(latestHashingConfigInstant, 
metaClient)
+    val partition2BucketWithLatestHashingConfig = 
PartitionBucketIndexUtils.getAllBucketNumbers(calcWithLatestInstant, 
allPartitions)
+
+    // get Map<partitionPath, bucketNumber> based on new given expression
+    val defaultBucketNumber = if (bucketNumber != -1) {
+      bucketNumber
+    } else {
+      // reuse latest default bucket number
+      calcWithLatestInstant.getHashingConfig.getDefaultBucketNumber
+    }
+    val instantTime = writeClient.createNewInstantTime()
+    config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), 
expression)
+    config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key(), rule)
+    config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key(), 
instantTime)
+    config.put(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 
bucketNumber.toString)
+
+    val newConfig = new PartitionBucketIndexHashingConfig(expression, 
defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
instantTime)
+    val calcWithNewExpression = 
PartitionBucketIndexCalculator.getInstance(instantTime, newConfig)
+    val partition2BucketWithNewHashingConfig = 
PartitionBucketIndexUtils.getAllBucketNumbers(calcWithNewExpression, 
allPartitions)
+
+    // get partitions need to be rescaled
+    val rescalePartitionsMap = 
getDifferentPartitions(partition2BucketWithNewHashingConfig.asScala, 
partition2BucketWithLatestHashingConfig.asScala)
+    val partitionsToRescale = rescalePartitionsMap.keys
+
+    // get all fileSlices need to read
+    val allFilesMap = FSUtils.getFilesInPartitions(context, 
metaClient.getStorage(), 
HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
+      metaClient.getBasePath.toString, partitionsToRescale.toArray)
+    val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
+    val view = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline, files.asJava)
+    val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
+      view.getLatestFileSlices(partitionPath).iterator().asScala
+    }).toList
+
+    // read all file slices in parallel and build a DataFrame
+    var tableSchemaWithMetaFields: Schema = null
+    try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new 
TableSchemaResolver(metaClient).getTableAvroSchema(false), false)
+    catch {
+      case e: Exception =>
+        throw new HoodieException("Failed to get table schema during 
clustering", e)
+    }
+
+    // broadcast reader context.
+    val broadcastManager = new SparkBroadcastManager(context, metaClient)
+    broadcastManager.prepareAndBroadcast()
+    val sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
+    val serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields)
+    val latestInstantTime = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+
+    val res: RDD[InternalRow] = spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
+      // instantiate the other supporting objects for the file group reader
+      val readerSchema = serializableTableSchemaWithMetaFields.get
+      val readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(basePath)
+      val internalSchemaOption: Option[InternalSchema] = Option.empty()
+      // instantiate FG reader
+      val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
+        metaClient.getStorage,
+        basePath.toString,
+        latestInstantTime.requestedTime(),
+        fileSlice,
+        readerSchema,
+        readerSchema,
+        internalSchemaOption, // not support evolution of schema for now
+        metaClient,
+        metaClient.getTableConfig.getProps,
+        0,
+        java.lang.Long.MAX_VALUE,
+        HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue())
+      fileGroupReader.initRecordIterators()
+      val iterator = 
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+      iterator.asScala
+    })
+    val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, 
res, sparkSchemaWithMetaFields)
+
+    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
+      sparkSession.sqlContext,
+      SaveMode.Overwrite,
+      config.asScala.toMap,
+      dataFrame)
+
+    val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun"
+    Seq(Row(if (success) "SUCCESS" else "FAILED", "OVERWRITE", details))
+  }
+
+  /**
+   * Compares two maps of partition paths to bucket numbers and returns a map 
of partition paths
+   * that have different bucket numbers in the two maps, using the new bucket 
numbers.
+   *
+   * @param newMap The new map of partition paths to bucket numbers
+   * @param oldMap The old map of partition paths to bucket numbers
+   * @return A map containing only the partition paths that have different 
bucket numbers, with values from newMap
+   */
+  def getDifferentPartitions(newMap: mutable.Map[String, Integer], oldMap: 
mutable.Map[String, Integer]): mutable.Map[String, Integer] = {
+    newMap.filter {
+      case (partitionPath, bucketNumber) =>
+        // Include partition path if it's not in old map or has a different 
bucket number
+        !oldMap.contains(partitionPath) || oldMap(partitionPath) != 
bucketNumber
+    }
+  }
+
+  /**
+   * Handle the add operation.
+   */
+  private def handleAdd(metaClient: HoodieTableMetaClient, expression: String, 
dryRun: Boolean): Seq[Row] = {
+    // In a real implementation, this would call PartitionBucketIndexManager
+    // For now, just return a placeholder result
+    val details = s"Expression: $expression, Dry Run: $dryRun"
+
+    // Here would be the actual call to PartitionBucketIndexManager
+    // PartitionBucketIndexManager.addExpression(metaClient, expression, 
dryRun)
+
+    Seq(Row("SUCCESS", "ADD", details))
+  }
+
+  /**
+   * Handle the rollback operation.
+   */
+  private def handleRollback(metaClient: HoodieTableMetaClient, instantTime: 
String): Seq[Row] = {
+    // In a real implementation, this would call PartitionBucketIndexManager
+    // For now, just return a placeholder result
+    val details = s"Rolled back bucket rescale action: $instantTime"
+
+    // Here would be the actual call to PartitionBucketIndexManager
+    // PartitionBucketIndexManager.rollback(metaClient, instantTime)
+
+    Seq(Row("SUCCESS", "ROLLBACK", details))
+  }
+
+  /**
+   * Handle the show-config operation.
+   */
+  private def handleShowConfig(metaClient: HoodieTableMetaClient): Seq[Row] = {
+    // In a real implementation, this would load and display the latest hashing config
+    // For now, just return a placeholder result
+    Seq(Row("SUCCESS", "SHOW_CONFIG", null))
+  }
+
+  override def build: Procedure = new PartitionBucketIndexManager()
+}
+
+object PartitionBucketIndexManager {
+  val NAME = "PartitionBucketIndexManager"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new PartitionBucketIndexManager()
+  }
+}
\ No newline at end of file
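
With the procedure registered, the rescale flow is driven from Spark SQL. A hedged sketch of the call forms, mirroring the argument style used in the new TestPartitionBucketIndexManager test; tableName and the expression are illustrative, and dry-run defaults to true, so data is only rewritten when it is set to false:

    val expressions = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256"

    // Inspect the hashing config currently in effect.
    spark.sql(s"call PartitionBucketIndexManager(table => '$tableName', show-config => 'true')").show(false)

    // Rescale the partitions matched by the expression to the bucket numbers it encodes.
    spark.sql(
      s"call PartitionBucketIndexManager(table => '$tableName', " +
        s"overwrite => '$expressions', rule => 'regex', dry-run => false)"
    ).show(false)
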
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index e0e60e3f6b4..5fd8fd4ad19 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -74,6 +74,7 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
   TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
   protected lazy val spark: SparkSession = SparkSession.builder()
     .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
+    .config("spark.driver.bindAddress", "127.0.0.1")
     .config("spark.sql.session.timeZone", "UTC")
     .config("hoodie.insert.shuffle.parallelism", "4")
     .config("hoodie.upsert.shuffle.parallelism", "4")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 7c1bc0aa2a2..532a000f0f6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1847,6 +1847,94 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Bulk Insert Into Partition Bucket Index Table") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+      Seq("mor", "cow").foreach { tableType =>
+        Seq("true", "false").foreach { bulkInsertAsRow =>
+          withTempDir { tmp =>
+            val tableName = generateTableName
+            // Create a partitioned table
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  dt string,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 |) using hudi
+                 | tblproperties (
+                 | primaryKey = 'id,name',
+                 | type = '$tableType',
+                 | preCombineField = 'ts',
+                 | hoodie.index.type = 'BUCKET',
+                 | hoodie.bucket.index.hash.field = 'id,name',
+                 | hoodie.datasource.write.row.writer.enable = 
'$bulkInsertAsRow')
+                 | partitioned by (dt)
+                 | location '${tmp.getCanonicalPath}'
+                 """.stripMargin)
+
+            // Note: Do not write the field alias, the partition field must be 
placed last.
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (1, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (2, 'a2', 20, 2000, "2021-01-06"),
+                 | (3, 'a3,3', 30, 3000, "2021-01-07")
+                 """.stripMargin)
+
+            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+            )
+
+            // for COW with disabled Spark native row writer, multiple bulk 
inserts are restricted
+            if (tableType != "cow" && bulkInsertAsRow != "false") {
+              spark.sql(
+                s"""
+                   | insert into $tableName values
+                   | (1, 'a1', 10, 1000, "2021-01-05"),
+                   | (3, "a3", 30, 3000, "2021-01-07")
+                 """.stripMargin)
+
+              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+                Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+                Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+                Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+                Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+                Seq(3, "a3", 30.0, 3000, "2021-01-07")
+              )
+
+              // there are two files in partition(dt = '2021-01-05')
+              checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
+                Seq(2)
+              )
+
+              // would generate 6 other files in partition(dt = '2021-01-05')
+              spark.sql(
+                s"""
+                   | insert into $tableName values
+                   | (4, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (5, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (6, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (7, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (8, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (10, 'a3,3', 30, 3000, "2021-01-05")
+                 """.stripMargin)
+
+              checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
+                Seq(8)
+              )
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET 
and disabled Spark native row writer") {
     withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert",
       "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala
new file mode 100644
index 00000000000..93d2a557afe
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala
@@ -0,0 +1,31 @@
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.index.bucket.PartitionBucketIndexUtils
+
+class TestPartitionBucketIndexManager extends HoodieSparkProcedureTestBase {
+
+  test("Case1: Test Call drop_partition Procedure For Multiple Partitions: '*' 
stands for all partitions in leaf partition") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val metaClient = getTableMetaClient(tablePath)
+      val expressions = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256"
+      val rule = "regex"
+      val defaultBucketNumber = 10
+      PartitionBucketIndexUtils.initHashingConfig(metaClient, expressions, 
rule, defaultBucketNumber, null)
+
+      spark.sql(s"""call PartitionBucketIndexManager(table => '$tableName', 
show-config => 'true')""").collect()
+
+    }
+  }
+
+  private def getTableMetaClient(tablePath: String): HoodieTableMetaClient = {
+    HoodieTableMetaClient.builder()
+      .setBasePath(tablePath)
+      
.setConf(HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration))
+      .build()
+  }
+
+}

