This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990-V2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 139083dd2b16207a66397013e28cb51a06a22396
Author: YueZhang <[email protected]>
AuthorDate: Sun Mar 23 01:55:48 2025 +0800

    fix ut
    
---
 .../bucket/PartitionBucketIndexCalculator.java     |  1 +
 .../BucketIndexBulkInsertPartitionerWithRows.java  |  5 ++--
 .../model/PartitionBucketIndexHashingConfig.java   | 10 ++++++++
 .../DatasetBucketRescaleCommitActionExecutor.java  | 28 ++++++++++++++++------
 .../procedures/PartitionBucketIndexManager.scala   |  1 +
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  1 -
 .../TestInsertTableWithPartitionBucketIndex.scala  | 12 ++++++----
 7 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
index 0cbdf36d088..26d71760f03 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
@@ -50,6 +50,7 @@ import java.util.regex.Pattern;
  * exists for each unique hashingInstantToLoad value.
  */
 public class PartitionBucketIndexCalculator implements Serializable {
+  private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class);
   // Map to store singleton instances for each instantToLoad + configuration hash combination
   private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>();
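For context on the INSTANCES map above: the class keeps one cached calculator per instantToLoad + configuration hash, as its comment says, and the added serialVersionUID pins the serialized form of that Serializable class. A minimal, self-contained sketch of the same per-key singleton pattern (class and method names here are illustrative stand-ins, not Hudi's actual API):

    import java.io.Serializable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical stand-in mirroring PartitionBucketIndexCalculator's caching scheme.
    public class PerKeyCalculator implements Serializable {
      private static final long serialVersionUID = 1L;  // pin the serialized form, as the commit does
      private static final ConcurrentMap<String, PerKeyCalculator> INSTANCES = new ConcurrentHashMap<>();

      private final String instantToLoad;

      private PerKeyCalculator(String instantToLoad) {
        this.instantToLoad = instantToLoad;
      }

      // One instance per cache key; computeIfAbsent builds it only on the first request.
      public static PerKeyCalculator getInstance(String instantToLoad, int configHash) {
        String key = instantToLoad + "_" + configHash;
        return INSTANCES.computeIfAbsent(key, k -> new PerKeyCalculator(instantToLoad));
      }
    }
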
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index 3a94fa78cb3..ee1fcc9c380 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -47,11 +48,11 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti
     }
   }
 
-  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, PartitionBucketIndexCalculator calc) {
+  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, PartitionBucketIndexHashingConfig hashingConfig) {
     this.indexKeyFields = indexKeyFields;
     this.bucketNum = bucketNum;
     this.isPartitionBucketIndexEnable = true;
-    this.calc = calc;
+    this.calc = PartitionBucketIndexCalculator.getInstance(hashingConfig.getInstant(), hashingConfig);
   }
 
   @Override
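The constructor change above takes the hashing config instead of a pre-built calculator and resolves the calculator through the cached getInstance factory. A hedged sketch of that shape, reusing the PerKeyCalculator stand-in from the previous sketch (HashingConfig and RowBucketPartitioner are hypothetical names, not the Hudi classes; the real code passes the config object itself as the second getInstance argument):

    import java.io.Serializable;

    // Hypothetical stand-in for PartitionBucketIndexHashingConfig: a small Serializable carrier.
    class HashingConfig implements Serializable {
      private static final long serialVersionUID = 1L;
      private final String instant;

      HashingConfig(String instant) {
        this.instant = instant;
      }

      String getInstant() {
        return instant;
      }
    }

    // Hypothetical partitioner shape mirroring the new constructor: accept the config
    // and derive the calculator from it via the shared per-key cache.
    class RowBucketPartitioner implements Serializable {
      private static final long serialVersionUID = 1L;
      private final PerKeyCalculator calc;

      RowBucketPartitioner(HashingConfig hashingConfig) {
        this.calc = PerKeyCalculator.getInstance(hashingConfig.getInstant(), hashingConfig.hashCode());
      }
    }
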
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
index a658003f851..195b3e23275 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class PartitionBucketIndexHashingConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexHashingConfig.class);
   public static final String HASHING_CONFIG_FILE_SUFFIX = ".hashing_config";
   public static final Integer CURRENT_VERSION = 1;
@@ -97,4 +98,13 @@ public class PartitionBucketIndexHashingConfig implements Serializable {
   public String getExpressions() {
     return expressions;
   }
+
+  public String toString() {
+    return "PartitionBucketIndexHashingConfig{" + "expressions='" + expressions + '\''
+        + ", defaultBucketNumber='" + defaultBucketNumber + '\''
+        + ", rule='" + rule + '\''
+        + ", version='" + version + '\''
+        + ", instant=" + instant
+        + '}';
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index 1c6d90eb6f7..3820840036a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -20,11 +20,13 @@ package org.apache.hudi.commit;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows;
-import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.index.bucket.PartitionBucketIndexUtils;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -32,10 +34,18 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
 
 public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor {
 
-  private final PartitionBucketIndexCalculator calc;
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DatasetBucketRescaleCommitActionExecutor.class);
+  private final PartitionBucketIndexHashingConfig hashingConfig;
 
   public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
                                                   SparkRDDWriteClient writeClient,
@@ -45,9 +55,8 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
     String instant = config.getHashingConfigInstantToLoad();
     String rule = config.getBucketIndexPartitionRuleType();
     int bucketNumber = config.getBucketIndexNumBuckets();
-    PartitionBucketIndexHashingConfig hashingConfig = new PartitionBucketIndexHashingConfig(expression,
+    this.hashingConfig = new PartitionBucketIndexHashingConfig(expression,
         bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instant);
-    this.calc = PartitionBucketIndexCalculator.getInstance(instantTime, hashingConfig);
   }
 
   /**
@@ -59,7 +68,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
   @Override
   protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMetaFields, boolean isTablePartitioned) {
     return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
-        writeConfig.getBucketIndexNumBuckets(), calc);
+        writeConfig.getBucketIndexNumBuckets(), hashingConfig);
   }
 
   /**
@@ -69,9 +78,14 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
   @Override
   protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
     super.afterExecute(result);
-
-    PartitionBucketIndexHashingConfig hashingConfig = calc.getHashingConfig();
     boolean res = PartitionBucketIndexUtils.saveHashingConfig(hashingConfig, table.getMetaClient());
     ValidationUtils.checkArgument(res);
+    LOG.info("Finished saving hashing config " + hashingConfig);
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+    return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
   }
 }
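The new getPartitionToReplacedFileIds override collects the distinct partition paths touched by the write and marks every existing file ID in those partitions as replaced, which is what rescaling whole partitions requires. A minimal local-collections sketch of the same computation (the committed version runs it as a Spark pair RDD; getAllExistingFileIds and the sample data below are stand-ins):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class ReplacedFileIdsSketch {
      // Stand-in for the table-view lookup of existing file IDs in a partition.
      static List<String> getAllExistingFileIds(String partitionPath) {
        return Arrays.asList(partitionPath + "-00000000", partitionPath + "-00000001");
      }

      // Mirror of the override: distinct written partitions -> all file IDs currently in them.
      static Map<String, List<String>> partitionToReplacedFileIds(List<String> writtenPartitionPaths) {
        return writtenPartitionPaths.stream()
            .distinct()
            .collect(Collectors.toMap(Function.identity(), ReplacedFileIdsSketch::getAllExistingFileIds));
      }

      public static void main(String[] args) {
        System.out.println(partitionToReplacedFileIds(
            Arrays.asList("dt=2021-01-05", "dt=2021-01-05", "dt=2021-01-07")));
      }
    }
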
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 85acd617041..70c6cca7573 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -242,6 +242,7 @@ class PartitionBucketIndexManager extends BaseProcedure
       })
     }
     val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields)
+    logInfo("Starting bucket rescale for " + rescalePartitionsMap)
     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 5fd8fd4ad19..e0e60e3f6b4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -74,7 +74,6 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
   protected lazy val spark: SparkSession = SparkSession.builder()
     .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
-    .config("spark.driver.bindAddress", "127.0.0.1")
     .config("spark.sql.session.timeZone", "UTC")
     .config("hoodie.insert.shuffle.parallelism", "4")
     .config("hoodie.upsert.shuffle.parallelism", "4")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
index 61f4bb8a558..903d6bf1406 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
@@ -51,6 +51,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
         Seq("true", "false").foreach { bulkInsertAsRow =>
           withTempDir { tmp =>
             val tableName = generateTableName
+            val tablePath = tmp.getCanonicalPath + "/" + tableName
             // Create a partitioned table
             spark.sql(
               s"""
@@ -70,7 +71,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
                  | hoodie.bucket.index.num.buckets = 1,
                  | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow')
                  | partitioned by (dt)
-                 | location '${tmp.getCanonicalPath}'
+                 | location '${tablePath}'
            """.stripMargin)
 
             // Note: Do not write the field alias, the partition field must be placed last.
@@ -112,7 +113,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
               Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
               Seq(33, "a3,3", 30.0, 3000, "2021-01-07")
             )
-            val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+            val metaClient = createMetaClient(spark, tablePath)
             val actual: List[String] = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList
             val expected: List[String] = List("dt=2021-01-05" + "00000000",
               "dt=2021-01-05" + "00000000",
@@ -121,6 +122,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
               "dt=2021-01-07" + "00000000",
               "dt=2021-01-07" + "00000001")
             assert(actual.sorted == expected.sorted)
+            spark.sparkContext.persistentRdds.foreach(rddPair => rddPair._2.unpersist(true))
           }
         }
       }
@@ -146,6 +148,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
       Seq("cow", "mor").foreach { tableType =>
         withTempDir { tmp =>
           val tableName = generateTableName
+          val tablePath = tmp.getCanonicalPath + "/" + tableName
           // Create a partitioned table
           spark.sql(
             s"""
@@ -164,7 +167,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
                | hoodie.bucket.index.hash.field = 'id,name',
                | hoodie.bucket.index.num.buckets = 1)
                | partitioned by (dt)
-               | location '${tmp.getCanonicalPath}'
+               | location '$tablePath'
                | """.stripMargin)
 
           // Note: Do not write the field alias, the partition field must be placed last.
@@ -208,7 +211,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
             Seq(33, "a3,3", 30.0, 3000, "2021-01-07")
           )
-          val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+          val metaClient = createMetaClient(spark, tablePath)
           val actual: List[String] = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList
           val expected: List[String] = List("dt=2021-01-05" + "00000000",
             "dt=2021-01-05" + "00000000",
@@ -218,6 +221,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             "dt=2021-01-07" + "00000001")
           // compare file group as expected
           assert(actual.sorted == expected.sorted)
+          spark.sparkContext.persistentRdds.foreach(rddPair => rddPair._2.unpersist(true))
         }
       }
     }
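Both test hunks above unpersist every cached RDD after the assertions, presumably so state cached in one loop iteration (table type or bulk-insert mode) does not linger on the shared SparkSession across iterations. A hedged equivalent in Java, using the JavaSparkContext API (the helper name and surrounding setup are illustrative):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class UnpersistAllSketch {
      // Blocking unpersist of every RDD still cached on the context, mirroring the
      // persistentRdds.foreach(rddPair => rddPair._2.unpersist(true)) added in the tests.
      static void unpersistAll(SparkSession spark) {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        for (JavaRDD<?> rdd : jsc.getPersistentRDDs().values()) {
          rdd.unpersist(true);
        }
      }
    }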
