mbutrovich opened a new issue, #4022: URL: https://github.com/apache/datafusion-comet/issues/4022
### Describe the bug

When `CometIcebergNativeScanExec` resolves AQE DPP via `SubqueryAdaptiveBroadcastExec`, the dim table is read and executed twice:

1. Once by the join's `CometBroadcastExchangeExec` (Arrow format, via `BroadcastQueryStage`)
2. Once by `sab.child.executeCollect()` in `serializedPartitionData` (row format, for DPP partition key extraction)

Spark's `PlanAdaptiveDynamicPruningFilters` rule is designed to optimize this by creating a `SubqueryBroadcastExec` that reuses the join's broadcast exchange. However, this rule searches for `BroadcastHashJoinExec` in the plan (lines 56-59) and doesn't recognize Comet operators like `CometBroadcastHashJoin`. Since the rule can't match, it never creates the reuse. Iceberg's code bypasses the rule entirely by calling `sab.child.executeCollect()` directly, which works correctly but doesn't reuse the broadcast.

Instrumentation confirms:

- `sab.child` is an independent `AdaptiveSparkPlanExec`, not the join's `BroadcastQueryStage`
- On the second trigger, `isFinalPlan: true` and `executedPlan: CometNativeColumnarToRowExec` show that it ran its own execution
- The final plan contains only one `CometBroadcastExchangeExec` and zero `ReusedExchangeExec` nodes, i.e. no reuse

This is the AQE equivalent of the V1 non-AQE broadcast reuse issue fixed in #4011 with `CometSubqueryBroadcastExec`. That fix applies only to non-AQE DPP (`SubqueryBroadcastExec`); this issue covers AQE DPP (`SubqueryAdaptiveBroadcastExec`).

### Steps to reproduce

Run the existing `CometIcebergNativeSuite` test "runtime filtering - join with dynamic partition pruning" with instrumentation in `CometIcebergNativeScanExec.serializedPartitionData` to observe the double execution. The test creates a partitioned Iceberg fact table joined with a Parquet dim table using a `BROADCAST` hint.

### Expected behavior

The DPP subquery should reuse the broadcast from the join's `CometBroadcastExchangeExec` instead of executing the dim table scan independently. Possible approaches:

1. A `CometSubqueryAdaptiveBroadcastExec` (analogous to `CometSubqueryBroadcastExec` from #4011) that wraps the join's `CometBroadcastExchangeExec` and decodes Arrow broadcast data for DPP key extraction
2. A new Comet rule at the AQE stage that handles broadcast reuse for Comet operators, since `PlanAdaptiveDynamicPruningFilters` can't recognize Comet operators and we can't modify Spark rules. This is related to the broader AQE rule redesign discussed in #3510.

### Additional context

- This affects Iceberg V2 scans and would affect V1 Parquet scans if AQE DPP support is added (#3510)
- The performance impact depends on dim table size: for small dim tables (the typical DPP case) the double read is negligible, but for larger dim tables it could matter
- Under AQE, `ReuseExchangeAndSubquery` doesn't apply the same way; AQE uses `QueryStageExec` for stage reuse. A fix would need to work within AQE's stage materialization model rather than the static `ReuseExchangeAndSubquery` rule
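To make the root cause concrete, here is a minimal self-contained sketch of the pattern-match gap. The case classes below are hypothetical stand-ins, not the real Spark or Comet operators, and `planDppSubquery` only models the rule's decision: reuse the join's broadcast when the expected Spark operator is found, otherwise fall back to re-executing the dim-table subquery (the double read described above).

```scala
// Hypothetical stand-in plan nodes; the real classes live in Spark/Comet.
sealed trait PlanNode
case class BroadcastHashJoinExec(build: PlanNode) extends PlanNode
case class CometBroadcastHashJoin(build: PlanNode) extends PlanNode
case class BroadcastExchange(id: String) extends PlanNode

// Models PlanAdaptiveDynamicPruningFilters' match: only the vanilla Spark
// join shape triggers broadcast reuse; anything else (including the Comet
// replacement) falls through to independent execution of the dim table.
def planDppSubquery(join: PlanNode): String = join match {
  case BroadcastHashJoinExec(BroadcastExchange(id)) => s"reuse:$id"
  case _                                            => "no-reuse"
}
```

With a vanilla `BroadcastHashJoinExec` the match succeeds and the exchange is reused; once Comet has swapped in `CometBroadcastHashJoin`, the same tree shape returns `"no-reuse"`, which is why a Comet-aware rule (or a `CometSubqueryAdaptiveBroadcastExec`) is needed.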
