This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 419f881167 [HUDI-4643] MergeInto syntax WHEN MATCHED is optional but
must be set (#6443)
419f881167 is described below
commit 419f881167bc9800a93d0e121cb376cf092bfb6f
Author: 董可伦 <[email protected]>
AuthorDate: Sat Aug 20 12:34:46 2022 +0800
[HUDI-4643] MergeInto syntax WHEN MATCHED is optional but must be set
(#6443)
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 6 +-
.../spark/sql/hudi/TestMergeIntoTable2.scala | 94 +++++++++++++++++++++-
2 files changed, 98 insertions(+), 2 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 6cd96d7064..5676b72aef 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -153,7 +153,11 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
executeUpsert(sourceDF, parameters)
} else { // If there is no match actions in the statement, execute insert
operation only.
- executeInsertOnly(sourceDF, parameters)
+ val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable)
+ val primaryKeys =
hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
+ // Only records that are not included in the target table can be inserted
+ val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")
+ executeInsertOnly(insertSourceDF, parameters)
}
sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
Seq.empty[Row]
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index e162368dac..1c2dc0aab6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -258,7 +258,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| )
""".stripMargin)
- // Insert data to source table
+ // Insert data
spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2021-03-21'")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 1.0, 10, "2021-03-21")
@@ -544,4 +544,96 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
}
+ test("Test only insert when source table contains history") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // Insert data
+ spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2022-08-18'")
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 1.0, 10, "2022-08-18")
+ )
+
+ // Insert data that does not match the insert condition.
+ spark.sql(
+ s"""
+ | merge into $tableName as t0
+ | using (
+ | select 1 as id, 'a1' as name, 11 as price, 110 as ts,
'2022-08-19' as dt union all
+ | select 2 as id, 'a2' as name, 10 as price, 100 as ts,
'2022-08-18' as dt
+ | ) as s0
+ | on t0.id = s0.id
+ | when not matched then insert *
+ """.stripMargin
+ )
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 1.0, 10, "2022-08-18"),
+ Seq(2, "a2", 10.0, 100, "2022-08-18")
+ )
+ }
+ }
+
+ test("Test only insert when source table contains history and target table
has multiple keys") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create table with multiple keys
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id1 int,
+ | id2 int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey ='id1,id2',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
+ // Insert data
+ spark.sql(s"insert into $tableName select 1, 1, 'a1', 1, 10,
'2022-08-18'")
+ checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")(
+ Seq(1, 1, "a1", 1.0, 10, "2022-08-18")
+ )
+
+ // Insert data that does not match the insert condition.
+ spark.sql(
+ s"""
+ | merge into $tableName as t0
+ | using (
+ | select 1 as id1, 1 as id2, 'a1' as name, 11 as price, 110 as ts,
'2022-08-19' as dt union all
+ | select 1 as id1, 2 as id2, 'a2' as name, 10 as price, 100 as ts,
'2022-08-18' as dt
+ | ) as s0
+ | on t0.id1 = s0.id1
+ | when not matched then insert *
+ """.stripMargin
+ )
+
+ checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")(
+ Seq(1, 1, "a1", 1.0, 10, "2022-08-18"),
+ Seq(1, 2, "a2", 10.0, 100, "2022-08-18")
+ )
+ }
+ }
+
}