rdblue commented on a change in pull request #1947:
URL: https://github.com/apache/iceberg/pull/1947#discussion_r559831495



##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
##########
@@ -103,6 +103,7 @@ trait RewriteRowLevelOperationHelper extends 
PredicateHelper with Logging {
   }
 
   private def buildFileFilterPlan(matchingRowsPlan: LogicalPlan): LogicalPlan 
= {
+    // TODO: For merge-into make sure _file is resolved only from target table.

Review comment:
       You can solve this problem by passing the target table's attributes from 
the `DataSourceV2ScanRelation` into the helper, so that `_file` is resolved 
only against the target table:
   
   ```scala
       val matchingFilePlan = buildFileFilterPlan(scanRelation.output, 
matchingRowsPlanBuilder(scanRelation))
     ...
   
     private def buildFileFilterPlan(tableAttrs: Seq[AttributeReference], 
matchingRowsPlan: LogicalPlan): LogicalPlan = {
       val fileAttr = findOutputAttr(tableAttrs, FILE_NAME_COL)
       val agg = Aggregate(Seq(fileAttr), Seq(fileAttr), matchingRowsPlan)
       Project(Seq(findOutputAttr(agg.output, FILE_NAME_COL)), agg)
     }
   
     protected def findOutputAttr(attrs: Seq[Attribute], attrName: String): 
Attribute = {
       attrs.find(attr => resolver(attr.name, attrName)).getOrElse {
         throw new AnalysisException(s"Cannot find $attrName in $attrs")
       }
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to