This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990-V2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 139083dd2b16207a66397013e28cb51a06a22396
Author: YueZhang <[email protected]>
AuthorDate: Sun Mar 23 01:55:48 2025 +0800

    fix ut fix ut fix ut fix ut
---
 .../bucket/PartitionBucketIndexCalculator.java     |  1 +
 .../BucketIndexBulkInsertPartitionerWithRows.java  |  5 ++--
 .../model/PartitionBucketIndexHashingConfig.java   | 10 ++++++++
 .../DatasetBucketRescaleCommitActionExecutor.java  | 28 ++++++++++++++++------
 .../procedures/PartitionBucketIndexManager.scala   |  1 +
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  1 -
 .../TestInsertTableWithPartitionBucketIndex.scala  | 12 ++++++----
 7 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
index 0cbdf36d088..26d71760f03 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java
@@ -50,6 +50,7 @@ import java.util.regex.Pattern;
  * exists for each unique hashingInstantToLoad value.
  */
 public class PartitionBucketIndexCalculator implements Serializable {
+  private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class);
   // Map to store singleton instances for each instantToLoad + configuration hash combination
   private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index 3a94fa78cb3..ee1fcc9c380 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -47,11 +48,11 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti
     }
   }
 
-  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, PartitionBucketIndexCalculator calc) {
+  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, PartitionBucketIndexHashingConfig hashingConfig) {
     this.indexKeyFields = indexKeyFields;
     this.bucketNum = bucketNum;
     this.isPartitionBucketIndexEnable = true;
-    this.calc = calc;
+    this.calc = PartitionBucketIndexCalculator.getInstance(hashingConfig.getInstant(), hashingConfig);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
index a658003f851..195b3e23275 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class PartitionBucketIndexHashingConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexHashingConfig.class);
   public static final String HASHING_CONFIG_FILE_SUFFIX = ".hashing_config";
   public static final Integer CURRENT_VERSION = 1;
@@ -97,4 +98,13 @@ public class PartitionBucketIndexHashingConfig implements Serializable {
   public String getExpressions() {
     return expressions;
   }
+
+  public String toString() {
+    return "PartitionBucketIndexHashingConfig{" + "expressions='" + expressions + '\''
+        + ", defaultBucketNumber='" + defaultBucketNumber + '\''
+        + ", rule='" + rule + '\''
+        + ", version='" + version + '\''
+        + ", instant=" + instant
+        + '}';
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index 1c6d90eb6f7..3820840036a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -20,11 +20,13 @@ package org.apache.hudi.commit;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows;
-import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator;
 import org.apache.hudi.index.bucket.PartitionBucketIndexUtils;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -32,10 +34,18 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
 
 public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor {
-  private final PartitionBucketIndexCalculator calc;
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DatasetBucketRescaleCommitActionExecutor.class);
+  private final PartitionBucketIndexHashingConfig hashingConfig;
 
   public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
                                                   SparkRDDWriteClient writeClient,
@@ -45,9 +55,8 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
     String instant = config.getHashingConfigInstantToLoad();
     String rule = config.getBucketIndexPartitionRuleType();
     int bucketNumber = config.getBucketIndexNumBuckets();
-    PartitionBucketIndexHashingConfig hashingConfig = new PartitionBucketIndexHashingConfig(expression,
+    this.hashingConfig = new PartitionBucketIndexHashingConfig(expression,
         bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instant);
-    this.calc = PartitionBucketIndexCalculator.getInstance(instantTime, hashingConfig);
   }
 
   /**
@@ -59,7 +68,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
   @Override
   protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMetaFields, boolean isTablePartitioned) {
     return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
-        writeConfig.getBucketIndexNumBuckets(), calc);
+        writeConfig.getBucketIndexNumBuckets(), hashingConfig);
   }
 
   /**
@@ -69,9 +78,14 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
   @Override
   protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
     super.afterExecute(result);
-
-    PartitionBucketIndexHashingConfig hashingConfig = calc.getHashingConfig();
     boolean res = PartitionBucketIndexUtils.saveHashingConfig(hashingConfig, table.getMetaClient());
     ValidationUtils.checkArgument(res);
+    LOG.info("Finish to save hashing config " + hashingConfig);
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+    return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 85acd617041..70c6cca7573 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -242,6 +242,7 @@ class PartitionBucketIndexManager extends BaseProcedure
       })
     }
     val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields)
+    logInfo("Start to do bucket rescale for " + rescalePartitionsMap)
     val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 5fd8fd4ad19..e0e60e3f6b4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -74,7 +74,6 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
   protected lazy val spark: SparkSession = SparkSession.builder()
     .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
-    .config("spark.driver.bindAddress", "127.0.0.1")
     .config("spark.sql.session.timeZone", "UTC")
     .config("hoodie.insert.shuffle.parallelism", "4")
     .config("hoodie.upsert.shuffle.parallelism", "4")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
index 61f4bb8a558..903d6bf1406 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
@@ -51,6 +51,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
     Seq("true", "false").foreach { bulkInsertAsRow =>
       withTempDir { tmp =>
         val tableName = generateTableName
+        val tablePath = tmp.getCanonicalPath + "/" + tableName
         // Create a partitioned table
         spark.sql(
           s"""
@@ -70,7 +71,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             | hoodie.bucket.index.num.buckets = 1,
             | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow')
             | partitioned by (dt)
-            | location '${tmp.getCanonicalPath}'
+            | location '${tablePath}'
       """.stripMargin)
 
         // Note: Do not write the field alias, the partition field must be placed last.
@@ -112,7 +113,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
           Seq(33, "a3,3", 30.0, 3000, "2021-01-07")
         )
-        val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+        val metaClient = createMetaClient(spark, tablePath)
         val actual: List[String] = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList
         val expected: List[String] = List("dt=2021-01-05" + "00000000",
           "dt=2021-01-05" + "00000000",
@@ -121,6 +122,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           "dt=2021-01-07" + "00000000",
           "dt=2021-01-07" + "00000001")
         assert(actual.sorted == expected.sorted)
+        spark.sparkContext.persistentRdds.foreach(rddPair => rddPair._2.unpersist(true))
       }
     }
   }
@@ -146,6 +148,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
     Seq("cow", "mor").foreach { tableType =>
       withTempDir { tmp =>
         val tableName = generateTableName
+        val tablePath = tmp.getCanonicalPath + "/" + tableName
         // Create a partitioned table
         spark.sql(
           s"""
@@ -164,7 +167,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             | hoodie.bucket.index.hash.field = 'id,name',
             | hoodie.bucket.index.num.buckets = 1)
             | partitioned by (dt)
-            | location '${tmp.getCanonicalPath}'
+            | location '$tablePath'
            |""".stripMargin)
 
        // Note: Do not write the field alias, the partition field must be placed last.
@@ -208,7 +211,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
           Seq(33, "a3,3", 30.0, 3000, "2021-01-07")
         )
-        val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+        val metaClient = createMetaClient(spark, tablePath)
         val actual: List[String] = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList
         val expected: List[String] = List("dt=2021-01-05" + "00000000",
           "dt=2021-01-05" + "00000000",
@@ -218,6 +221,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           "dt=2021-01-07" + "00000001")
         // compare file group as expected
         assert(actual.sorted == expected.sorted)
+        spark.sparkContext.persistentRdds.foreach(rddPair => rddPair._2.unpersist(true))
      }
    }
  }
