[ 
https://issues.apache.org/jira/browse/HUDI-1884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381135#comment-17381135
 ] 

ASF GitHub Bot commented on HUDI-1884:
--------------------------------------

pengzhiwei2018 commented on a change in pull request #3154:
URL: https://github.com/apache/hudi/pull/3154#discussion_r670226276



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -102,7 +103,7 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp  {
     // Resolve merge into
-    case MergeIntoTable(target, source, mergeCondition, matchedActions, 
notMatchedActions)
+    case mergeInto@MergeIntoTable(target, source, mergeCondition, 
matchedActions, notMatchedActions)

Review comment:
       yes, will fix this format.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -164,7 +165,47 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
         case UpdateAction(condition, assignments) =>
           val (resolvedCondition, resolvedAssignments) =
             resolveConditionAssignments(condition, assignments)
-          UpdateAction(resolvedCondition, resolvedAssignments)
+
+          // Get the target table type and pre-combine field.
+          val targetTableId = getMergeIntoTargetTableId(mergeInto)
+          val targetTable =
+            sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
+          val targetTableType = 
HoodieOptionConfig.getTableType(targetTable.storage.properties)
+          val preCombineField = 
HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)
+
+          // Get the map of target attribute to value of the update 
assignments.
+          val target2Values = resolvedAssignments.map {
+              case Assignment(attr: AttributeReference, value) =>
+                attr.name -> value
+              case o => throw new IllegalArgumentException(s"Assignment key 
must be an attribute, current is: ${o.key}")
+          }.toMap
+
+          // Validate if there are incorrect target attributes.
+          val unKnowTargets = target2Values.keys
+            .filterNot(removeMetaFields(target.output).map(_.name).contains(_))
+          if (unKnowTargets.nonEmpty) {
+            throw new AnalysisException(s"Cannot find target attributes: 
${unKnowTargets.mkString(",")}.")
+          }
+
+          // Fill the missing target attribute in the update action for COW 
table to support partial update.
+          // e.g. If the update action missing 'id' attribute, we fill a "id = 
target.id" to the update action.
+          val newAssignments = removeMetaFields(target.output)
+            .map(attr => {
+              // TODO support partial update for MOR.
+              if (!target2Values.contains(attr.name) && targetTableType == 
MOR_TABLE_TYPE_OPT_VAL) {
+                throw new AnalysisException(s"Missing specify the value for 
target field: '${attr.name}' in merge into update action" +
+                  s" for MOR table. Currently we cannot support partial update 
for MOR," +
+                  s" please complete all the target fields just like 
'...update set id = s0.id, name = s0.name ....'")
+              }
+              if (preCombineField.isDefined && 
preCombineField.get.equalsIgnoreCase(attr.name)

Review comment:
       If the table has defined `preCombineField`, we must know the mapping of 
`preCombineField` in target  table to the source field. So we must specify the 
value for `preCombineField`, or not, we cannot read the `preCombineField` field 
from the source.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> MergeInto Support Partial Update For COW 
> -----------------------------------------
>
>                 Key: HUDI-1884
>                 URL: https://issues.apache.org/jira/browse/HUDI-1884
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: pengzhiwei
>            Assignee: pengzhiwei
>            Priority: Major
>              Labels: pull-request-available
>
> Support partial update for merge-into statement, just like this:
> {code:java}
> merge into h0 
> using s0
> on s0.id = h0.id
> when matched then update set price = s0.price + 10
> when not matched then insert *{code}
> Allow to update partial fields.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to