mbutrovich opened a new pull request, #4215:
URL: https://github.com/apache/datafusion-comet/pull/4215

   ## Which issue does this PR close?
   
   Closes #4022.
   
   ## Rationale for this change
   
   Under AQE, Spark's `PlanAdaptiveDynamicPruningFilters` rewrites 
`SubqueryAdaptiveBroadcastExec` (SAB) to `SubqueryBroadcastExec` so DPP filters 
reuse the join's already-materialized broadcast. For Iceberg native scans this 
rewrite was a no-op: `CometIcebergNativeScanExec` kept `runtimeFilters` inside 
its `@transient originalPlan`, where neither Spark's expression walks nor our 
own `transformExpressionsUp` passes could see them. The SAB therefore stayed 
unconverted, and the dimension table executed a second time as a standalone 
broadcast.
   
   #4112 fixed the equivalent problem for V1 native Parquet by lifting 
`runtimeFilters` to a top-level constructor field and using Spark's standard 
`prepare` / `waitForSubqueries` flow. This PR applies the same design to V2 
Iceberg, replacing the earlier prototype in #4033, and aligns 
`CometNativeScanExec` and `CometIcebergNativeScanExec` so both scans go through 
the same DPP and subquery resolution path.
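
   The visibility problem above can be sketched in isolation. This is a hypothetical minimal example (not Comet's real classes): Spark's expression rewrites walk a node's constructor fields via `productIterator`, so filters held only inside a non-constructor `@transient` field are never seen or rewritten.

```scala
// ScanNode stands in for a scan operator; hiddenFilters stands in for
// runtimeFilters buried inside a @transient originalPlan.
case class ScanNode(visibleFilters: Seq[String]) {
  @transient lazy val hiddenFilters: Seq[String] = Seq("dpp-filter")
}

val node = ScanNode(Seq("part_col IN (subquery)"))
// A productIterator walk (what expression-transform passes iterate)
// yields only constructor fields; hiddenFilters never appears.
val walked = node.productIterator.toList
assert(walked == List(Seq("part_col IN (subquery)")))
assert(!walked.contains(node.hiddenFilters))
```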
   
   ## What changes are included in this PR?
   
   - Lifted `runtimeFilters` to a top-level constructor field on 
`CometIcebergNativeScanExec` so Spark's `productIterator`-based expression 
walks (and our `transformExpressionsUp` passes) see and rewrite it directly. 
Mirrors `BatchScanExec` and matches the V1 design from #4112.
   - Added `CometLeafExec.ensureSubqueriesResolved()`, bridging Comet's custom 
`findAllPlanData` data-collection path with Spark's standard `prepare` -> 
`waitForSubqueries` flow. Removes the deadlock-prone reflection hack from #4033 
and eliminates ad-hoc double-checked locking.
   - Refactored `CometNativeScanExec` to use the same flow (dropped its 
redundant `doPrepare` override and outer DPP filter loop) so V1 and V2 stay in 
sync.
   - New Iceberg branch in `CometPlanAdaptiveDynamicPruningFilters` (3.5+) that 
converts the SAB inside `runtimeFilters` to `CometSubqueryBroadcastExec` (or 
`SubqueryBroadcastExec` if the join fell back to vanilla Spark). Matches by 
`buildKeys` exprIds to disambiguate multiple broadcast joins, and rewrites to 
`Literal.TrueLiteral` when no matching broadcast join exists (e.g., the join was 
planned as an SMJ), so DPP is disabled but results stay correct. On Spark 3.4 
Iceberg falls back without broadcast reuse, because 
`CometSpark34AqeDppFallbackRule` walks each scan's `partitionFilters`, a field 
`BatchScanExec` does not have.
   - `LazyIcebergMetric` defers metric value resolution. 
`SparkPlanInfo.fromSparkPlan` reads the metrics map for SQL UI events at 
planning time, before AQE's queryStageOptimizerRules run; without deferral that 
read would trigger `serializedPartitionData` against an unconverted SAB.
   - `serializedPartitionData` rebuilds `originalPlan` with the current 
top-level `runtimeFilters` before serializing. Otherwise Spark's 
`PlanAdaptiveDynamicPruningFilters` rewrite is invisible to the `@transient 
originalPlan` and `serializePartitions` re-translates the unresolved 
`InSubqueryExec`.
   - Handle the `ParallelCollectionRDD` shape that `BatchScanExec.inputRDD` 
returns when DPP prunes all partitions (matched by class name since it is 
`private[spark]`).
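
   The `prepare` -> `waitForSubqueries` bridge can be sketched with a toy plan node. This is a hedged sketch of the idea only: `ToyPlan`, `launchSubqueries`, and the `Int` results are stand-ins, not Comet's or Spark's actual types.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

trait ToyPlan {
  private var prepared = false
  private var pending: Seq[Future[Int]] = Nil

  // Subclasses kick off their uncorrelated subqueries here.
  protected def launchSubqueries(): Seq[Future[Int]]

  // Idempotent, like SparkPlan.prepare: subqueries start at most once.
  final def prepare(): Unit = synchronized {
    if (!prepared) { pending = launchSubqueries(); prepared = true }
  }

  // Blocks until every launched subquery has produced its value.
  final def waitForSubqueries(): Seq[Int] =
    pending.map(Await.result(_, 10.seconds))

  // The bridge: data-collection paths call this instead of hand-rolled
  // double-checked locking or reflection.
  final def ensureSubqueriesResolved(): Seq[Int] = {
    prepare(); waitForSubqueries()
  }
}

val scan = new ToyPlan {
  override protected def launchSubqueries() = Seq(Future(42))
}
assert(scan.ensureSubqueriesResolved() == Seq(42))
assert(scan.ensureSubqueriesResolved() == Seq(42)) // safe to call twice
```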
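   The matching logic in the new rule can be modeled in a toy form (all names below are illustrative, not the real rule's types): pair each pending DPP filter with the broadcast join whose `buildKeys` exprIds match, and degrade to a literal `true` when nothing matches.

```scala
sealed trait DppFilter
case class PendingSubquery(buildKeyIds: Set[Long]) extends DppFilter
case class ReuseBroadcast(joinId: Int) extends DppFilter
case object TrueLit extends DppFilter

case class BroadcastJoin(id: Int, buildKeyIds: Set[Long])

def rewrite(f: DppFilter, joins: Seq[BroadcastJoin]): DppFilter = f match {
  case PendingSubquery(keys) =>
    joins.find(_.buildKeyIds == keys) match {
      case Some(j) => ReuseBroadcast(j.id) // reuse the materialized side
      case None    => TrueLit              // e.g. the join became an SMJ:
                                           // DPP off, results still correct
    }
  case other => other
}

val joins = Seq(BroadcastJoin(1, Set(10L)), BroadcastJoin(2, Set(20L)))
assert(rewrite(PendingSubquery(Set(20L)), joins) == ReuseBroadcast(2))
assert(rewrite(PendingSubquery(Set(99L)), joins) == TrueLit)
```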
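   The deferral idea behind `LazyIcebergMetric` reduces to lazy, cached resolution (the shape below is assumed from the PR text, not the real class): the value is computed on first read, so a planning-time snapshot that merely registers the metric cannot force partition serialization early.

```scala
final class LazyMetric(compute: () => Long) {
  private var cached: Option[Long] = None
  // First read runs compute() and caches; later reads return the cache.
  def value: Long = synchronized {
    cached.getOrElse { val v = compute(); cached = Some(v); v }
  }
}

var resolved = false
val metric = new LazyMetric(() => { resolved = true; 123L })
assert(!resolved)            // planning time: nothing forced yet
assert(metric.value == 123L) // first read triggers resolution
assert(resolved)
assert(metric.value == 123L) // cached thereafter
```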
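   The class-name match from the last bullet looks roughly like this sketch: since `ParallelCollectionRDD` is `private[spark]`, it cannot appear in a type pattern outside Spark, so the runtime class's simple name is compared instead.

```scala
def isParallelCollectionRDD(rdd: AnyRef): Boolean =
  rdd.getClass.getSimpleName == "ParallelCollectionRDD"

// Local stand-in carrying the matching name, for demonstration only.
class ParallelCollectionRDD

assert(isParallelCollectionRDD(new ParallelCollectionRDD))
assert(!isParallelCollectionRDD(Seq(1, 2, 3)))
```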
   
   ## How are these changes tested?
   
   - 14 new AQE DPP tests in `CometIcebergNativeSuite` covering broadcast 
reuse, multiple DPP filters sharing a broadcast, buildKeys-based disambiguation 
across joins, BHJ fallback to vanilla Spark, SMJ (no broadcast) graceful 
disable, empty broadcast pruning all partitions, cross-stage scalar subqueries, 
and the V2 SPJ shape variations across Spark versions.
   - Verified on Spark 3.4, 3.5, 4.0, 4.1 for `CometIcebergNativeSuite`, 
`CometExecSuite`, `CometDppFallbackRepro3949Suite`, and 
`CometShuffleFallbackStickinessSuite`.

