Davis-Zhang-Onehouse commented on code in PR #12584:
URL: https://github.com/apache/hudi/pull/12584#discussion_r1908079891


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we 
don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(
+      field => {
+        validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        Seq(field),
+        "pre-combine field",

Review Comment:
   done for all



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we 
don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(
+      field => {
+        validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        Seq(field),
+        "pre-combine field",
+        action.assignments)
+      validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+        "primaryKey field",
+        action.assignments)
+      }))
   }
 
-  private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+  private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: 
Map[String, String]): Unit = {
     if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
-      logWarning(s"Updates without precombine can have nondeterministic 
behavior")
+      logWarning(s"Updates without pre-combine can have nondeterministic 
behavior")
     }
     updateActions.foreach(update =>
       assert(update.assignments.length <= targetTableSchema.length,
-        s"The number of update assignments[${update.assignments.length}] must 
be less than or equalequal to the " +
+        s"The number of update assignments[${update.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
 
-    // For MOR table, the target table field cannot be the right-value in the 
update action.
     if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+      // For MOR table, the target table field cannot be the right-value in 
the update action.
       updateActions.foreach(update => {
         val targetAttrs = update.assignments.flatMap(a => a.value.collect {
           case attr: AttributeReference if 
mergeInto.targetTable.outputSet.contains(attr) => attr
         })
         assert(targetAttrs.isEmpty,
           s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) 
cannot be the right-value of the update clause for MOR table.")
       })
+      // Only when the partial update is enabled that primary key assignment 
is not mandatory in update actions for MOR tables.
+      if (!isPartialUpdateActionForMOR(props)) {
+        // For MOR table, update assignment clause must have primary key field 
being set explicitly even if it does not
+        // change. The check has no effect if there is no updateActions or we 
don't have primaryKey
+        updateActions.foreach(action => 
validateTargetTableAttrExistsInAssignments(
+          sparkSession.sessionState.conf.resolver,
+          mergeInto.targetTable,
+          
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+          "primaryKey field",

Review Comment:
   done for all



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -489,6 +481,25 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     }
   }
 
+  private def isPartialUpdateActionForMOR(parameters: Map[String, String]) = {
+    val isPartialUpdateAction = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
+      && UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
+      && parameters.getOrElse(
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
+      && updatingActions.nonEmpty)
+    isPartialUpdateAction

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.

Review Comment:
   That's not correct; I have a test that guards against such a case.



##########
hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql:
##########
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
 # CREATE TABLE
 
 create table h1 (
-  id bigint,
+  id int,

Review Comment:
   Reverted. I was thinking of porting some test maintenance along the way.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala:
##########
@@ -33,7 +33,7 @@ class TestMergeIntoLogOnlyTable extends 
HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int

Review Comment:
   reverted



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1051,7 +1052,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           s"""
              | merge into $tableName
              | using (
-             |  select 1 as id, 'a1' as name, 10 as price, $dataValue as c, 
'1' as flag
+             |  select 1 as id, 'a1' as name, 10 as price, cast($dataValue as 
$dataType) as c, '1' as flag

Review Comment:
   reverted



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -137,19 +137,19 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     if (primaryKeyFields.isPresent) {
       //pkless tables can have more complex conditions
       if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
-        throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement on primary key table" +
+        throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement on record key table" +
           s"(provided ${mergeInto.mergeCondition.sql}")
       }
     }
     val resolver = sparkSession.sessionState.analyzer.resolver
     val partitionPathFields = hoodieCatalogTable.tableConfig.getPartitionFields
-    //ensure all primary key fields are part of the merge condition
+    //ensure all record key fields are part of the merge condition
     //allow partition path to be part of the merge condition but not required
     val targetAttr2ConditionExpressions = doCasting(conditions, 
primaryKeyFields.isPresent)
     val expressionSet = scala.collection.mutable.Set[(Attribute, 
Expression)](targetAttr2ConditionExpressions:_*)
     var partitionAndKeyFields: Seq[(String,String)] = Seq.empty
     if (primaryKeyFields.isPresent) {
-     partitionAndKeyFields = partitionAndKeyFields ++ 
primaryKeyFields.get().map(pk => ("primaryKey", pk)).toSeq
+     partitionAndKeyFields = partitionAndKeyFields ++ 
primaryKeyFields.get().map(pk => ("recordKey", pk)).toSeq

Review Comment:
   DONE



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql.hudi.dml
 
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.AutoRecordKeyGenerationUtils.getClass
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,175 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      withRecordType()(withTempDir { tmp =>
+        log.info(s"Testing table type $tableType")
+        spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false")

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,183 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
+        withRecordType()(withTempDir { tmp =>
+          
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+            log.info(s"Testing table type $tableType with merge mode 
$mergeMode")
+
+            val tableName = generateTableName
+            // Create table with primaryKey and preCombineField
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts int,
+                 |  dt string
+                 |) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts',
+                 |  hoodie.record.merge.mode = '$mergeMode'
+                 | )
+                 | partitioned by(dt)
+                 | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+            // Insert initial data
+            spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 
'2021-03-21')")
+
+            // Test 1: Update statements where at least one misses primary key 
assignment
+            if (tableType.equals("mor")) {
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched and s0.id = 1 then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+                   |when matched and s0.id = 2 then update set *
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+            }
+
+            // Test 2: At least one partial insert assignment clause misses 
primary key.
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (name, price, ts, 
dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched then insert (name, price, ts, dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            // Test 3: Partial insert missing preCombineField - only validate 
for EVENT_TIME_ORDERING
+            val mergeStmt =
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (id, name, price, 
dt)
+                 |values (s0.id, s0.name, s0.price, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+
+            if (mergeMode == "EVENT_TIME_ORDERING") {
+              checkException(mergeStmt)(
+                "No matching assignment found for target table precombine 
field `ts`"
+              )
+            } else {
+              // For COMMIT_TIME_ORDERING, this should execute without error
+              spark.sql(mergeStmt)
+            }
+
+            // Verify data state
+            if (mergeMode == "COMMIT_TIME_ORDERING") {
+              checkAnswer(s"select id, name, price, ts, dt from $tableName 
where id = 1")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")
+              )
+            } else {
+              // For EVENT_TIME_ORDERING, original data should be unchanged 
due to exception
+              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1232,7 +1233,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  value int,
-           |  ts long
+           |  ts int

Review Comment:
   reverted



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,175 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      withRecordType()(withTempDir { tmp =>
+        log.info(s"Testing table type $tableType")
+        spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false")
+        val tableName = generateTableName
+        // Create table with primaryKey and preCombineField
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts int,
+             |  dt string
+             |) using hudi
+             | tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(dt)
+             | location '${tmp.getCanonicalPath}'
+         """.stripMargin)
+
+        // Insert initial data
+        spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 
'2021-03-21')")
+
+        // Test 1: Update statements where at least one misses primary key 
assignment
+        if (tableType.equals("mor")) {

Review Comment:
   That should be covered by the test below, "Test merge into Allowed-patterns 
of assignment clauses".



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we 
don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -40,7 +40,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts int

Review Comment:
   reverted



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we 
don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(
+      field => {
+        validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        Seq(field),
+        "pre-combine field",
+        action.assignments)
+      validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+        "primaryKey field",
+        action.assignments)
+      }))
   }
 
-  private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+  private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: 
Map[String, String]): Unit = {
     if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
-      logWarning(s"Updates without precombine can have nondeterministic 
behavior")
+      logWarning(s"Updates without pre-combine can have nondeterministic 
behavior")

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,183 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
+        withRecordType()(withTempDir { tmp =>
+          
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+            log.info(s"Testing table type $tableType with merge mode 
$mergeMode")
+
+            val tableName = generateTableName
+            // Create table with primaryKey and preCombineField
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts int,
+                 |  dt string
+                 |) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts',
+                 |  hoodie.record.merge.mode = '$mergeMode'

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -489,6 +479,24 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     }
   }
 
+  private def isPartialUpdateActionForMOR(parameters: Map[String, String]) = {
+    (targetTableType == MOR_TABLE_TYPE_OPT_VAL
+      && UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
+      && parameters.getOrElse(
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
+      && updatingActions.nonEmpty)
+  }
+
+  private def getOperationType(parameters: Map[String, String]) = {
+    val operation = if 
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) && 
updatingActions.isEmpty) {
+      INSERT_OPERATION_OPT_VAL
+    } else {
+      UPSERT_OPERATION_OPT_VAL
+    }
+    operation

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -31,10 +31,11 @@ import org.apache.hudi.exception.{HoodieException, 
HoodieNotSupportedException}
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction.scalaFunction1Noop
-
 import org.apache.avro.Schema
+import org.apache.hudi.common.table.HoodieTableConfig

Review Comment:
   nice, applied!



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -176,9 +176,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
           expressionSet.remove((attr, expr))
           (attr, expr)
       }
-      if (resolving.isEmpty && rk._1.equals("primaryKey")
+      if (resolving.isEmpty && rk._1.equals("recordKey")

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,183 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
+        withRecordType()(withTempDir { tmp =>
+          
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+            log.info(s"Testing table type $tableType with merge mode 
$mergeMode")
+
+            val tableName = generateTableName
+            // Create table with primaryKey and preCombineField
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts int,
+                 |  dt string
+                 |) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts',
+                 |  hoodie.record.merge.mode = '$mergeMode'
+                 | )
+                 | partitioned by(dt)
+                 | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+            // Insert initial data
+            spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 
'2021-03-21')")
+
+            // Test 1: Update statements where at least one misses primary key 
assignment
+            if (tableType.equals("mor")) {
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched and s0.id = 1 then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+                   |when matched and s0.id = 2 then update set *
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+
+              checkException(
+                s"""
+                   |merge into $tableName as t0
+                   |using (
+                   |  select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'2021-03-21' as dt
+                   |) as s0
+                   |on t0.id = s0.id
+                   |when matched then update set
+                   |  name = s0.name,
+                   |  price = s0.price,
+                   |  ts = s0.ts,
+                   |  dt = s0.dt
+               """.stripMargin
+              )("No matching assignment found for target table record key 
field `id`")
+            }
+
+            // Test 2: At least one partial insert assignment clause misses 
primary key.
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (name, price, ts, 
dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            checkException(
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched then insert (name, price, ts, dt)
+                 |values (s0.name, s0.price, s0.ts, s0.dt)
+               """.stripMargin
+            )("No matching assignment found for target table record key field 
`id`")
+
+            // Test 3: Partial insert missing preCombineField - only validate 
for EVENT_TIME_ORDERING
+            val mergeStmt =
+              s"""
+                 |merge into $tableName as t0
+                 |using (
+                 |  select 2 as id, 'a2' as name, 12 as price, 1002 as ts, 
'2021-03-21' as dt
+                 |) as s0
+                 |on t0.id = s0.id
+                 |when not matched and s0.id = 1 then insert (id, name, price, 
dt)
+                 |values (s0.id, s0.name, s0.price, s0.dt)
+                 |when not matched and s0.id = 2 then insert *
+               """.stripMargin
+
+            if (mergeMode == "EVENT_TIME_ORDERING") {
+              checkException(mergeStmt)(
+                "No matching assignment found for target table precombine 
field `ts`"
+              )
+            } else {
+              // For COMMIT_TIME_ORDERING, this should execute without error
+              spark.sql(mergeStmt)
+            }
+
+            // Verify data state
+            if (mergeMode == "COMMIT_TIME_ORDERING") {
+              checkAnswer(s"select id, name, price, ts, dt from $tableName 
where id = 1")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")
+              )
+            } else {
+              // For EVENT_TIME_ORDERING, original data should be unchanged 
due to exception
+              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+                Seq(1, "a1", 10.0, 1000, "2021-03-21")
+              )
+            }
+          }
+        })
+      }
+    }
+  }
+
+  test("Test merge into Allowed-patterns of assignment clauses") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
 -> "false") {
+          val tableName = generateTableName
+          spark.sql(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to