jonvex commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1249605180
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala:
##########
@@ -128,9 +131,123 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
+ case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
+ ////// don't want to go to the spark mit resolution so we resolve the
source and target if they haven't been
+ if !mO.resolved =>
+ lazy val analyzer = spark.sessionState.analyzer
+ val targetTable = if (targetTableO.resolved) targetTableO else
analyzer.execute(targetTableO)
+ val sourceTable = if (sourceTableO.resolved) sourceTableO else
analyzer.execute(sourceTableO)
+ val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable,
sourceTable = sourceTable)
+ ///////
+ EliminateSubqueryAliases(targetTable) match {
+ case r: NamedRelation if r.skipSchemaResolution =>
+ // Do not resolve the expression if the target table accepts any
schema.
+ // This allows data sources to customize their own resolution logic
using
+ // custom resolution rules.
+ m
+
+ case _ =>
+ val newMatchedActions = m.matchedActions.map {
+ case DeleteAction(deleteCondition) =>
+ val resolvedDeleteCondition = deleteCondition.map(
+ resolveExpressionByPlanChildren(_, m))
+ DeleteAction(resolvedDeleteCondition)
+ case UpdateAction(updateCondition, assignments) =>
+ val resolvedUpdateCondition = updateCondition.map(
+ resolveExpressionByPlanChildren(_, m))
+ UpdateAction(
+ resolvedUpdateCondition,
+ // The update value can access columns from both target and
source tables.
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= false))
+ case UpdateStarAction(updateCondition) =>
+ //Hudi change: filter out meta fields
+ val assignments = targetTable.output.filter(a =>
!isMetaField(a.name)).map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ }
+ UpdateAction(
+ updateCondition.map(resolveExpressionByPlanChildren(_, m)),
+ // For UPDATE *, the value must from source table.
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case o => o
+ }
+ val newNotMatchedActions = m.notMatchedActions.map {
+ case InsertAction(insertCondition, assignments) =>
+ // The insert action is used when not matched, so its condition
and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition = insertCondition.map(
+ resolveExpressionByPlanChildren(_, Project(Nil,
m.sourceTable)))
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case InsertStarAction(insertCondition) =>
+ // The insert action is used when not matched, so its condition
and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition = insertCondition.map(
+ resolveExpressionByPlanChildren(_, Project(Nil,
m.sourceTable)))
+ //Hudi change: filter out meta fields
+ val assignments = targetTable.output.filter(a =>
!isMetaField(a.name)).map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ }
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case o => o
+ }
+ val resolvedMergeCondition =
resolveExpressionByPlanChildren(m.mergeCondition, m)
+ m.copy(mergeCondition = resolvedMergeCondition,
+ matchedActions = newMatchedActions,
+ notMatchedActions = newNotMatchedActions)
+ }
+ }
+
+  /**
+   * Resolves the key and value of every MERGE INTO assignment that is not yet resolved.
+   * Expressions that are already resolved are kept as-is.
+   *
+   * @param assignments                 the SET / INSERT assignments to resolve
+   * @param mergeInto                   the MERGE INTO plan providing target and source tables
+   * @param resolveValuesWithSourceOnly when true, values may reference only source columns;
+   *                                    when false, values are resolved against the whole
+   *                                    MERGE plan (target and source)
+   * @return the assignments with resolved keys and values, in the original order
+   */
+  def resolveAssignments(
+      assignments: Seq[Assignment],
+      mergeInto: MergeIntoTable,
+      resolveValuesWithSourceOnly: Boolean): Seq[Assignment] = {
+    // An assignment key names a target column, so only the target table is in scope.
+    def resolveKey(key: Expression): Expression =
+      if (key.resolved) key
+      else resolveMergeExprOrFail(key, Project(Nil, mergeInto.targetTable))
+
+    // Update values may contain target and/or source references unless restricted to the source.
+    def resolveValue(value: Expression): Expression =
+      if (value.resolved) {
+        value
+      } else {
+        val scope: LogicalPlan =
+          if (resolveValuesWithSourceOnly) Project(Nil, mergeInto.sourceTable)
+          else mergeInto
+        resolveMergeExprOrFail(value, scope)
+      }
+
+    assignments.map { assignment =>
+      // Resolve the key before the value, matching the original evaluation order.
+      val key = resolveKey(assignment.key)
+      Assignment(key, resolveValue(assignment.value))
+    }
+  }
+
+ private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan):
Expression = {
+ try {
+ val resolved = resolveExpressionByPlanChildren(e, p)
+ resolved.references.filter(!_.resolved).foreach { a =>
+ // Note: This will throw error only on unresolved attribute issues,
+ // not other resolution errors like mismatched data types.
+ val cols = p.inputSet.toSeq.map(_.sql).mkString(", ")
+ sparkAdapter.failAnalysisForMIT(a, cols)
Review Comment:
The case-insensitive tests will hit this now, but I wasn't sure what we wanted
to do with that functionality.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]