gene-bordegaray opened a new issue, #23191:
URL: https://github.com/apache/datafusion/issues/23191

   
   Related:
   
   - #22395
   - #21992
   - #23184
   
   ## Is your feature request related to a problem or challenge?
   
   Grouped aggregation can require its input to be partitioned by the group-by expressions so that
   all rows for the same group are processed in the same partition. Today, aggregate modes that need
   partitioned input require it to be `HashPartitioned` on the group-by expressions.
   
   Range partitioning can also provide the needed grouping property. If the input is range partitioned
   by compatible group-by expressions, equal group keys are colocated in the same partition. In
   those cases, requiring hash partitioning introduces an unnecessary repartition even though the
   input is already partitioned correctly.
   
   ## Describe the solution you'd like
   
   Allow compatible `Partitioning::Range` inputs to satisfy grouped aggregation distribution
   requirements where the aggregate only needs key colocation, not hash partitioning specifically.
   This would involve:
   
   - Reviewing aggregate modes that currently require `HashPartitioned` input, 
such as
     `FinalPartitioned` and `SinglePartitioned`.
   - Determining whether those modes can request key-based partitioning rather 
than hash-specific
     partitioning.
   - Ensuring range partitioning is accepted only when the range ordering expressions guarantee
     colocation of the group-by keys.
   - Preserving conservative fallback to hash repartitioning when the range 
partitioning is
     incompatible.
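   
   The acceptance rule in the list above can be sketched as a small predicate. This is only an
   illustration under stated assumptions: `range_satisfies_group_by` is a hypothetical helper, not a
   DataFusion API; expressions are modeled as plain column names; and sort options (ASC/DESC, null
   ordering) and expression equivalences are ignored.
   
   ```rust
   use std::collections::HashSet;
   
   /// Hypothetical rule (not DataFusion's API): a range partitioning colocates
   /// equal group keys when it has at least one ordering expression and every
   /// ordering expression is also a group-by expression.
   fn range_satisfies_group_by(range_exprs: &[&str], group_by: &[&str]) -> bool {
       let keys: HashSet<&str> = group_by.iter().copied().collect();
       !range_exprs.is_empty() && range_exprs.iter().all(|e| keys.contains(e))
   }
   
   fn main() {
       // Range on [a] feeding groupBy=[a, b]: accepted, no repartition needed.
       println!("{}", range_satisfies_group_by(&["a"], &["a", "b"]));
       // Range on [a, b] feeding groupBy=[a]: rejected, fall back to hash repartitioning.
       println!("{}", range_satisfies_group_by(&["a", "b"], &["a"]));
   }
   ```
   
   Any case the predicate cannot prove returns `false`, which corresponds to the conservative
   fallback to hash repartitioning.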
   
   **Examples that should satisfy the aggregation:**
   
   ```text
   AggregateExec: mode=FinalPartitioned, groupBy=[a]
     input: Partitioning::Range(ordering=[a ASC], split_points=[...])
   ```
   
   Rows with the same `a` value are colocated in one range partition, so the 
final partitioned
   aggregate should not need an additional hash repartition.
   
   ```text
   AggregateExec: mode=FinalPartitioned, groupBy=[a, b]
     input: Partitioning::Range(ordering=[a ASC], split_points=[...])
   ```
   
   Rows with the same `(a, b)` group key are colocated because they also have 
the same `a` value. Range
   partitioning by a subset of the group-by keys can be sufficient for key 
colocation when that subset
   cannot split equal groups across partitions.
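   
   A small simulation can make the subset argument concrete. It is a sketch, assuming integer keys
   and assuming that partition `i` holds values from one split point up to, but excluding, the next;
   the helper names are hypothetical and the real split point semantics may differ.
   
   ```rust
   use std::collections::HashMap;
   
   /// Assumed semantics: with sorted split points, a value goes to the partition
   /// whose index equals the number of split points less than or equal to it.
   fn range_partition_by_a(a: i64, split_points: &[i64]) -> usize {
       split_points.partition_point(|&s| s <= a)
   }
   
   /// Returns true when every (a, b) group lands in exactly one partition.
   fn groups_are_colocated(rows: &[(i64, i64)], split_points: &[i64]) -> bool {
       let mut seen: HashMap<(i64, i64), usize> = HashMap::new();
       rows.iter().all(|&(a, b)| {
           // The partition depends on `a` only, so `b` cannot split a group.
           let p = range_partition_by_a(a, split_points);
           *seen.entry((a, b)).or_insert(p) == p
       })
   }
   
   fn main() {
       let rows = [(1, 10), (1, 20), (1, 10), (7, 10), (7, 10), (12, 5)];
       println!("{}", groups_are_colocated(&rows, &[5, 10]));
   }
   ```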
   
   **Examples that should not satisfy the aggregation:**
   
   ```text
   AggregateExec: mode=FinalPartitioned, groupBy=[b]
     input: Partitioning::Range(ordering=[a ASC], split_points=[...])
   ```
   
   The input is partitioned by `a`, but the aggregate groups by `b`, so equal 
`b` values may appear in
   multiple partitions.
   
   ```text
   AggregateExec: mode=FinalPartitioned, groupBy=[a]
     input: Partitioning::Range(ordering=[a ASC, b ASC], split_points=[...])
   ```
   
   The input range partitioning can split rows with the same `a` value across 
different `b` ranges, so
   rows for one `a` group may appear in multiple partitions.
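   
   The split can be reproduced with a composite key. The sketch below assumes split points are
   `(a, b)` tuples compared lexicographically and uses a hypothetical helper, so it illustrates the
   failure mode rather than DataFusion's actual range assignment.
   
   ```rust
   /// Assumed semantics: split points are (a, b) tuples compared lexicographically,
   /// and a key goes to the partition indexed by the number of split points <= key.
   fn range_partition_by_a_b(key: (i64, i64), split_points: &[(i64, i64)]) -> usize {
       split_points.partition_point(|&s| s <= key)
   }
   
   fn main() {
       // A single split point at (a=1, b=5) creates two partitions.
       let split_points = [(1, 5)];
       let p1 = range_partition_by_a_b((1, 3), &split_points);
       let p2 = range_partition_by_a_b((1, 7), &split_points);
       // Both rows have a=1 but land in different partitions.
       println!("{p1} {p2}");
   }
   ```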
   
   ```text
   AggregateExec: mode=FinalPartitioned, groupBy=[a]
     input: Partitioning::Range(ordering=[a DESC], split_points=[...])
   ```
   
   This should only satisfy the aggregate if the range compatibility logic 
treats the ordering and
   split point semantics as compatible for key colocation. If the ordering 
options cannot be proven
   compatible, DataFusion should repartition.
   
   ## Describe alternatives you've considered
   
   One alternative is to wait until physical range repartition execution is implemented. However,
   this issue is about consuming inputs that are already range partitioned and does not require
   inserting a new range repartition.
   
   ## Additional context
   
   This is similar in spirit to #23184 for joins, but aggregation is a 
single-input operator. The goal
   is to let aggregations express the distribution property they actually need.
   
   Relevant APIs and code paths to review:
   
   - `AggregateExec::required_input_distribution`, which currently requests 
`HashPartitioned` for
     partitioned aggregate modes.
   - `AggregateMode::{FinalPartitioned, SinglePartitioned}` and any other modes 
that require grouped
     input partitioning.
   - `Distribution::{HashPartitioned, KeyPartitioned}` for expressing whether 
the operator needs hash
     partitioning specifically or only key colocation.
   - `Partitioning::satisfaction` and `Partitioning::Range` compatibility logic.
   - `EnforceDistribution`, which decides whether to insert `RepartitionExec`.
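   
   To show how a hash-specific requirement differs from a key-colocation requirement, here is a
   simplified model. The enums below are hypothetical stand-ins that only borrow the names used in
   this issue; they are not DataFusion's real definitions, expressions are plain column names, and
   the matching rules (exact match for hash, non-empty subset for key colocation) are assumptions.
   
   ```rust
   /// Hypothetical, simplified stand-ins for the planner types named above.
   #[derive(Debug)]
   enum Distribution {
       HashPartitioned(Vec<&'static str>),
       KeyPartitioned(Vec<&'static str>),
   }
   
   #[derive(Debug)]
   enum Partitioning {
       Hash(Vec<&'static str>),
       Range(Vec<&'static str>),
   }
   
   /// True when `exprs` is non-empty and every expression is one of `keys`.
   fn is_nonempty_subset(exprs: &[&str], keys: &[&str]) -> bool {
       !exprs.is_empty() && exprs.iter().all(|e| keys.contains(e))
   }
   
   /// Assumed rules: a hash requirement needs hash partitioning on exactly the
   /// required keys; a key requirement only needs colocation of equal keys.
   fn satisfies(input: &Partitioning, required: &Distribution) -> bool {
       match (input, required) {
           (Partitioning::Hash(exprs), Distribution::HashPartitioned(keys)) => exprs == keys,
           (Partitioning::Range(_), Distribution::HashPartitioned(_)) => false,
           (Partitioning::Hash(exprs), Distribution::KeyPartitioned(keys))
           | (Partitioning::Range(exprs), Distribution::KeyPartitioned(keys)) => {
               is_nonempty_subset(exprs, keys)
           }
       }
   }
   
   fn main() {
       let range_on_a = Partitioning::Range(vec!["a"]);
       let hash_on_b = Partitioning::Hash(vec!["b"]);
       let hash_req = Distribution::HashPartitioned(vec!["a", "b"]);
       let key_req = Distribution::KeyPartitioned(vec!["a", "b"]);
       // A `false` result is where a planner would insert a repartition.
       println!("{}", satisfies(&range_on_a, &hash_req));
       println!("{}", satisfies(&range_on_a, &key_req));
       println!("{}", satisfies(&hash_on_b, &key_req));
   }
   ```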


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

