andygrove commented on code in PR #4700:
URL: https://github.com/apache/datafusion-comet/pull/4700#discussion_r3475904853
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode {
}
}
+/**
+ * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition
+ * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the
+ * data when the scan is fused into a larger native subtree.
+ *
+ * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without
+ * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the
+ * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use
+ * this trait; it has a dedicated `findAllPlanData` case.)
+ *
+ * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved`
+ * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read.
+ */
+trait CometScanWithPlanData {
+ def sourceKey: String
+ def commonData: Array[Byte]
+ def perPartitionData: Array[Array[Byte]]
+
+ // DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast
+ // subqueries needing rewrite by CometPlanAdaptiveDynamicPruningFilters.
+ // Default empty: scans with dedicated handling (CometNativeScanExec,
+ // CometIcebergNativeScanExec) don't use this path.
+ def dynamicPruningFilters: Seq[Expression] = Nil
+
+ // Install rewritten DPP filters on this scan. Implementers whose filters live
+ // in a @transient field (which TreeNode.makeCopy can't carry, #3510) update
+ // them via a transient side-channel and return `this` -- so the optimizer
+ // rule's rewrite lands on the SAME instance that executes, instead of a copy
+ // that gets dropped when the enclosing native block is rebuilt. Only called
+ // when `dynamicPruningFilters` is non-empty, so the default is never reached
+ // for scans that leave it empty.
+ def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan =
Review Comment:
The mutate-and-return-`this` shape is a reasonable workaround given that
`@transient` filters can't survive `makeCopy`, and the comment explains it
well. Since `CometPlanAdaptiveDynamicPruningFilters` already carries a
`TODO(#3510)` for the day `makeCopy` is fixed, could we mirror that marker
here? It would make it easier to collapse this back into a normal copy once
the underlying issue is resolved, rather than leaving it to linger.
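For illustration, roughly what that could look like. This is a sketch with hypothetical stand-ins: `SparkPlan`, `Expression`, and `Literal` are minimal stubs rather than Spark's classes, `TransientFilterScan` is an invented implementer, and because the quoted hunk cuts off before the trait's default body, the throwing default here is a placeholder assumption.

```scala
// Hypothetical stand-ins for Spark's Expression and SparkPlan (not the real classes).
trait Expression
final case class Literal(value: Int) extends Expression
abstract class SparkPlan

// Trimmed to the DPP members of the trait under review.
trait CometScanWithPlanData {
  def dynamicPruningFilters: Seq[Expression] = Nil

  // TODO(#3510): once TreeNode.makeCopy can carry @transient fields, implementers
  // should return a normal copy here instead of mutating and returning `this`.
  // The real default body is not in the hunk; this throwing placeholder is an assumption.
  def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan =
    throw new UnsupportedOperationException(getClass.getSimpleName)
}

// Hypothetical scan that keeps its DPP filters in a @transient field, the way the
// contrib's CometDeltaNativeScanExec is described as doing.
final class TransientFilterScan(initial: Seq[Expression])
    extends SparkPlan with CometScanWithPlanData {
  @transient private var filters: Seq[Expression] = initial

  override def dynamicPruningFilters: Seq[Expression] = filters

  // TODO(#3510): collapse into a normal copy once makeCopy is fixed.
  override def withDynamicPruningFilters(newFilters: Seq[Expression]): SparkPlan = {
    filters = newFilters // install the rewrite on the instance that executes
    this
  }
}
```

Putting the marker on both the trait method and each implementer would make every mutate-and-return-`this` site easy to grep for once #3510 lands.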
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode {
}
}
+/**
+ * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition
+ * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the
+ * data when the scan is fused into a larger native subtree.
+ *
+ * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without
+ * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the
+ * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use
+ * this trait; it has a dedicated `findAllPlanData` case.)
+ *
+ * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved`
+ * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read.
+ */
+trait CometScanWithPlanData {
Review Comment:
The scaladoc says each implementation resolves its own DPP subqueries via
`ensureSubqueriesResolved` (overridden from `CometLeafExec`), but the trait
doesn't extend `CometLeafExec`, and `findAllPlanData` guards the call with a
runtime `case leaf: CometLeafExec => ... case _ =>`. An implementer that
forgets to also extend `CometLeafExec` compiles fine and then silently skips
subquery resolution, which is the deadlock surface `ensureSubqueriesResolved`
exists to prevent. Would it be worth making the requirement explicit with a
self-type (`trait CometScanWithPlanData { self: CometLeafExec => ... }`), or at
least a scaladoc note that implementers must extend `CometLeafExec`? That also
pairs well with the suggested stub test: a `CometLeafExec with
CometScanWithPlanData` stub exercises the real lifecycle path rather than the
`case _ =>` fallback.
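A rough sketch of the self-type option, using trimmed hypothetical stand-ins: `SparkPlan`, `CometLeafExec` (with an illustrative `subqueriesResolved` flag), `PlanDataInjectorSketch`, and `StubScan` are not Comet's real definitions or signatures.

```scala
// Hypothetical, heavily trimmed stand-ins; not Comet's real class definitions.
abstract class SparkPlan

abstract class CometLeafExec extends SparkPlan {
  // Flag so the sketch can observe that resolution ran (illustrative only).
  var subqueriesResolved: Boolean = false
  def ensureSubqueriesResolved(): Unit = { subqueriesResolved = true }
}

// Self-type: any class mixing in this trait must also be a CometLeafExec,
// otherwise it fails to compile.
trait CometScanWithPlanData { self: CometLeafExec =>
  def sourceKey: String
  def commonData: Array[Byte]
  def perPartitionData: Array[Array[Byte]]
}

object PlanDataInjectorSketch {
  // Every trait scan is guaranteed to match this compound type, so no
  // `case _ =>` fallback can silently skip subquery resolution.
  def findAllPlanData(plans: Seq[SparkPlan]): Map[String, (Array[Byte], Array[Array[Byte]])] =
    plans.collect { case scan: CometLeafExec with CometScanWithPlanData =>
      scan.ensureSubqueriesResolved()
      (scan.sourceKey, (scan.commonData, scan.perPartitionData))
    }.toMap
}

// The suggested stub: a real CometLeafExec, so it goes through the resolution path.
final class StubScan extends CometLeafExec with CometScanWithPlanData {
  override def sourceKey: String = "stub"
  override val commonData: Array[Byte] = Array[Byte](1, 2)
  override val perPartitionData: Array[Array[Byte]] = Array(Array[Byte](3), Array[Byte](4))
}
```

With the self-type in place, a Scala class declared as `extends SparkPlan with CometScanWithPlanData` is rejected at compile time, so the forgotten-`extends` case can no longer reach runtime.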
##########
spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala:
##########
@@ -83,6 +83,15 @@ case object CometPlanAdaptiveDynamicPruningFilters
if icebergScan.runtimeFilters.exists(hasCometSAB) =>
logDebug("Converting AQE DPP for CometIcebergNativeScanExec")
convertIcebergScanDPP(icebergScan, plan)
+ // Comet scans whose DPP filters live in a @transient field (the contrib's
+ // CometDeltaNativeScanExec). transformExpressions/makeCopy can't rewrite
+ // them, and a rewritten copy is orphaned when the enclosing native block
+ // is rebuilt (#3510). The scan's `withDynamicPruningFilters` installs the
+ // rewrite in place and returns `this`, so it lands on the executing
+ // instance.
+ case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) =>
+ logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place")
+ p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan)))
case p: SparkPlan
Review Comment:
Not blocking for this part, just flagging for later: this arm excludes
`CometNativeScanExec` and `CometIcebergNativeScanExec` through explicit
`isInstanceOf` checks, but it does not exclude contrib trait scans. A future
Delta scan with a wrapped SAB but empty `dynamicPruningFilters` would fall
through to `convertNonCometNodeDPP`. That's the contrib's concern in a later
part, but it's worth keeping in mind so the exclusion list doesn't silently
misroute trait scans.
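To make the misroute concrete, here is a toy model of the two guard shapes. Everything in it is hypothetical: `WrappedSAB` stands in for a wrapped `SubqueryAdaptiveBroadcast`, `expressions`/`hasSAB` approximate whatever SAB detection the real guard uses (it's cut off in the hunk), and the returned strings just label which arm fires.

```scala
// Hypothetical stand-ins, not the real Spark/Comet classes.
trait Expression
case object WrappedSAB extends Expression // stands in for a wrapped SubqueryAdaptiveBroadcast

abstract class SparkPlan {
  // Expressions the (hypothetical) SAB detection inspects.
  def expressions: Seq[Expression] = Nil
}

trait CometScanWithPlanData {
  def dynamicPruningFilters: Seq[Expression] = Nil
}

final class CometNativeScanExec extends SparkPlan with CometScanWithPlanData
final class CometIcebergNativeScanExec extends SparkPlan

// A contrib trait scan whose SAB is reachable but whose dynamicPruningFilters is empty.
final class ContribScan(sab: Expression) extends SparkPlan with CometScanWithPlanData {
  override def expressions: Seq[Expression] = Seq(sab)
}

object Routing {
  def hasSAB(p: SparkPlan): Boolean = p.expressions.contains(WrappedSAB)

  // Current shape: the generic arm excludes concrete scan classes only.
  def routeByClass(p: SparkPlan): String = p match {
    case s: CometScanWithPlanData if s.dynamicPruningFilters.contains(WrappedSAB) =>
      "in-place"
    case _ if hasSAB(p) && !p.isInstanceOf[CometNativeScanExec] &&
        !p.isInstanceOf[CometIcebergNativeScanExec] =>
      "convertNonCometNodeDPP"
    case _ => "unchanged"
  }

  // Alternative: keep every trait scan (and the Iceberg scan) out of the generic arm.
  def routeByTrait(p: SparkPlan): String = p match {
    case s: CometScanWithPlanData if s.dynamicPruningFilters.contains(WrappedSAB) =>
      "in-place"
    case _: CometScanWithPlanData | _: CometIcebergNativeScanExec => "skipped"
    case _ if hasSAB(p) => "convertNonCometNodeDPP"
    case _ => "unchanged"
  }
}
```

Whether a trait scan with an unconverted SAB should be skipped or fail loudly is a design choice for the contrib part; the sketch only shows that excluding by trait, rather than by concrete class, keeps such scans out of `convertNonCometNodeDPP`.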
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.