kbendick commented on a change in pull request #2116:
URL: https://github.com/apache/iceberg/pull/2116#discussion_r560477255
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -58,53 +61,138 @@ case class RewriteMergeInto(conf: SQLConf) extends
Rule[LogicalPlan] with Rewrit
override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
+ case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan,
cond, matchedActions, notMatchedActions)
+ if matchedActions.isEmpty =>
+
+ val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge",
newWriteInfo(target.schema))
+ val targetTableScan = buildSimpleScanPlan(target.table, target.output,
mergeBuilder, cond)
+
+ // when there are no matched actions, use a left anti join to remove
any matching rows and rewrite to use
+ // append instead of replace. only unmatched source rows are passed to
the merge and actions are all inserts.
+ val joinPlan = Join(source, targetTableScan, LeftAnti, Some(cond),
JoinHint.NONE)
+
+ val mergeParams = MergeIntoParams(
+ isSourceRowNotPresent = FALSE_LITERAL,
+ isTargetRowNotPresent = TRUE_LITERAL,
+ matchedConditions = Nil,
+ matchedOutputs = Nil,
+ notMatchedConditions = notMatchedActions.map(getClauseCondition),
+ notMatchedOutputs = notMatchedActions.map(actionOutput),
+ targetOutput = Nil,
+ joinedAttributes = joinPlan.output
+ )
+ val mergePlan = MergeInto(mergeParams, target, joinPlan)
+
+ AppendData.byPosition(target, mergePlan, Map.empty)
+
+ case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan,
cond, matchedActions, notMatchedActions)
+ if notMatchedActions.isEmpty =>
+
+ val (mergeBuilder, targetTableScan) =
buildDynamicFilterTargetScan(target, source, cond)
+
+ // rewrite the matched actions to ensure there is always an action to
produce the output row
+ val (matchedConditions, matchedOutputs) =
rewriteMatchedActions(matchedActions, target.output)
+
+ // when there are no not-matched actions, use a right outer join to
ignore source rows that do not match, but
+ // keep all unmatched target rows that must be preserved.
+ val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL,
ROW_FROM_SOURCE)())
+ val newSourceTableScan = Project(sourceTableProj, source)
+ val joinPlan = Join(newSourceTableScan, targetTableScan, RightOuter,
Some(cond), JoinHint.NONE)
+
+ val mergeParams = MergeIntoParams(
+ isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan.output,
ROW_FROM_SOURCE)),
+ isTargetRowNotPresent = FALSE_LITERAL,
+ matchedConditions = matchedConditions,
+ matchedOutputs = matchedOutputs,
+ notMatchedConditions = Nil,
+ notMatchedOutputs = Nil,
+ targetOutput = target.output,
+ joinedAttributes = joinPlan.output
+ )
+ val mergePlan = MergeInto(mergeParams, target, joinPlan)
+ val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+
+ ReplaceData(target, batchWrite, mergePlan)
+
case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan,
cond, matchedActions, notMatchedActions) =>
- // Construct the plan to prune target based on join condition between
source and target.
- val writeInfo = newWriteInfo(target.schema)
- val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge",
writeInfo)
- val matchingRowsPlanBuilder = (rel: DataSourceV2ScanRelation) =>
- Join(source, rel, Inner, Some(cond), JoinHint.NONE)
- val targetTableScan = buildScanPlan(target.table, target.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
-
- // Construct an outer join to help track changes in source and target.
- // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when
applicable.
+
+ val (mergeBuilder, targetTableScan) =
buildDynamicFilterTargetScan(target, source, cond)
+
+ // rewrite the matched actions to ensure there is always an action to
produce the output row
+ val (matchedConditions, matchedOutputs) =
rewriteMatchedActions(matchedActions, target.output)
+
+ // use a full outer join because there are both matched and not
matched actions
val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL,
ROW_FROM_SOURCE)())
- val targetTableProj = target.output ++ Seq(Alias(TRUE_LITERAL,
ROW_FROM_TARGET)())
+ val targetTableProj = targetTableScan.output ++
Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
val newTargetTableScan = Project(targetTableProj, targetTableScan)
val newSourceTableScan = Project(sourceTableProj, source)
val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter,
Some(cond), JoinHint.NONE)
- // Construct the plan to replace the data based on the output of
`MergeInto`
val mergeParams = MergeIntoParams(
isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan.output,
ROW_FROM_SOURCE)),
isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan.output,
ROW_FROM_TARGET)),
- matchedConditions = matchedActions.map(getClauseCondition),
- matchedOutputs = matchedActions.map(actionOutput(_, target.output)),
+ matchedConditions = matchedConditions,
+ matchedOutputs = matchedOutputs,
notMatchedConditions = notMatchedActions.map(getClauseCondition),
- notMatchedOutputs = notMatchedActions.map(actionOutput(_,
target.output)),
- targetOutput = target.output :+ FALSE_LITERAL,
- deleteOutput = target.output :+ TRUE_LITERAL,
+ notMatchedOutputs = notMatchedActions.map(actionOutput),
+ targetOutput = target.output,
joinedAttributes = joinPlan.output
)
val mergePlan = MergeInto(mergeParams, target, joinPlan)
val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+
ReplaceData(target, batchWrite, mergePlan)
}
}
- private def actionOutput(clause: MergeAction, targetOutputCols:
Seq[Expression]): Seq[Expression] = {
+ private def actionOutput(clause: MergeAction): Option[Seq[Expression]] = {
clause match {
case u: UpdateAction =>
- u.assignments.map(_.value) :+ FALSE_LITERAL
+ Some(u.assignments.map(_.value))
case _: DeleteAction =>
- targetOutputCols :+ TRUE_LITERAL
+ None
case i: InsertAction =>
- i.assignments.map(_.value) :+ FALSE_LITERAL
+ Some(i.assignments.map(_.value))
}
}
private def getClauseCondition(clause: MergeAction): Expression = {
clause.condition.getOrElse(TRUE_LITERAL)
}
+
+ private def buildDynamicFilterTargetScan(
+ target: DataSourceV2Relation,
+ source: LogicalPlan,
+ cond: Expression): (MergeBuilder, LogicalPlan) = {
+ // Construct the plan to prune target based on join condition between
source and target.
+ val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge",
newWriteInfo(target.schema))
+ val matchingRowsPlanBuilder = (rel: DataSourceV2ScanRelation) =>
+ Join(source, rel, Inner, Some(cond), JoinHint.NONE)
+ val targetTableScan = buildDynamicFilterScanPlan(
+ target.table, target.output, mergeBuilder, cond, matchingRowsPlanBuilder)
+
+ (mergeBuilder, targetTableScan)
+ }
+
+ private def rewriteMatchedActions(
+ matchedActions: Seq[MergeAction],
+ targetOutput: Seq[Expression]): (Seq[Expression],
Seq[Option[Seq[Expression]]]) = {
+ val startMatchedConditions = matchedActions.map(getClauseCondition)
+ val catchAllIndex = startMatchedConditions.indexWhere {
Review comment:
Again, for my own understanding: since the `Expression`s are in a
`Seq` at this point, does that mean they are all essentially combined
with `and`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]