alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1104996074


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -127,164 +155,189 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // target table side (since we're gonna be matching against primary-key 
column as is) expression
     // on the opposite side of the comparison should be cast-able to the 
primary-key column's data-type
     // t/h "up-cast" (ie w/o any loss in precision)
-    val target2Source = cleanedConditions.map {
-      case EqualTo(CoercedAttributeReference(attr), expr)
-        if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
-          if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
-            targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
-              castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
-          } else {
-            throw new AnalysisException(s"Invalid MERGE INTO matching 
condition: ${expr.sql}: "
-              + s"can't cast ${expr.sql} (of ${expr.dataType}) to 
${attr.dataType}")
-          }
+    val targetAttr2ConditionExpressions = cleanedConditions.map {
+      case EqualTo(CoercedAttributeReference(attr), expr) if 
targetAttrs.exists(f => attributeEquals(f, attr)) =>
+        if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+          // NOTE: It's critical we reference output attribute here and not 
the one from condition
+          val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+          targetAttr -> castIfNeeded(expr, attr.dataType)
+        } else {
+          throw new AnalysisException(s"Invalid MERGE INTO matching condition: 
${expr.sql}: "
+            + s"can't cast ${expr.sql} (of ${expr.dataType}) to 
${attr.dataType}")
+        }
 
-      case EqualTo(expr, CoercedAttributeReference(attr))
-        if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
-          if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
-            targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
-              castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
-          } else {
-            throw new AnalysisException(s"Invalid MERGE INTO matching 
condition: ${expr.sql}: "
-              + s"can't cast ${expr.sql} (of ${expr.dataType}) to 
${attr.dataType}")
-          }
+      case EqualTo(expr, CoercedAttributeReference(attr)) if 
targetAttrs.exists(f => attributeEquals(f, attr)) =>
+        if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+          // NOTE: It's critical we reference output attribute here and not 
the one from condition
+          val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+          targetAttr -> castIfNeeded(expr, attr.dataType)
+        } else {
+          throw new AnalysisException(s"Invalid MERGE INTO matching condition: 
${expr.sql}: "
+            + s"can't cast ${expr.sql} (of ${expr.dataType}) to 
${attr.dataType}")
+        }
 
       case expr =>
         throw new AnalysisException(s"Invalid MERGE INTO matching condition: 
`${expr.sql}`: "
           + "expected condition should be 'target.id = <source-column-expr>', 
e.g. "
           + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
-    }.toMap
+    }
 
-    target2Source
+    targetAttr2ConditionExpressions.collect {
+      case (attr, expr) if resolver(attr.name, primaryKeyField) =>
+        // NOTE: Here we validate that condition expression involving 
primary-key column(s) is a simple
+        //       attribute-reference expression (possibly wrapped into a 
cast). This is necessary to disallow
+        //       statements like following
+        //
+        //         MERGE INTO ... AS t USING (
+        //            SELECT ... FROM ... AS s
+        //         )
+        //            ON t.id = s.id + 1
+        //            WHEN MATCHED THEN UPDATE *
+        //
+        //       Which (in the current design) could result in a primary key 
of the record being modified,
+        //       which is not allowed.
+        if (!resolvesToSourceAttribute(expr)) {
+          throw new AnalysisException("Only simple conditions of the form 
`t.id = s.id` are allowed on the " +
+            s"primary-key column. Found `${attr.sql} = ${expr.sql}`")
+        }
+
+        (attr, expr)
+    }
   }
 
   /**
-   * Get the mapping of target preCombineField to the source expression.
+   * Please check description for [[primaryKeyAttributeToConditionExpression]]
    */
-  private lazy val target2SourcePreCombineFiled: Option[(String, Expression)] 
= {
-    val updateActions = mergeInto.matchedActions.collect { case u: 
UpdateAction => u }
-    assert(updateActions.size <= 1, s"Only support one updateAction currently, 
current update action count is: ${updateActions.size}")
-
-    val updateAction = updateActions.headOption
-    hoodieCatalogTable.preCombineKey.map(preCombineField => {
-      val sourcePreCombineField =
-        updateAction.map(u => u.assignments.filter {
-            case Assignment(key: AttributeReference, _) => 
key.name.equalsIgnoreCase(preCombineField)
-            case _=> false
-          }.head.value
-        ).getOrElse {
-          // If there is no update action, mapping the target column to the 
source by order.
-          val target2Source = mergeInto.targetTable.output
-            .filter(attr => !isMetaField(attr.name))
-            .map(_.name)
-            .zip(mergeInto.sourceTable.output.filter(attr => 
!isMetaField(attr.name)))
-            .toMap
-          target2Source.getOrElse(preCombineField, null)
+  private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, 
Expression)] = {
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    hoodieCatalogTable.preCombineKey.map { preCombineField =>
+      val targetPreCombineAttribute =
+        mergeInto.targetTable.output
+          .find { attr => resolver(attr.name, preCombineField) }
+          .get
+
+      // To find corresponding "pre-combine" attribute w/in the 
[[sourceTable]] we do
+      //    - Check if we can resolve the attribute w/in the source table as 
is; if unsuccessful, then
+      //    - Check if in any of the update actions, left-hand side of the 
assignment actually resolves
+      //    to it, in which case we will determine right-hand side expression 
as the value of "pre-combine"
+      //    attribute w/in the [[sourceTable]]
+      val sourceExpr = {
+        mergeInto.sourceTable.output.find(attr => resolver(attr.name, 
preCombineField)) match {
+          case Some(attr) => attr
+          case None =>
+            updatingActions.flatMap(_.assignments).collectFirst {
+              case Assignment(attr: AttributeReference, expr)
+                if resolver(attr.name, preCombineField) && 
resolvesToSourceAttribute(expr) => expr
+            } getOrElse {
+              throw new AnalysisException(s"Failed to resolve pre-combine 
field `${preCombineField}` w/in the source-table output")
+            }
+
         }
-      (preCombineField, sourcePreCombineField)
-    }).filter(p => p._2 != null)
+      }
+
+      (targetPreCombineAttribute, sourceExpr)
+    }
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     this.sparkSession = sparkSession
+    // TODO move to analysis phase
+    validate(mergeInto)

Review Comment:
   Nope. This is a future reference



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to