This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 69b41e1b0a06 fix: INSERT INTO should preserve duplication with
hoodie.spark.sql.insert.into.operation=insert (#14046)
69b41e1b0a06 is described below
commit 69b41e1b0a061576a0f9bac82d0c43ae67d2c850
Author: Vamshi Krishna Kyatham <[email protected]>
AuthorDate: Fri Oct 3 07:00:52 2025 -0700
fix: INSERT INTO should preserve duplication with hoodie.spark.sql.insert.into.operation=insert (#14046)
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 11 ++++++-
.../sql/hudi/dml/insert/TestInsertTable.scala | 36 +++++++++++++++++++++-
2 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index e126bb97f239..b57d08f2800c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -215,7 +215,16 @@ trait ProvidesHoodieConfig extends Logging {
val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(),
INSERT_DUP_POLICY.defaultValue())
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
- val combineBeforeInsert = !hoodieCatalogTable.orderingFields.isEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
+ val combineBeforeInsert =
combinedOpts.get(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key()) match {
+ case Some(value) =>
+ value.toBoolean
+ case None =>
+ if (sparkSqlInsertIntoOperationSet &&
sparkSqlInsertIntoOperation.equals(INSERT_OPERATION_OPT_VAL)) {
+ false
+ } else {
+ !hoodieCatalogTable.orderingFields.isEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
+ }
+ }
/*
* The sql write operation has higher precedence than the legacy insert
mode.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index 627c6a34e606..ee4e61c45511 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -2498,18 +2498,31 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
assertResult(expectedOperationtype) {
getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
}
+ // Check if INSERT operation was explicitly set in setOptions
+ val isExplicitInsertOperation = setOptions.exists(option =>
+ option.toLowerCase.contains("set " + SPARK_SQL_INSERT_INTO_OPERATION.key
+ " = insert")
+ )
+
if (expectedOperationtype == WriteOperationType.UPSERT) {
// dedup should happen within same batch being ingested and existing
records on storage should get updated
checkAnswer(s"select id, name, price, dt from $tableName order by id")(
Seq(1, "a1_1", 10.0, "2021-07-18"),
Seq(2, "a2_2", 30.0, "2021-07-18")
)
+ } else if (isExplicitInsertOperation) {
+ // duplications are retained as INSERT is explicitly set
+ checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(1, "a1_1", 10.0, "2021-07-18"),
+ Seq(2, "a2", 20.0, "2021-07-18"),
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
} else {
// no dedup across batches
checkAnswer(s"select id, name, price, dt from $tableName order by id")(
Seq(1, "a1", 10.0, "2021-07-18"),
Seq(1, "a1_1", 10.0, "2021-07-18"),
- // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same batch
kicks in if preCombine is set
+ // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine with no sql insert
within same batch kicks in if preCombine is set
Seq(2, "a2_2", 30.0, "2021-07-18")
)
}
@@ -3051,12 +3064,33 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
}
+ // Check if INSERT operation was explicitly set in setOptions
+ val isExplicitInsertOperation = setOptions.exists(option =>
+ option.toLowerCase.contains("set " +
SPARK_SQL_INSERT_INTO_OPERATION.key + "=insert")
+ )
+
if (expectedOperationtype == WriteOperationType.UPSERT) {
// dedup should happen within same batch being ingested and existing
records on storage should get updated
checkAnswer(s"select id, name, price, dt from $tableName order by id")(
Seq(1, "a1_1", 10.0, "2021-07-18"),
Seq(2, "a2_2", 30.0, "2021-07-18")
)
+ } else if (isExplicitInsertOperation) {
+ // When INSERT operation is explicitly set, duplicates should be
preserved
+ if (insertDupPolicy == NONE_INSERT_DUP_POLICY) {
+ checkAnswer(s"select id, name, price, dt from $tableName order by
id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(1, "a1_1", 10.0, "2021-07-18"),
+ Seq(2, "a2", 20.0, "2021-07-18"),
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ } else if (insertDupPolicy == DROP_INSERT_DUP_POLICY) {
+ checkAnswer(s"select id, name, price, dt from $tableName order by
id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(2, "a2", 20.0, "2021-07-18"), // same-batch duplicates
preserved with explicit INSERT
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ }
} else {
if (insertDupPolicy == NONE_INSERT_DUP_POLICY) {
// no dedup across batches