This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6589ebe8486 [HUDI-5904] support more than one update actions in merge 
into table (#8133)
6589ebe8486 is described below

commit 6589ebe84865b4e5e3b90b6629aa477816a81a48
Author: xuzifu666 <x...@zepp.com>
AuthorDate: Sat Mar 11 09:43:53 2023 +0800

    [HUDI-5904] support more than one update actions in merge into table (#8133)
    
    * support one more update actions in merge into sql
    
    Co-authored-by: xuyu <11161...@vivo.com>
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  3 --
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 55 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index c5a6281cf2d..418b0c17adb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -623,9 +623,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   }
 
   private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
-    if (updateActions.length > 1) {
-      throw new AnalysisException(s"Only one updating action is supported in 
MERGE INTO statement (provided ${updateActions.length})")
-    }
 
     //updateActions.foreach(update =>
     //  assert(update.assignments.length == targetTableSchema.length,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 6a203718fd1..9b979e8cc7f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils, ScalaAssertionSupport}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.spark.sql.internal.SQLConf
 
 class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {
@@ -115,6 +114,60 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test MergeInto with more than once update actions") {
+    withRecordType()(withTempDir {tmp =>
+      val targetTable = generateTableName
+      spark.sql(
+        s"""
+           |create table ${targetTable} (
+           |  id int,
+           |  name string,
+           |  data int,
+           |  country string,
+           |  ts bigint
+           |) using hudi
+           |tblproperties (
+           |  type = 'cow',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           |partitioned by (country)
+           |location '${tmp.getCanonicalPath}/$targetTable'
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |merge into ${targetTable} as target
+           |using (
+           |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 
1646643193 as ts
+           |) source
+           |on source.id = target.id
+           |when matched then
+           |update set *
+           |when not matched then
+           |insert *
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |merge into ${targetTable} as target
+           |using (
+           |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 
1646643196 as ts
+           |) source
+           |on source.id = target.id
+           |when matched and source.data > target.data then
+           |update set target.data = source.data, target.ts = source.ts
+           |when matched and source.data = 5 then
+           |update set target.data = source.data, target.ts = source.ts
+           |when not matched then
+           |insert *
+           |""".stripMargin)
+
+      checkAnswer(s"select id, name, data, country, ts from $targetTable")(
+        Seq(1, "lb", 5, "shu", 1646643196L)
+      )
+
+    })
+  }
+
   test("Test MergeInto with ignored record") {
     withRecordType()(withTempDir {tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")

Reply via email to