yihua commented on code in PR #12584:
URL: https://github.com/apache/hudi/pull/12584#discussion_r1907942035


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we 
don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(
+      field => {
+        validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        Seq(field),
+        "pre-combine field",
+        action.assignments)
+      validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+        "primaryKey field",
+        action.assignments)
+      }))
   }
 
-  private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+  private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: 
Map[String, String]): Unit = {
     if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
-      logWarning(s"Updates without precombine can have nondeterministic 
behavior")
+      logWarning(s"Updates without pre-combine can have nondeterministic 
behavior")
     }
     updateActions.foreach(update =>
       assert(update.assignments.length <= targetTableSchema.length,
-        s"The number of update assignments[${update.assignments.length}] must 
be less than or equalequal to the " +
+        s"The number of update assignments[${update.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
 
-    // For MOR table, the target table field cannot be the right-value in the 
update action.
     if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+      // For MOR table, the target table field cannot be the right-value in 
the update action.
       updateActions.foreach(update => {
         val targetAttrs = update.assignments.flatMap(a => a.value.collect {
           case attr: AttributeReference if 
mergeInto.targetTable.outputSet.contains(attr) => attr
         })
         assert(targetAttrs.isEmpty,
           s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) 
cannot be the right-value of the update clause for MOR table.")
       })
+      // Only when the partial update is enabled that primary key assignment 
is not mandatory in update actions for MOR tables.
+      if (!isPartialUpdateActionForMOR(props)) {
+        // For MOR table, update assignment clause must have primary key field 
being set explicitly even if it does not
+        // change. The check has no effect if there is no updateActions or we 
don't have primaryKey
+        updateActions.foreach(action => 
validateTargetTableAttrExistsInAssignments(
+          sparkSession.sessionState.conf.resolver,
+          mergeInto.targetTable,
+          
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+          "primaryKey field",

Review Comment:
   use record key field to be consistent



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1051,7 +1052,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           s"""
              | merge into $tableName
              | using (
-             |  select 1 as id, 'a1' as name, 10 as price, $dataValue as c, 
'1' as flag
+             |  select 1 as id, 'a1' as name, 10 as price, cast($dataValue as 
$dataType) as c, '1' as flag

Review Comment:
   Use `1L` if there is any type mismatch in the tests.  The user is expected 
to provide the data with the correct types.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,175 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
     }
   }
+
+  test("Test MergeInto Anti-Patterns of assignment clauses") {
+    Seq("cow", "mor").foreach { tableType =>
+      withRecordType()(withTempDir { tmp =>
+        log.info(s"Testing table type $tableType")
+        spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false")

Review Comment:
   We should unset the config after the test or set the default of the config 
so that it does not affect other tests in the same Spark sessions (since this 
`SET` statement has global impact).



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala:
##########
@@ -33,7 +33,7 @@ class TestMergeIntoLogOnlyTable extends 
HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int

Review Comment:
   Similar here



##########
hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql:
##########
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
 # CREATE TABLE
 
 create table h1 (
-  id bigint,
+  id int,

Review Comment:
   Why do we need this change?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the 
assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we 
don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(

Review Comment:
   let's only validate precombine if the merge mode is `EVENT_TIME_ORDERING`.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1232,7 +1233,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  value int,
-           |  ts long
+           |  ts int

Review Comment:
   Let's not change the types of precombine field.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -40,7 +40,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts int

Review Comment:
   Since we also have "(int) 0" as the natural ordering, making this integer 
can mask issues around precombine field and merging logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to