wirybeaver opened a new pull request, #1722: URL: https://github.com/apache/datafusion-ballista/pull/1722
## Which issue does this PR close? Part of #1359 (AQE epic) — roadmap line: *"switch from streaming aggregation to hash aggregation (extended rules)"* ## What does this PR do? Adds `DynamicAggregateAlgorithmRule`, an AQE physical optimizer rule that re-derives `InputOrderMode` for each `AggregateExec` after a shuffle stage resolves and rewrites the operator when the derived mode differs from the cached one. ### Problem DataFusion freezes `InputOrderMode` (`Linear` / `Sorted` / `PartiallySorted`) at plan time. In Ballista's AQE this creates two issues: 1. **Wasted memory**: a downstream aggregate stays `Linear` (hash table, O(distinct-groups) memory) even when an upstream stage completes and grants ordering on the group-by columns — where `Sorted` (streaming, O(1) memory) would suffice. 2. **Stale correctness assumption**: after a subtree rewrite, an aggregate may hold a `Sorted` claim when its input is no longer ordered. ### Approach `AggregateExec::with_new_children` already calls `try_new_with_schema` which re-derives `input_order_mode` from the current input `EquivalenceProperties`. The rule: 1. Walks the plan with `transform_up`. 2. At each `AggregateExec`, skips if no resolved `ExchangeExec` exists in the subtree (idempotence guard). 3. Calls `with_new_children([same_input])` to force re-derivation. 4. Returns `Transformed::yes(rebuilt)` only when the derived mode differs from the cached one. No upstream DataFusion changes are required. ### Configuration ```sql SET ballista.aqe.dynamic_aggregate.enabled = true; ``` Disabled by default (`false`) pending benchmarking. ## Checklist - [x] New rule registered in `default_optimizers()` before `DistributedExchangeRule` - [x] Config key `ballista.aqe.dynamic_aggregate.enabled` added to `BallistaConfig` - [x] 9 unit tests: gate off, no-exchange skip, unresolved-exchange skip, sorted/unsorted input, schema preservation, idempotence - [x] All 42 AQE optimizer rule tests pass -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
