yihua commented on code in PR #12584:
URL: https://github.com/apache/hudi/pull/12584#discussion_r1907940066


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(
+      field => {
+        validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        Seq(field),
+        "pre-combine field",
+        action.assignments)
+      validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+        "primaryKey field",
+        action.assignments)
+      }))
   }
 
-  private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+  private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: Map[String, String]): Unit = {
     if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
-      logWarning(s"Updates without precombine can have nondeterministic behavior")
+      logWarning(s"Updates without pre-combine can have nondeterministic behavior")

Review Comment:
   no need to change



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the assignment clause of all insert actions.

Review Comment:
    ```suggestion
        // Precombine field and record key field must be present in the assignment clause of all insert actions.
    ```



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -489,6 +481,25 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     }
   }
 
+  private def isPartialUpdateActionForMOR(parameters: Map[String, String]) = {
+    val isPartialUpdateAction = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
+      && UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
+      && parameters.getOrElse(
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
+      ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
+      && updatingActions.nonEmpty)
+    isPartialUpdateAction

Review Comment:
   nit: inline the statement
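
For illustration, a minimal sketch of what the inlined version might look like, assuming the members referenced in the diff above (`targetTableType`, `getOperationType`, `updatingActions`, `ENABLE_MERGE_INTO_PARTIAL_UPDATES`) are in scope:

```scala
// Sketch only, not the final patch: the boolean expression is returned
// directly instead of being bound to an intermediate val, per the nit above.
private def isPartialUpdateActionForMOR(parameters: Map[String, String]): Boolean = {
  targetTableType == MOR_TABLE_TYPE_OPT_VAL &&
    UPSERT_OPERATION_OPT_VAL == getOperationType(parameters) &&
    parameters.getOrElse(
      ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
      ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean &&
    updatingActions.nonEmpty
}
```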



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the assignment clause of all insert actions.

Review Comment:
   I assume the insert action should always include all columns, so such check may not be necessary, i.e., missing any column would lead to insert failure already?
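
To make the case under discussion concrete, here is a hypothetical MERGE statement (table and column names `target_tbl`, `source_tbl`, `id`, `name`, `ts` are invented) whose insert action lists columns explicitly and leaves out the precombine field; the validation added in the diff above would flag such a statement at analysis time instead of letting it surface later as a write failure:

```scala
// Hypothetical illustration only; `spark` is assumed to be an existing
// SparkSession with the Hudi Spark bundle on the classpath, and `ts` is
// assumed to be the target table's precombine field.
spark.sql(
  """
    |MERGE INTO target_tbl AS t
    |USING source_tbl AS s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET t.name = s.name, t.ts = s.ts
    |WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)
    |""".stripMargin)
```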



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       assert(insert.assignments.length <= targetTableSchema.length,
         s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
-
+    // Precombine field and primary key field must be present in the assignment clause of all insert actions.
+    // Check has no effect if we don't have such fields in target table or we don't have insert actions
+    insertActions.foreach(action =>
+      hoodieCatalogTable.preCombineKey.foreach(
+      field => {
+        validateTargetTableAttrExistsInAssignments(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        Seq(field),
+        "pre-combine field",

Review Comment:
   ```suggestion
           "precombine field",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.