This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b41e1b0a06 fix: INSERT INTO should preserve duplication with 
hoodie.spark.sql.insert.into.operation=insert (#14046)
69b41e1b0a06 is described below

commit 69b41e1b0a061576a0f9bac82d0c43ae67d2c850
Author: Vamshi Krishna Kyatham 
<[email protected]>
AuthorDate: Fri Oct 3 07:00:52 2025 -0700

    fix: INSERT INTO should preserve duplication with 
hoodie.spark.sql.insert.into.operation=insert (#14046)
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 11 ++++++-
 .../sql/hudi/dml/insert/TestInsertTable.scala      | 36 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index e126bb97f239..b57d08f2800c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -215,7 +215,16 @@ trait ProvidesHoodieConfig extends Logging {
     val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), 
INSERT_DUP_POLICY.defaultValue())
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
-    val combineBeforeInsert = !hoodieCatalogTable.orderingFields.isEmpty && 
hoodieCatalogTable.primaryKeys.nonEmpty
+    val combineBeforeInsert = 
combinedOpts.get(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key()) match {
+      case Some(value) =>
+        value.toBoolean
+      case None =>
+        if (sparkSqlInsertIntoOperationSet && 
sparkSqlInsertIntoOperation.equals(INSERT_OPERATION_OPT_VAL)) {
+          false
+        } else {
+          !hoodieCatalogTable.orderingFields.isEmpty && 
hoodieCatalogTable.primaryKeys.nonEmpty
+        }
+    }
 
     /*
      * The sql write operation has higher precedence than the legacy insert 
mode.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index 627c6a34e606..ee4e61c45511 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -2498,18 +2498,31 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     assertResult(expectedOperationtype) {
       getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
     }
+    // Check if INSERT operation was explicitly set in setOptions
+    val isExplicitInsertOperation = setOptions.exists(option =>
+      option.toLowerCase.contains("set " + SPARK_SQL_INSERT_INTO_OPERATION.key 
+ " = insert")
+    )
+
     if (expectedOperationtype == WriteOperationType.UPSERT) {
       // dedup should happen within same batch being ingested and existing 
records on storage should get updated
       checkAnswer(s"select id, name, price, dt from $tableName order by id")(
         Seq(1, "a1_1", 10.0, "2021-07-18"),
         Seq(2, "a2_2", 30.0, "2021-07-18")
       )
+    } else if (isExplicitInsertOperation) {
+      // duplications are retained as INSERT is explicitly set
+      checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+        Seq(1, "a1", 10.0, "2021-07-18"),
+        Seq(1, "a1_1", 10.0, "2021-07-18"),
+        Seq(2, "a2", 20.0, "2021-07-18"),
+        Seq(2, "a2_2", 30.0, "2021-07-18")
+      )
     } else {
       // no dedup across batches
       checkAnswer(s"select id, name, price, dt from $tableName order by id")(
         Seq(1, "a1", 10.0, "2021-07-18"),
         Seq(1, "a1_1", 10.0, "2021-07-18"),
-        // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same batch 
kicks in if preCombine is set
+        // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine with no sql insert 
within same batch kicks in if preCombine is set
         Seq(2, "a2_2", 30.0, "2021-07-18")
       )
     }
@@ -3051,12 +3064,33 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
       }
 
+      // Check if INSERT operation was explicitly set in setOptions
+      val isExplicitInsertOperation = setOptions.exists(option =>
+        option.toLowerCase.contains("set " + 
SPARK_SQL_INSERT_INTO_OPERATION.key + "=insert")
+      )
+
       if (expectedOperationtype == WriteOperationType.UPSERT) {
         // dedup should happen within same batch being ingested and existing 
records on storage should get updated
         checkAnswer(s"select id, name, price, dt from $tableName order by id")(
           Seq(1, "a1_1", 10.0, "2021-07-18"),
           Seq(2, "a2_2", 30.0, "2021-07-18")
         )
+      } else if (isExplicitInsertOperation) {
+        // When INSERT operation is explicitly set, duplicates should be 
preserved
+        if (insertDupPolicy == NONE_INSERT_DUP_POLICY) {
+          checkAnswer(s"select id, name, price, dt from $tableName order by 
id")(
+            Seq(1, "a1", 10.0, "2021-07-18"),
+            Seq(1, "a1_1", 10.0, "2021-07-18"),
+            Seq(2, "a2", 20.0, "2021-07-18"),
+            Seq(2, "a2_2", 30.0, "2021-07-18")
+          )
+        } else if (insertDupPolicy == DROP_INSERT_DUP_POLICY) {
+          checkAnswer(s"select id, name, price, dt from $tableName order by 
id")(
+            Seq(1, "a1", 10.0, "2021-07-18"),
+            Seq(2, "a2", 20.0, "2021-07-18"), // same-batch duplicates 
preserved with explicit INSERT
+            Seq(2, "a2_2", 30.0, "2021-07-18")
+          )
+        }
       } else {
         if (insertDupPolicy == NONE_INSERT_DUP_POLICY) {
           // no dedup across batches

Reply via email to