This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6589ebe8486 [HUDI-5904] support more than one update actions in merge
into table (#8133)
6589ebe8486 is described below
commit 6589ebe84865b4e5e3b90b6629aa477816a81a48
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Mar 11 09:43:53 2023 +0800
[HUDI-5904] support more than one update actions in merge into table (#8133)
* support more than one update action in merge into sql
Co-authored-by: xuyu <[email protected]>
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 3 --
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 55 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index c5a6281cf2d..418b0c17adb 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -623,9 +623,6 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}
private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
- if (updateActions.length > 1) {
- throw new AnalysisException(s"Only one updating action is supported in
MERGE INTO statement (provided ${updateActions.length})")
- }
//updateActions.foreach(update =>
// assert(update.assignments.length == targetTableSchema.length,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 6a203718fd1..9b979e8cc7f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils, ScalaAssertionSupport}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.spark.sql.internal.SQLConf
class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
@@ -115,6 +114,60 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
})
}
+ test("Test MergeInto with more than once update actions") {
+ withRecordType()(withTempDir {tmp =>
+ val targetTable = generateTableName
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | id int,
+ | name string,
+ | data int,
+ | country string,
+ | ts bigint
+ |) using hudi
+ |tblproperties (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ |partitioned by (country)
+ |location '${tmp.getCanonicalPath}/$targetTable'
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |merge into ${targetTable} as target
+ |using (
+ |select 1 as id, 'lb' as name, 6 as data, 'shu' as country,
1646643193 as ts
+ |) source
+ |on source.id = target.id
+ |when matched then
+ |update set *
+ |when not matched then
+ |insert *
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |merge into ${targetTable} as target
+ |using (
+ |select 1 as id, 'lb' as name, 5 as data, 'shu' as country,
1646643196 as ts
+ |) source
+ |on source.id = target.id
+ |when matched and source.data > target.data then
+ |update set target.data = source.data, target.ts = source.ts
+ |when matched and source.data = 5 then
+ |update set target.data = source.data, target.ts = source.ts
+ |when not matched then
+ |insert *
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, data, country, ts from $targetTable")(
+ Seq(1, "lb", 5, "shu", 1646643196L)
+ )
+
+ })
+ }
+
test("Test MergeInto with ignored record") {
withRecordType()(withTempDir {tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")