jonvex commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1251342069
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala:
##########
@@ -128,9 +131,135 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
+ case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
+ //// Hudi change: we don't want to go through Spark's MERGE INTO resolution, so we
resolve the source and target tables here if they haven't been resolved yet
+ //
+ if !mO.resolved =>
+ lazy val analyzer = spark.sessionState.analyzer
+ val targetTable = if (targetTableO.resolved) targetTableO else
analyzer.execute(targetTableO)
+ val sourceTable = if (sourceTableO.resolved) sourceTableO else
analyzer.execute(sourceTableO)
+ val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable,
sourceTable = sourceTable)
+ //
+ ////
+ EliminateSubqueryAliases(targetTable) match {
+ case r: NamedRelation if r.skipSchemaResolution =>
+ // Do not resolve the expression if the target table accepts any
schema.
+ // This allows data sources to customize their own resolution logic
using
+ // custom resolution rules.
+ m
+
+ case _ =>
+ val newMatchedActions = m.matchedActions.map {
+ case DeleteAction(deleteCondition) =>
+ val resolvedDeleteCondition = deleteCondition.map(
+ resolveExpressionByPlanChildren(_, m))
+ DeleteAction(resolvedDeleteCondition)
+ case UpdateAction(updateCondition, assignments) =>
+ val resolvedUpdateCondition = updateCondition.map(
+ resolveExpressionByPlanChildren(_, m))
+ UpdateAction(
+ resolvedUpdateCondition,
+ // The update value can access columns from both target and
source tables.
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= false))
+ case UpdateStarAction(updateCondition) =>
+ ////Hudi change: filter out meta fields
+ //
+ val assignments = targetTable.output.filter(a =>
!isMetaField(a.name)).map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ }
+ //
+ ////
+ UpdateAction(
+ updateCondition.map(resolveExpressionByPlanChildren(_, m)),
+ // For UPDATE *, the value must come from the source table.
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case o => o
+ }
+ val newNotMatchedActions = m.notMatchedActions.map {
+ case InsertAction(insertCondition, assignments) =>
+ // The insert action is used when not matched, so its condition
and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition = insertCondition.map(
+ resolveExpressionByPlanChildren(_, Project(Nil,
m.sourceTable)))
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case InsertStarAction(insertCondition) =>
+ // The insert action is used when not matched, so its condition
and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition = insertCondition.map(
+ resolveExpressionByPlanChildren(_, Project(Nil,
m.sourceTable)))
+ ////Hudi change: filter out meta fields
+ //
+ val assignments = targetTable.output.filter(a =>
!isMetaField(a.name)).map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ }
+ //
+ ////
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case o => o
Review Comment:
I marked the custom Hudi changes by surrounding them with comment markers, like this:
////
//
changes
//
////
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]