This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a56297799e0 [HUDI-6660]  For merge into use primary key constraint 
when optimized writes are enabled (#9382)
a56297799e0 is described below

commit a56297799e0eebbff8159a989d6cd9acb0308c2d
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Aug 8 04:49:47 2023 +0530

    [HUDI-6660]  For merge into use primary key constraint when optimized 
writes are enabled (#9382)
    
    - [HUDI-6660] Enforce the primary key constraint on the merge into join 
condition only when optimized writes are enabled; relax it otherwise
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  3 +-
 .../TestMergeIntoTableWithNonRecordKeyField.scala  | 56 +++++++++++++++++-----
 2 files changed, 46 insertions(+), 13 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f830c552bc8..accacd82662 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -172,7 +172,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
           expressionSet.remove((attr, expr))
           (attr, expr)
       }
-      if (resolving.isEmpty && rk._1.equals("primaryKey")) {
+      if (resolving.isEmpty && rk._1.equals("primaryKey")
+        && 
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), 
"false") == "true") {
         throw new AnalysisException(s"Hudi tables with primary key are 
required to match on all primary key colums. Column: '${rk._2}' not found")
       }
       resolving
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index 48964b37323..bcb29809eba 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -123,20 +123,52 @@ class TestMergeIntoTableWithNonRecordKeyField extends 
HoodieSparkSqlTestBase wit
              | )
        """.stripMargin)
 
-        val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
-          "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found"
+        spark.sql(
+          s"""
+             |insert into $tableName3 values
+             |    (1, 'a1', 10, 100),
+             |    (2, 'a2', 20, 200),
+             |    (3, 'u3', 20, 100)
+             |""".stripMargin)
+
+        if (sparkSqlOptimizedWrites) {
+          val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+            "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found"
+          } else {
+            "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found;"
+          }
+          checkException(
+            s"""
+               | merge into $tableName3 as t0
+               | using (
+               |  select * from $tableName2
+               | ) as s0
+               | on t0.id = s0.id
+               | when matched then update set id = t0.id, name = t0.name,
+               |  price = t0.price, ts = s0.ts
+               | when not matched then insert (id,name,price,ts) values(s0.id, 
s0.name, s0.price, s0.ts)
+           """.stripMargin)(errorMessage2)
         } else {
-          "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found;"
+          spark.sql(
+            s"""
+               | merge into $tableName3 as t0
+               | using (
+               |  select * from $tableName2
+               | ) as s0
+               | on t0.id = s0.id
+               | when matched then update set id = t0.id, name = t0.name,
+               |  price = t0.price, ts = s0.ts
+               | when not matched then insert (id,name,price,ts) values(s0.id, 
s0.name, s0.price, s0.ts)
+           """.stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts from $tableName3")(
+            Seq(1, "a1", 10.0, 100),
+            Seq(1, "u1", 10.0, 999),
+            Seq(2, "a2", 20.0, 200),
+            Seq(3, "u3", 20.0, 9999),
+            Seq(4, "u4", 40.0, 99999)
+          )
         }
-
-        checkException(
-          s"""
-             |merge into $tableName3 as oldData
-             |using $tableName2
-             |on oldData.id = $tableName2.id
-             |when matched then update set oldData.name = $tableName2.name
-             |when not matched then insert *
-             |""".stripMargin)(errorMessage2)
       }
     }
   }

Reply via email to