mbutrovich opened a new issue, #4022:
URL: https://github.com/apache/datafusion-comet/issues/4022

   ### Describe the bug
   
   When `CometIcebergNativeScanExec` resolves AQE DPP via `SubqueryAdaptiveBroadcastExec`, the dim-table scan is executed twice:
   
   1. Once by the join's `CometBroadcastExchangeExec` (Arrow format, via 
`BroadcastQueryStage`)
   2. Once by `sab.child.executeCollect()` in `serializedPartitionData` (row 
format, for DPP partition key extraction)
   
   Spark's `PlanAdaptiveDynamicPruningFilters` rule is designed to optimize this case by creating a `SubqueryBroadcastExec` that reuses the join's broadcast exchange. However, the rule searches the plan for `BroadcastHashJoinExec` (lines 56-59) and doesn't recognize Comet operators such as `CometBroadcastHashJoin`. Since the match never succeeds, the reuse is never created. Iceberg's code bypasses the rule entirely by calling `sab.child.executeCollect()` directly, which works correctly but doesn't reuse the broadcast.
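
   A minimal, self-contained sketch of why the match fails. The classes below are illustrative stand-ins, not the real Spark/Comet operators; the point is that a search written for `BroadcastHashJoinExec` never fires once Comet has replaced the join node:

   ```scala
   // Stand-in plan nodes (hypothetical, for illustration only).
   sealed trait SparkPlan { def children: Seq[SparkPlan] }
   case class Scan(name: String) extends SparkPlan { def children = Nil }
   case class BroadcastExchange(child: SparkPlan) extends SparkPlan { def children = Seq(child) }
   case class BroadcastHashJoinExec(left: SparkPlan, right: SparkPlan) extends SparkPlan { def children = Seq(left, right) }
   case class CometBroadcastHashJoin(left: SparkPlan, right: SparkPlan) extends SparkPlan { def children = Seq(left, right) }

   // Simplified version of the search PlanAdaptiveDynamicPruningFilters
   // performs: it only matches the vanilla Spark join operator.
   def findBroadcastJoin(plan: SparkPlan): Option[BroadcastHashJoinExec] = plan match {
     case j: BroadcastHashJoinExec => Some(j)
     case other => other.children.flatMap(findBroadcastJoin).headOption
   }

   val vanillaPlan = BroadcastHashJoinExec(Scan("fact"), BroadcastExchange(Scan("dim")))
   val cometPlan   = CometBroadcastHashJoin(Scan("fact"), BroadcastExchange(Scan("dim")))

   println(findBroadcastJoin(vanillaPlan).isDefined) // true  -> reuse is planned
   println(findBroadcastJoin(cometPlan).isDefined)   // false -> rule bails, no reuse
   ```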
   
   Instrumentation confirms:
   - `sab.child` is an independent `AdaptiveSparkPlanExec`, not the join's `BroadcastQueryStage`
   - On the second trigger, `isFinalPlan: true` and `executedPlan: CometNativeColumnarToRowExec` show that it ran its own execution
   - The final plan contains only 1 `CometBroadcastExchangeExec` and 0 `ReusedExchangeExec` nodes, i.e. no reuse
   
   This is the AQE equivalent of the V1 non-AQE broadcast reuse issue fixed in 
#4011 with `CometSubqueryBroadcastExec`. That fix only applies to non-AQE DPP 
(`SubqueryBroadcastExec`); this issue is for AQE DPP 
(`SubqueryAdaptiveBroadcastExec`).
   
   ### Steps to reproduce
   
   Run the existing `CometIcebergNativeSuite` test "runtime filtering - join 
with dynamic partition pruning" with instrumentation in 
`CometIcebergNativeScanExec.serializedPartitionData` to observe the double 
execution. The test creates a partitioned Iceberg fact table joined with a 
Parquet dim table using a `BROADCAST` hint.
   
   ### Expected behavior
   
   The DPP subquery should reuse the broadcast from the join's 
`CometBroadcastExchangeExec` instead of executing the dim table scan 
independently. Possible approaches:
   
   1. A `CometSubqueryAdaptiveBroadcastExec` (analogous to `CometSubqueryBroadcastExec` from #4011) that wraps the join's `CometBroadcastExchangeExec` and decodes the Arrow broadcast data for DPP key extraction
   2. A new Comet rule at the AQE stage that handles broadcast reuse for Comet operators, since `PlanAdaptiveDynamicPruningFilters` can't recognize Comet operators and we can't modify Spark rules. This is related to the broader AQE rule redesign discussed in #3510.
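
   The core of approach 1, sketched with stand-in types (none of these names are Comet's real API): rather than calling `sab.child.executeCollect()` and re-executing the dim table, the DPP keys are projected out of the rows the join's broadcast already materialized:

   ```scala
   // Hypothetical stand-in for a decoded row; in the real implementation the
   // broadcast holds Arrow batches that would be decoded to rows first.
   final case class Row(values: Seq[Any])

   // DPP key extraction that reuses the join's materialized broadcast:
   // project the partition-key column and deduplicate, instead of running
   // an independent scan of the dim table.
   def dppKeys(materializedBroadcast: Seq[Row], keyIndex: Int): Set[Any] =
     materializedBroadcast.map(_.values(keyIndex)).toSet

   val dimBroadcast = Seq(Row(Seq(1, "a")), Row(Seq(2, "b")), Row(Seq(1, "c")))
   println(dppKeys(dimBroadcast, keyIndex = 0)) // Set(1, 2)
   ```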
   
   ### Additional context
   
   - This affects Iceberg V2 scans today and would affect V1 Parquet scans if AQE DPP support is added (#3510)
   - The performance impact depends on dim table size: for small dim tables (the typical DPP case) the double read is negligible, but for larger dim tables it could matter
   - Under AQE, `ReuseExchangeAndSubquery` doesn't apply the same way; AQE uses `QueryStageExec` for stage reuse. A fix would need to work within AQE's stage materialization model rather than the static `ReuseExchangeAndSubquery` rule
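
   A toy model of that stage-reuse mechanism (all names illustrative, not Spark's actual classes): AQE caches stages keyed by the canonicalized exchange, so a fix would look up the join's already-created broadcast stage in that cache rather than rely on the static rule:

   ```scala
   // Stand-ins for an exchange and a materialized query stage.
   final case class Exchange(canonical: String)
   final case class QueryStage(id: Int, exchange: Exchange)

   // Sketch of AQE-style stage reuse: equivalent (canonically equal)
   // exchanges resolve to the same stage instead of executing twice.
   class StageManager {
     private var nextId = 0
     private val cache = scala.collection.mutable.Map.empty[String, QueryStage]

     def getOrCreateStage(e: Exchange): QueryStage =
       cache.getOrElseUpdate(e.canonical, {
         val s = QueryStage(nextId, e); nextId += 1; s
       })
   }

   val mgr       = new StageManager
   val joinStage = mgr.getOrCreateStage(Exchange("broadcast(dim)"))
   val dppStage  = mgr.getOrCreateStage(Exchange("broadcast(dim)"))
   println(joinStage.id == dppStage.id) // true: the DPP subquery reuses the join's stage
   ```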
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

