This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch HUDI-9207 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2e77c75eab29e282355f3b16434dbb330b07a57b Author: YueZhang <[email protected]> AuthorDate: Fri Mar 21 18:50:14 2025 +0800 Spark Insert Overwrite Support Row Writer --- .../hudi/HoodieDatasetBulkInsertHelper.scala | 15 +++++- .../spark/sql/hudi/dml/TestInsertTable.scala | 55 ++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index f4bb5b81e85..ab5cd537cfc 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -33,7 +33,7 @@ import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils} -import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper} +import org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper} import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable} import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked import org.apache.spark.TaskContext @@ -179,6 +179,19 @@ object HoodieDatasetBulkInsertHelper writeConfig.populateMetaFields, arePartitionRecordsSorted, shouldPreserveHoodieMetadata) + case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType + == BucketIndexEngineType.SIMPLE => + new BucketBulkInsertDataInternalWriterHelper( + table, + writeConfig, + instantTime, + 
taskPartitionId, + taskId, + taskEpochId, + schema, + writeConfig.populateMetaFields, + arePartitionRecordsSorted, + shouldPreserveHoodieMetadata) case _ => new BulkInsertDataInternalWriterHelper( table, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 7c1bc0aa2a2..6d90ac8e90d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -1847,6 +1847,61 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } + test("Test Insert Overwrite Bucket Index Table when enable row writer") { + withSQLConf( + "hoodie.datasource.write.operation" -> "bulk_insert", + "hoodie.bulkinsert.shuffle.parallelism" -> "1") { + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = 'cow', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name', + | hoodie.datasource.write.row.writer.enable = 'true') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + + // Note: Do not write field aliases; the partition field must be placed last. 
+ spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3,3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3,3", 30.0, 3000, "2021-01-07") + ) + + spark.sql( + s""" + | insert overwrite $tableName values + | (1, 'a1,1', 100, 1000, "2021-01-05") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 100.0, 1000, "2021-01-05") + ) + } + } + } + test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET and disabled Spark native row writer") { withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
