yihua commented on code in PR #12798:
URL: https://github.com/apache/hudi/pull/12798#discussion_r1972599542
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -924,26 +972,82 @@ object MergeIntoHoodieTableCommand {
fields: Seq[String],
fieldType: String,
assignments:
Seq[Assignment]): Unit = {
- // To find corresponding [[fieldType]] attribute w/in the [[assignments]]
we do
- // - Check if target table itself has the attribute
- // - Check if in any of the assignment actions, whose right-hand side
attribute
- // resolves to the source attribute. For example,
- // WHEN MATCHED THEN UPDATE SET targetTable.attribute = <expr>
- // the left-hand side of the assignment can be resolved to the target
fields we are
- // validating here.
fields.foreach { field =>
targetTable.output
.find(attr => resolver(attr.name, field))
- .getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType
`$field` in target table"))
+ .getOrElse(throw new MergeIntoFieldResolutionException(s"Failed to
resolve $fieldType `$field` in target table"))
if (!assignments.exists {
case Assignment(attr: AttributeReference, _) if resolver(attr.name,
field) => true
case _ => false
}) {
- throw new AnalysisException(s"No matching assignment found for target
table $fieldType `$field`")
+ throw new MergeIntoFieldResolutionException(s"No matching assignment
found for target table $fieldType `$field`")
}
}
}
+
+ /**
+ * Generic method to resolve field associations between target and source
tables
+ *
+ * @param resolver The resolver to use
+ * @param targetTable The target table of the merge
+ * @param sourceTable The source table of the merge
+ * @param fields The fields from the target table whose association with the
source to be resolved
+ * @param fieldType String describing the type of field (for error messages)
+ * @param assignments The assignments clause of the merge into used for
resolving the association
+ * @return Sequence of resolved (target table attribute, source table
expression)
+ * mapping for target [[fields]].
+ *
+ * @throws AnalysisException if a field cannot be resolved
+ */
+ def resolveFieldAssociationsBetweenSourceAndTarget(resolver: Resolver,
+ targetTable: LogicalPlan,
+ sourceTable: LogicalPlan,
+ fields: Seq[String],
+ fieldType: String,
+ assignments:
Seq[Assignment]
+ ): Seq[(Attribute, Expression)] = {
+ fields.map { field =>
+ val targetAttribute = targetTable.output
+ .find(attr => resolver(attr.name, field))
+ .getOrElse(throw new MergeIntoFieldResolutionException(
+ s"Failed to resolve $fieldType `$field` in target table"))
+
+ val sourceExpr = sourceTable.output
+ .find(attr => resolver(attr.name, field))
+ .getOrElse {
+ assignments.collectFirst {
+ case Assignment(attr: AttributeReference, expr)
+ if resolver(attr.name, field) &&
resolvesToSourceAttribute(sourceTable, expr) => expr
+ }.getOrElse {
+ throw new MergeIntoFieldResolutionException(
+ s"Failed to resolve $fieldType `$field` w/in the source-table
output")
+ }
+ }
+
+ (targetAttribute, sourceExpr)
+ }
+ }
+
+ def resolvesToSourceAttribute(sourceTable: LogicalPlan, expr: Expression):
Boolean = {
+ val sourceTableOutputSet = sourceTable.outputSet
+ expr match {
+ case attr: AttributeReference => sourceTableOutputSet.contains(attr)
+ case MatchCast(attr: AttributeReference, _, _, _) =>
sourceTableOutputSet.contains(attr)
+
+ case _ => false
+ }
+ }
+
+ def validateDataTypes(attr: Attribute, expr: Expression, columnType:
String): Unit = {
+ if (attr.dataType != expr.dataType) {
+ throw new AnalysisException(
Review Comment:
Got it, let's make this throw `MergeIntoFieldTypeMismatchException` instead of `AnalysisException`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]