wirybeaver opened a new pull request, #1722:
URL: https://github.com/apache/datafusion-ballista/pull/1722

   ## Which issue does this PR close?
   
   Part of #1359 (AQE epic) — roadmap line: *"switch from streaming aggregation 
to hash aggregation (extended rules)"*
   
   ## What does this PR do?
   
   Adds `DynamicAggregateAlgorithmRule`, an AQE physical optimizer rule that 
re-derives `InputOrderMode` for each `AggregateExec` after a shuffle stage 
resolves and rewrites the operator when the derived mode differs from the 
cached one.
   
   ### Problem
   
   DataFusion freezes `InputOrderMode` (`Linear` / `Sorted` / 
`PartiallySorted`) at plan time. In Ballista's AQE this creates two issues:
   
   1. **Wasted memory**: a downstream aggregate stays `Linear` (hash table, 
O(distinct-groups) memory) even after an upstream stage completes and its 
output turns out to be ordered on the group-by columns, in which case `Sorted` 
(streaming, O(1) memory) would suffice.
   2. **Stale correctness assumption**: after a subtree rewrite, an aggregate 
may hold a `Sorted` claim when its input is no longer ordered.
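
To make the two issues concrete, here is a minimal, self-contained sketch of the two aggregation strategies over `(key, value)` rows. It is not DataFusion code: `hash_sum` and `streaming_sum` are hypothetical names used only for illustration, and "peak groups" is a stand-in for operator memory.

```rust
use std::collections::HashMap;

/// Hash aggregation (the `Linear` case): one table entry per distinct key is
/// held until all input has been consumed.
/// Returns the per-key sums (sorted by key) and the peak number of live groups.
fn hash_sum(rows: &[(u32, i64)]) -> (Vec<(u32, i64)>, usize) {
    let mut table: HashMap<u32, i64> = HashMap::new();
    for &(key, value) in rows {
        *table.entry(key).or_insert(0) += value;
    }
    let peak_groups = table.len();
    let mut out: Vec<(u32, i64)> = table.into_iter().collect();
    out.sort_unstable();
    (out, peak_groups)
}

/// Streaming aggregation (the `Sorted` case): assumes the input is ordered by
/// key, so a group is emitted as soon as the key changes and only one group's
/// state is live at a time. The ordering assumption is NOT checked here.
fn streaming_sum(rows: &[(u32, i64)]) -> (Vec<(u32, i64)>, usize) {
    let mut out = Vec::new();
    let mut current: Option<(u32, i64)> = None;
    for &(key, value) in rows {
        current = match current {
            // Same key as the open group: keep accumulating.
            Some((k, sum)) if k == key => Some((k, sum + value)),
            // Key changed: emit the finished group and open a new one.
            Some(done) => {
                out.push(done);
                Some((key, value))
            }
            // First row: open the first group.
            None => Some((key, value)),
        };
    }
    let peak_groups = usize::from(current.is_some());
    if let Some(last) = current {
        out.push(last);
    }
    (out, peak_groups)
}
```

On key-ordered input both functions return the same sums, but `streaming_sum` never holds more than one group. On unordered input `streaming_sum` emits the same key more than once, which is the kind of wrong result a stale `Sorted` claim could produce.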
   
   ### Approach
   
   `AggregateExec::with_new_children` already calls `try_new_with_schema`, which 
re-derives `input_order_mode` from the current input `EquivalenceProperties`. 
The rule:
   
   1. Walks the plan with `transform_up`.
   2. At each `AggregateExec`, skips if no resolved `ExchangeExec` exists in 
the subtree (idempotence guard).
   3. Calls `with_new_children([same_input])` to force re-derivation.
   4. Returns `Transformed::yes(rebuilt)` only when the derived mode differs 
from the cached one.
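
The four steps above can be sketched on a toy plan tree. This is a simplified model under stated assumptions, not the code in this PR: `Plan`, `output_sorted`, `has_resolved_exchange`, `rewrite` and `apply_rule` are hypothetical, ordering is reduced to a single boolean, `PartiallySorted` is omitted, and the returned `bool` stands in for `Transformed::yes` / `Transformed::no`.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum InputOrderMode {
    Linear,
    Sorted,
}

// Toy plan tree; none of these are DataFusion or Ballista types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { sorted: bool },
    // `sorted_output` is only trusted once the stage is `resolved`.
    Exchange { resolved: bool, sorted_output: bool, input: Box<Plan> },
    Aggregate { mode: InputOrderMode, input: Box<Plan> },
}

/// Whether a node's output is ordered on the group-by columns (toy model).
fn output_sorted(plan: &Plan) -> bool {
    match plan {
        Plan::Scan { sorted } => *sorted,
        Plan::Exchange { resolved, sorted_output, .. } => *resolved && *sorted_output,
        Plan::Aggregate { mode, .. } => *mode == InputOrderMode::Sorted,
    }
}

/// Step 2 guard: is there a resolved exchange anywhere in this subtree?
fn has_resolved_exchange(plan: &Plan) -> bool {
    match plan {
        Plan::Scan { .. } => false,
        Plan::Exchange { resolved, input, .. } => *resolved || has_resolved_exchange(input),
        Plan::Aggregate { input, .. } => has_resolved_exchange(input),
    }
}

/// Bottom-up rewrite (step 1). Returns the plan and whether anything changed.
fn rewrite(plan: Plan) -> (Plan, bool) {
    match plan {
        scan @ Plan::Scan { .. } => (scan, false),
        Plan::Exchange { resolved, sorted_output, input } => {
            let (child, changed) = rewrite(*input);
            (Plan::Exchange { resolved, sorted_output, input: Box::new(child) }, changed)
        }
        Plan::Aggregate { mode, input } => {
            let (child, changed) = rewrite(*input);
            let derived = if !has_resolved_exchange(&child) {
                mode // step 2: nothing resolved below, keep the cached mode
            } else if output_sorted(&child) {
                InputOrderMode::Sorted // step 3: re-derive from the current input
            } else {
                InputOrderMode::Linear
            };
            // Step 4: report a change only when the mode actually differs.
            let node = Plan::Aggregate { mode: derived, input: Box::new(child) };
            (node, changed || derived != mode)
        }
    }
}

/// Config gate: when disabled, the plan is returned untouched.
fn apply_rule(plan: Plan, enabled: bool) -> (Plan, bool) {
    if enabled { rewrite(plan) } else { (plan, false) }
}
```

In this model, running `apply_rule` a second time on its own output reports no change, because the cached mode already matches the derived one.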
   
   No upstream DataFusion changes are required.
   
   ### Configuration
   
   ```sql
   SET ballista.aqe.dynamic_aggregate.enabled = true;
   ```
   
   Disabled by default (`false`) pending benchmarking.
   
   ## Checklist
   
   - [x] New rule registered in `default_optimizers()` before 
`DistributedExchangeRule`
   - [x] Config key `ballista.aqe.dynamic_aggregate.enabled` added to 
`BallistaConfig`
   - [x] 9 unit tests: gate off, no-exchange skip, unresolved-exchange skip, 
sorted/unsorted input, schema preservation, idempotence
   - [x] All 42 AQE optimizer rule tests pass


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

