aokolnychyi commented on a change in pull request #3763:
URL: https://github.com/apache/iceberg/pull/3763#discussion_r773297836
##########
File path:
spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
##########
@@ -54,6 +60,50 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
}
}
+ protected def buildWriteDeltaProjections(
+ plan: LogicalPlan,
+ rowAttrs: Seq[Attribute],
+ rowIdAttrs: Seq[Attribute],
+ metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {
+
+ val rowProjection = if (rowAttrs.nonEmpty) {
+ Some(newProjection(plan, rowAttrs, usePlanTypes = true))
+ } else {
+ None
+ }
+
+ // in MERGE, the plan may contain both delete and insert records that may affect
+ // the nullability of metadata columns (e.g. metadata columns for new records are always null)
+ // since metadata columns are never passed with new records to insert,
+ // use the actual metadata column type instead of the one present in the plan
+
+ val rowIdProjection = newProjection(plan, rowIdAttrs, usePlanTypes = false)
+
+ val metadataProjection = if (metadataAttrs.nonEmpty) {
+ Some(newProjection(plan, metadataAttrs, usePlanTypes = false))
+ } else {
+ None
+ }
+
+ WriteDeltaProjections(rowProjection, rowIdProjection, metadataProjection)
+ }
+
+ // the projection is done by name, ignoring expr IDs
+ private def newProjection(
Review comment:
Renamed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]