andygrove commented on code in PR #1746: URL: https://github.com/apache/datafusion-comet/pull/1746#discussion_r2096187472
########## spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala: ########## @@ -2376,12 +2375,26 @@ object QueryPlanSerde extends Logging with CometExprShim { val cond = exprToProto(condition, child.output) if (cond.isDefined && childOp.nonEmpty) { + // We need to determine whether to use DataFusion's FilterExec or Comet's + // FilterExec. The difference is that DataFusion's implementation will sometimes pass + // batches through whereas the Comet implementation guarantees that a copy is always + // made, which is critical when using `native_comet` scans due to buffer re-use + + // TODO this could be optimized more to stop walking the tree on hitting + // certain operators such as join or aggregate which will copy batches + def containsNativeCometScan(plan: SparkPlan): Boolean = { + plan match { + case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) + case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET Review Comment: We only need to use Comet's FilterExec if any of the scans are `NATIVE_COMET` due to the buffer re-use in that scan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org