This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-9207
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2e77c75eab29e282355f3b16434dbb330b07a57b
Author: YueZhang <[email protected]>
AuthorDate: Fri Mar 21 18:50:14 2025 +0800

    Spark Insert Overwrite Support Row Writer
---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 15 +++++-
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 55 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)
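
In brief: bulk-insert row writing previously fell through to the generic BulkInsertDataInternalWriterHelper for SIMPLE-engine bucket index tables; this patch adds a dedicated branch that routes them to BucketBulkInsertDataInternalWriterHelper, so INSERT OVERWRITE can use the Spark native row writer on such tables. A minimal sketch of how the new path is exercised, distilled from the test added below (the table name and location are illustrative; the config keys are taken from the test, and the SIMPLE engine is relied on implicitly since it is the default for the BUCKET index):

    // Sketch distilled from the test added below; the table name and
    // location are illustrative, not part of the patch.
    spark.sql("set hoodie.datasource.write.operation = bulk_insert")

    spark.sql(
      """create table bucket_tbl (
        |  id int, name string, price double, ts long, dt string
        |) using hudi
        |tblproperties (
        |  primaryKey = 'id,name',
        |  type = 'cow',
        |  preCombineField = 'ts',
        |  hoodie.index.type = 'BUCKET',
        |  hoodie.bucket.index.hash.field = 'id,name',
        |  hoodie.datasource.write.row.writer.enable = 'true')
        |partitioned by (dt)
        |location '/tmp/bucket_tbl'""".stripMargin)

    // With BUCKET index, the SIMPLE engine, and the row writer enabled,
    // this overwrite now dispatches to BucketBulkInsertDataInternalWriterHelper.
    spark.sql("insert overwrite bucket_tbl values (1, 'a1', 100.0, 1000, '2021-01-05')")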

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index f4bb5b81e85..ab5cd537cfc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
 import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils}
-import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
+import org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
 import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
 import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
 import org.apache.spark.TaskContext
@@ -179,6 +179,19 @@ object HoodieDatasetBulkInsertHelper
               writeConfig.populateMetaFields,
               arePartitionRecordsSorted,
               shouldPreserveHoodieMetadata)
+          case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
+            == BucketIndexEngineType.SIMPLE =>
+            new BucketBulkInsertDataInternalWriterHelper(
+              table,
+              writeConfig,
+              instantTime,
+              taskPartitionId,
+              taskId,
+              taskEpochId,
+              schema,
+              writeConfig.populateMetaFields,
+              arePartitionRecordsSorted,
+              shouldPreserveHoodieMetadata)
           case _ =>
             new BulkInsertDataInternalWriterHelper(
               table,
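
For context, the writer-helper selection in HoodieDatasetBulkInsertHelper now reads roughly as below. This is a condensed sketch, not verbatim source: the enclosing match on writeConfig.getIndexType and the CONSISTENT_HASHING guard are inferred from the imports and the visible tail of the preceding case; only the SIMPLE branch is taken directly from this patch.

    // Condensed sketch; the surrounding match and the first guard are
    // inferred, only the SIMPLE branch is verbatim from this patch.
    writeConfig.getIndexType match {
      case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
        == BucketIndexEngineType.CONSISTENT_HASHING =>
        new ConsistentBucketBulkInsertDataInternalWriterHelper(
          table, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId,
          schema, writeConfig.populateMetaFields, arePartitionRecordsSorted,
          shouldPreserveHoodieMetadata)
      case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
        == BucketIndexEngineType.SIMPLE =>
        // The branch added by this patch.
        new BucketBulkInsertDataInternalWriterHelper(
          table, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId,
          schema, writeConfig.populateMetaFields, arePartitionRecordsSorted,
          shouldPreserveHoodieMetadata)
      case _ =>
        new BulkInsertDataInternalWriterHelper(
          table, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId,
          schema, writeConfig.populateMetaFields, arePartitionRecordsSorted,
          shouldPreserveHoodieMetadata)
    }
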
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 7c1bc0aa2a2..6d90ac8e90d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1847,6 +1847,61 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Insert Overwrite Bucket Index Table when disable row writer") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             | primaryKey = 'id,name',
+             | type = 'cow',
+             | preCombineField = 'ts',
+             | hoodie.index.type = 'BUCKET',
+             | hoodie.bucket.index.hash.field = 'id,name',
+             | hoodie.datasource.write.row.writer.enable = 'true')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        // Note: no column list is given, so the partition field (dt) must come last in the values.
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1,1', 10, 1000, "2021-01-05"),
+             | (2, 'a2', 20, 2000, "2021-01-06"),
+             | (3, 'a3,3', 30, 3000, "2021-01-07")
+       """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+        )
+
+        spark.sql(
+          s"""
+             | insert overwrite $tableName values
+             | (1, 'a1,1', 100, 1000, "2021-01-05")
+       """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 100.0, 1000, "2021-01-05")
+        )
+      }
+    }
+  }
+
   test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET 
and disabled Spark native row writer") {
     withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert",
       "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
