mbutrovich opened a new pull request, #4011:
URL: https://github.com/apache/datafusion-comet/pull/4011

   ## Which issue does this PR close?
   
   Partially addresses #3510. Related to #121.
   
   ## Rationale for this change
   
   `CometNativeScanExec` (the native DataFusion-based Parquet V1 scan) 
currently falls back to Spark for queries that use Dynamic Partition Pruning 
(DPP), even though the non-AQE DPP path already works through the existing lazy 
partition serialization infrastructure introduced in #3511 (which was a subset 
of #3349).
   
   ### Background: Spark's two DPP paths
   
   Spark has two physical rules that create DPP filters, and they produce 
different subquery plan types:
   
   | Rule | When it runs | Subquery type produced | Comet compatible? |
   |------|-------------|----------------------|-------------------|
   | `PlanDynamicPruningFilters` | During physical planning, *before* Comet 
columnar rules | `InSubqueryExec(SubqueryBroadcastExec)` or 
`InSubqueryExec(SubqueryExec)` | Yes (this PR) |
   | `PlanAdaptiveDynamicPruningFilters` | Inside AQE during stage 
re-optimization, *after* Comet columnar rules | 
`InSubqueryExec(SubqueryAdaptiveBroadcastExec)` | No |
   
   The non-AQE rule (`PlanDynamicPruningFilters`) runs before Comet's rules, so 
by the time `CometScanRule` sees the plan, the DPP filters are already 
materialized as standard `InSubqueryExec` nodes with `SubqueryBroadcastExec` or 
`SubqueryExec` plans. These are resolved by Spark's normal subquery execution 
machinery.
   
   The AQE rule (`PlanAdaptiveDynamicPruningFilters`) runs after Comet's rules 
have already replaced operators like `BroadcastHashJoinExec` with 
`CometBroadcastHashJoin`. The AQE rule searches for `BroadcastHashJoinExec` to 
determine broadcast reuse — it doesn't recognize Comet operators, so it can't 
create the DPP filters correctly. This is the fundamental incompatibility 
discussed in #3446 and #3510.
   
   ### What this PR enables
   
   - Non-AQE DPP (AQE disabled, or AQE enabled with DPP handled by 
`PlanDynamicPruningFilters` before AQE re-optimization): partition filters 
containing `SubqueryBroadcastExec` or `SubqueryExec` are resolved at execution 
time and used to prune file partitions in `CometNativeScanExec`.
   
   ### What this PR does not enable
   
   - AQE DPP (`SubqueryAdaptiveBroadcastExec`): still falls back to Spark, 
gated by the existing `spark.comet.dppFallback.enabled` config. Supporting this 
would require either modifying Spark's AQE rules to recognize Comet operators, 
or bypassing the rule entirely (as `CometIcebergNativeScanExec` does via 
reflection). This is tracked as a separate item in #3510.
   
   The prior DPP fallback gate did not distinguish between these cases, 
rejecting both. This PR narrows the gate to only reject AQE DPP.
   
   ## What changes are included in this PR?
   
   ### Detection: distinguish AQE from non-AQE DPP
   
   Added an `isAqeDynamicPruningFilter` helper that checks for a 
`SubqueryAdaptiveBroadcastExec` inside an `InSubqueryExec`'s plan. The existing 
`isDynamicPruningFilter` (which checks for any `PlanExpression`) remains for 
its other uses.
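   The shape of the two gates can be sketched with simplified stand-ins for 
Spark's plan and expression classes (the real helpers pattern-match on 
`org.apache.spark.sql.execution` types; the names and structure below are 
illustrative only):

   ```scala
   // Toy stand-ins for Spark's subquery plan nodes (not the real classes).
   sealed trait SparkPlan
   case object SubqueryBroadcastExec extends SparkPlan
   case object SubqueryAdaptiveBroadcastExec extends SparkPlan
   case object SubqueryExec extends SparkPlan

   // Toy stand-ins for the expression side.
   sealed trait Expression
   case class InSubqueryExec(plan: SparkPlan) extends Expression
   case class PlanExpression(plan: SparkPlan) extends Expression
   case class Literal(value: Any) extends Expression

   // Existing gate: any plan-bearing (DPP) expression triggers fallback.
   def isDynamicPruningFilter(e: Expression): Boolean = e match {
     case _: InSubqueryExec | _: PlanExpression => true
     case _                                     => false
   }

   // Narrower gate: only the AQE-produced subquery type triggers fallback.
   def isAqeDynamicPruningFilter(e: Expression): Boolean = e match {
     case InSubqueryExec(SubqueryAdaptiveBroadcastExec) => true
     case _                                             => false
   }
   ```

   With this split, a filter wrapping `SubqueryBroadcastExec` or `SubqueryExec` 
passes the new gate, while `SubqueryAdaptiveBroadcastExec` still falls back.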
   
   ### Gate changes (CometScanRule + CometNativeScan)
   
   Changed both DPP rejection gates from `isDynamicPruningFilter` (rejects all 
DPP) to `isAqeDynamicPruningFilter` (rejects only AQE DPP).
   
   ### Execution: explicit DPP resolution in `serializedPartitionData`
   
   `serializedPartitionData` can be triggered from `findAllPlanData` (via 
`commonData`) on a `BroadcastExchangeExec` thread — outside the normal 
`prepare()` -> `executeSubqueries()` flow. We call `e.updateResult()` 
explicitly to resolve DPP subqueries before accessing file partitions. This 
mirrors the pattern in `CometIcebergNativeScanExec`.
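   A minimal sketch of the resolve-before-prune pattern, using a toy subquery 
class in place of Spark's `InSubqueryExec` (the real code calls 
`updateResult()` on the DPP expressions found in the scan's partition filters; 
everything below is a simplified stand-in):

   ```scala
   // Toy subquery whose result must be materialized before evaluation,
   // mimicking InSubqueryExec.updateResult().
   final class ToyInSubqueryExec(compute: () => Set[Int]) {
     private var result: Option[Set[Int]] = None
     def updateResult(): Unit = if (result.isEmpty) result = Some(compute())
     def eval(partition: Int): Boolean =
       result.map(_.contains(partition))
         .getOrElse(throw new IllegalStateException("subquery not resolved"))
   }

   def serializedPartitionData(
       filters: Seq[ToyInSubqueryExec],
       partitions: Seq[Int]): Seq[Int] = {
     // We may be running on a thread that never went through
     // prepare() -> executeSubqueries(), so resolve DPP subqueries
     // explicitly before pruning.
     filters.foreach(_.updateResult())
     partitions.filter(p => filters.forall(_.eval(p)))
   }
   ```

   Without the explicit `updateResult()` call, `eval` would be reached with an 
unresolved subquery when the caller is a `BroadcastExchangeExec` thread.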
   
   ### Planning-time fix: avoid triggering DPP during serde
   
   `CometNativeScan.convert()` previously called `scan.getFilePartitions()` to 
extract object store options, which triggered `dynamicallySelectedPartitions` 
-> DPP eval at planning time (before subqueries are resolved). Changed to use 
`scan.selectedPartitions` (static partition listing, excludes DPP filters) 
since object store options don't depend on DPP filtering.
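   The distinction can be sketched as follows, with toy stand-ins for the scan 
members (the real code reads `FileSourceScanExec`-side state; names and the 
scheme-extraction logic here are illustrative assumptions):

   ```scala
   case class FilePartition(path: String)

   // dppResolved is by-name so that merely constructing the scan, or reading
   // the static listing, never forces DPP evaluation.
   final class ToyScan(
       static: Seq[FilePartition],
       dppResolved: => Seq[FilePartition]) {
     def selectedPartitions: Seq[FilePartition] = static       // safe at planning time
     def getFilePartitions(): Seq[FilePartition] = dppResolved // forces DPP eval
   }

   def objectStoreSchemes(scan: ToyScan): Set[String] =
     // Before: scan.getFilePartitions() triggered DPP eval during serde.
     // After: the static listing suffices, since object store options
     // depend only on the URI scheme, not on which partitions survive pruning.
     scan.selectedPartitions.map(_.path.takeWhile(_ != ':')).toSet
   ```

   Calling `objectStoreSchemes` never touches `getFilePartitions()`, so 
planning-time serde no longer evaluates unresolved DPP subqueries.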
   
   ### Config doc update
   
   Updated `spark.comet.dppFallback.enabled` documentation to clarify it now 
only affects AQE DPP.
   
   ## How are these changes tested?
   
   - **New tests**: `"non-AQE DPP with BHJ works with CometNativeScanExec"` and 
`"non-AQE DPP with SMJ works with CometNativeScanExec"` in `CometExecSuite`. 
These disable AQE, create partitioned tables, run DPP queries, and assert:
     - Correct results (`checkSparkAnswerAndOperator`)
     - `CometNativeScanExec` is in the plan (not fallen back)
     - `DynamicPruningExpression` is present in the scan's partition filters 
(BHJ test)
     - No fallback message in explain info
   - **Existing tests pass unchanged**: `"DPP fallback"`, `"DPP fallback avoids 
inefficient Comet shuffle"`, `CometDppFallbackRepro3949Suite`, 
`CometShuffleFallbackStickinessSuite` — these use AQE (default on) so they 
exercise the AQE DPP path which still falls back.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
