This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6589ebe8486 [HUDI-5904] support more than one update actions in merge into table (#8133) 6589ebe8486 is described below commit 6589ebe84865b4e5e3b90b6629aa477816a81a48 Author: xuzifu666 <x...@zepp.com> AuthorDate: Sat Mar 11 09:43:53 2023 +0800 [HUDI-5904] support more than one update actions in merge into table (#8133) * support one more update actions in merge into sql Co-authored-by: xuyu <11161...@vivo.com> --- .../hudi/command/MergeIntoHoodieTableCommand.scala | 3 -- .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 55 +++++++++++++++++++++- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index c5a6281cf2d..418b0c17adb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -623,9 +623,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie } private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = { - if (updateActions.length > 1) { - throw new AnalysisException(s"Only one updating action is supported in MERGE INTO statement (provided ${updateActions.length})") - } //updateActions.foreach(update => // assert(update.assignments.length == targetTableSchema.length, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index 6a203718fd1..9b979e8cc7f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils, ScalaAssertionSupport} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.exception.SchemaCompatibilityException import org.apache.spark.sql.internal.SQLConf class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport { @@ -115,6 +114,60 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo }) } + test("Test MergeInto with more than once update actions") { + withRecordType()(withTempDir {tmp => + val targetTable = generateTableName + spark.sql( + s""" + |create table ${targetTable} ( + | id int, + | name string, + | data int, + | country string, + | ts bigint + |) using hudi + |tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + |partitioned by (country) + |location '${tmp.getCanonicalPath}/$targetTable' + |""".stripMargin) + spark.sql( + s""" + |merge into ${targetTable} as target + |using ( + |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 1646643193 as ts + |) source + |on source.id = target.id + |when matched then + |update set * + |when not matched then + |insert * + |""".stripMargin) + spark.sql( + s""" + |merge into ${targetTable} as target + |using ( + |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 1646643196 as ts + |) source + |on source.id = target.id + |when matched and source.data > target.data then + |update set target.data = source.data, target.ts = source.ts + |when matched and source.data = 5 then + |update set target.data = source.data, target.ts = source.ts + |when not matched then + |insert * + |""".stripMargin) + + checkAnswer(s"select id, name, data, country, ts from $targetTable")( + Seq(1, "lb", 5, "shu", 1646643196L) + ) + + }) + } + test("Test MergeInto with ignored record") { withRecordType()(withTempDir {tmp => spark.sql("set hoodie.payload.combined.schema.validate = true")