yihua commented on code in PR #12584:
URL: https://github.com/apache/hudi/pull/12584#discussion_r1913982213


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,183 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
+        withRecordType()(withTempDir { tmp =>
+          
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+            log.info(s"Testing table type $tableType with merge mode 
$mergeMode")
+
+            val tableName = generateTableName
+            // Create table with primaryKey and preCombineField
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts int,
+                 |  dt string
+                 |) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts',
+                 |  hoodie.record.merge.mode = '$mergeMode'
+                 | )
+                 | partitioned by(dt)
+                 | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+            // Insert initial data
+            spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 
'2021-03-21')")
+
+            // Test 1: Update statements where at least one misses primary key 
assignment
+            if (tableType.equals("mor")) {
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched and s0.id = 1 then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+                   |when matched and s0.id = 2 then update set *
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+            }
+
+            // Test 2: At least one partial insert assignment clause misses 
primary key.
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (name, price, ts, 
dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched then insert (name, price, ts, dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            // Test 3: Partial insert missing preCombineField - only validate 
for EVENT_TIME_ORDERING
+            val mergeStmt =
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (id, name, price, 
dt)
+                 |values (s0.id, s0.name, s0.price, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+
+            if (mergeMode == "EVENT_TIME_ORDERING") {
+              checkException(mergeStmt)(
+                "No matching assignment found for target table precombine 
field `ts`"
+              )
+            } else {
+              // For COMMIT_TIME_ORDERING, this should execute without error
+              spark.sql(mergeStmt)
+            }
+
+            // Verify data state
+            if (mergeMode == "COMMIT_TIME_ORDERING") {
+              checkAnswer(s"select id, name, price, ts, dt from $tableName 
where id = 1")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")
+              )
+            } else {
+              // For EVENT_TIME_ORDERING, original data should be unchanged 
due to exception
+              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")

Review Comment:
   The result is the same here. Is that expected? Could you craft a test case 
that would generate different results for COMMIT_TIME_ORDERING and 
EVENT_TIME_ORDERING merge modes?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,183 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
+        withRecordType()(withTempDir { tmp =>
+          
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+            log.info(s"Testing table type $tableType with merge mode 
$mergeMode")
+
+            val tableName = generateTableName
+            // Create table with primaryKey and preCombineField
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts int,
+                 |  dt string
+                 |) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts',
+                 |  hoodie.record.merge.mode = '$mergeMode'
+                 | )
+                 | partitioned by(dt)
+                 | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+            // Insert initial data
+            spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 
'2021-03-21')")
+
+            // Test 1: Update statements where at least one misses primary key 
assignment
+            if (tableType.equals("mor")) {
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched and s0.id = 1 then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+                   |when matched and s0.id = 2 then update set *
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+            }
+
+            // Test 2: At least one partial insert assignment clause misses 
primary key.
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (name, price, ts, 
dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched then insert (name, price, ts, dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            // Test 3: Partial insert missing preCombineField - only validate 
for EVENT_TIME_ORDERING
+            val mergeStmt =
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (id, name, price, 
dt)
+                 |values (s0.id, s0.name, s0.price, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+
+            if (mergeMode == "EVENT_TIME_ORDERING") {
+              checkException(mergeStmt)(
+                "No matching assignment found for target table precombine 
field `ts`"
+              )
+            } else {
+              // For COMMIT_TIME_ORDERING, this should execute without error
+              spark.sql(mergeStmt)
+            }
+
+            // Verify data state
+            if (mergeMode == "COMMIT_TIME_ORDERING") {
+              checkAnswer(s"select id, name, price, ts, dt from $tableName 
where id = 1")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")
+              )
+            } else {
+              // For EVENT_TIME_ORDERING, original data should be unchanged 
due to exception
+              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")
+              )
+            }
+          }
+        })
+      }
+    }
+  }
+
+  test("Test merge into Allowed-patterns of assignment clauses") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+          val tableName = generateTableName
+          spark.sql(

Review Comment:
   Let's test both COMMIT_TIME_ORDERING and EVENT_TIME_ORDERING merge modes.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql.hudi.dml
 
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.AutoRecordKeyGenerationUtils.getClass
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-

Review Comment:
   Similar to the earlier comment, please fix the import grouping here as well.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,183 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
+        withRecordType()(withTempDir { tmp =>
+          
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+            log.info(s"Testing table type $tableType with merge mode 
$mergeMode")
+
+            val tableName = generateTableName
+            // Create table with primaryKey and preCombineField
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts int,
+                 |  dt string
+                 |) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts',
+                 |  hoodie.record.merge.mode = '$mergeMode'

Review Comment:
   ```suggestion
                    |  recordMergeMode = '$mergeMode'
   ```
   for simplicity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to