gene-bordegaray opened a new issue, #23191:
URL: https://github.com/apache/datafusion/issues/23191
Related:
- #22395
- #21992
- #23184
## Is your feature request related to a problem or challenge?
Grouped aggregation can require input partitioning by the group-by
expressions so that all rows for
the same group are processed in the same partition. Today, aggregate modes
that need partitioned
input require `HashPartitioned` group-by expressions.
Range partitioning can also provide the needed grouping property. If the
input is range partitioned by compatible group-by expressions, equal group
keys are colocated in the same partition. In those cases, requiring hash
partitioning can introduce an unnecessary repartition even though the
input is already partitioned correctly.
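
To make the colocation property concrete, here is a minimal sketch in plain Rust that does not use DataFusion. The helper names and the boundary convention (partition `i` holds keys in `[split_points[i-1], split_points[i])`) are assumptions made for illustration, not DataFusion's actual semantics.

```rust
/// Hypothetical helper: picks the partition for `key` given ascending split points.
/// Assumed convention: a key equal to a split point goes to the partition on its right.
fn range_partition_index(key: i64, split_points: &[i64]) -> usize {
    // `partition_point` returns how many leading split points are <= key.
    split_points.partition_point(|sp| *sp <= key)
}

/// Routes each (group_key, value) row to a partition by its group key alone.
fn partition_rows(rows: &[(i64, i64)], split_points: &[i64]) -> Vec<Vec<(i64, i64)>> {
    // N split points produce N + 1 partitions.
    let mut partitions = vec![Vec::new(); split_points.len() + 1];
    for &(key, value) in rows {
        partitions[range_partition_index(key, split_points)].push((key, value));
    }
    partitions
}
```

Because the partition index depends only on the group key, two rows with the same key cannot land in different partitions, which is the property a partitioned aggregate relies on.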
## Describe the solution you'd like
Allow compatible `Partitioning::Range` inputs to satisfy grouped aggregation
distribution requirements where the aggregate only needs key colocation, not
hash partitioning specifically. This would involve:
- Reviewing aggregate modes that currently require `HashPartitioned` input,
such as
`FinalPartitioned` and `SinglePartitioned`.
- Determining whether those modes can request key-based partitioning rather
than hash-specific
partitioning.
- Ensuring range partitioning is accepted only when the range ordering
expressions satisfy the
group-by expressions.
- Preserving conservative fallback to hash repartitioning when the range
partitioning is
incompatible.
**Examples that should satisfy the aggregation:**
```text
AggregateExec: mode=FinalPartitioned, groupBy=[a]
input: Partitioning::Range(ordering=[a ASC], split_points=[...])
```
Rows with the same `a` value are colocated in one range partition, so the
final partitioned
aggregate should not need an additional hash repartition.
```text
AggregateExec: mode=FinalPartitioned, groupBy=[a, b]
input: Partitioning::Range(ordering=[a ASC], split_points=[...])
```
Rows with the same `(a, b)` group key are colocated because they also have
the same `a` value. Range
partitioning by a subset of the group-by keys can be sufficient for key
colocation when that subset
cannot split equal groups across partitions.
**Examples that should not satisfy the aggregation:**
```text
AggregateExec: mode=FinalPartitioned, groupBy=[b]
input: Partitioning::Range(ordering=[a ASC], split_points=[...])
```
The input is partitioned by `a`, but the aggregate groups by `b`, so equal
`b` values may appear in
multiple partitions.
```text
AggregateExec: mode=FinalPartitioned, groupBy=[a]
input: Partitioning::Range(ordering=[a ASC, b ASC], split_points=[...])
```
The input range partitioning can split rows with the same `a` value across
different `b` ranges, so
rows for one `a` group may appear in multiple partitions.
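
A small sketch of why this case fails, written in plain Rust with hypothetical helpers. It assumes split points are composite `(a, b)` values compared lexicographically, which mirrors an `[a ASC, b ASC]` ordering, and that a key equal to a split point goes to the partition on its right.

```rust
/// Hypothetical helper for a two-column range partitioning on (a, b).
/// Rust tuples compare lexicographically, like an `[a ASC, b ASC]` ordering.
fn composite_partition_index(key: (i64, i64), split_points: &[(i64, i64)]) -> usize {
    split_points.partition_point(|sp| *sp <= key)
}

/// Returns the distinct partitions that hold rows with the given `a` value.
fn partitions_for_a(a: i64, rows: &[(i64, i64)], split_points: &[(i64, i64)]) -> Vec<usize> {
    let mut seen: Vec<usize> = rows
        .iter()
        .filter(|row| row.0 == a)
        .map(|&row| composite_partition_index(row, split_points))
        .collect();
    seen.sort_unstable();
    seen.dedup();
    seen
}
```

With a single split point `(1, 5)`, the rows `(1, 3)` and `(1, 7)` share `a = 1` but fall on opposite sides of the boundary, so `partitions_for_a(1, ...)` reports two partitions.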
```text
AggregateExec: mode=FinalPartitioned, groupBy=[a]
input: Partitioning::Range(ordering=[a DESC], split_points=[...])
```
This should only satisfy the aggregate if the range compatibility logic
treats the ordering and
split point semantics as compatible for key colocation. If the ordering
options cannot be proven
compatible, DataFusion should repartition.
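
The five cases above can be summarized as a subset check: every range ordering expression must appear among the group-by expressions. The following is a sketch under stated assumptions, not a proposed implementation. Expressions are modeled as plain column names, `Direction` and `range_colocates_groups` are hypothetical, and `Desc` is rejected only to model the conservative fallback when compatibility has not been proven.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Direction {
    Asc,
    Desc,
}

/// Hypothetical check: does a range partitioning on `range_ordering`
/// keep all rows of each `group_by` key in one partition?
fn range_colocates_groups(range_ordering: &[(&str, Direction)], group_by: &[&str]) -> bool {
    // An empty ordering gives no colocation guarantee in this sketch.
    !range_ordering.is_empty()
        && range_ordering.iter().all(|(expr, dir)| {
            // Assumption: only ASC is treated as proven compatible; anything
            // else falls back to repartitioning.
            *dir == Direction::Asc && group_by.contains(expr)
        })
}
```

If every ordering expression is also a group-by expression, rows with equal group keys agree on all ordering expressions and therefore map to the same range. The `[a ASC, b ASC]` versus `groupBy=[a]` case fails because `b` is not a group-by expression.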
## Describe alternatives you've considered
One alternative is to wait until physical range repartition execution is
implemented. However, this issue is about consuming already range-partitioned
inputs and does not require inserting a new range repartition.
## Additional context
This is similar in spirit to #23184 for joins, but aggregation is a
single-input operator. The goal
is to let aggregations express the distribution property they actually need.
Relevant APIs and code paths to review:
- `AggregateExec::required_input_distribution`, which currently requests
`HashPartitioned` for
partitioned aggregate modes.
- `AggregateMode::{FinalPartitioned, SinglePartitioned}` and any other modes
that require grouped
input partitioning.
- `Distribution::{HashPartitioned, KeyPartitioned}` for expressing whether
the operator needs hash
partitioning specifically or only key colocation.
- `Partitioning::satisfaction` and `Partitioning::Range` compatibility logic.
- `EnforceDistribution`, which decides whether to insert `RepartitionExec`.
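
How these pieces could fit together is sketched below with toy stand-in types. `ToyPartitioning`, `ToyDistribution`, `satisfies`, `needs_repartition`, and `cols` are all hypothetical and are not DataFusion's definitions: expressions are reduced to column names, and sort options and split points are omitted.

```rust
/// Toy stand-in for an input's output partitioning (not DataFusion's type).
enum ToyPartitioning {
    Hash(Vec<String>),
    Range(Vec<String>), // ordering expressions only
    Unknown,
}

/// Toy stand-in for an operator's required input distribution.
enum ToyDistribution {
    HashPartitioned(Vec<String>), // needs hash partitioning specifically
    KeyPartitioned(Vec<String>),  // only needs equal keys colocated
}

/// Hypothetical satisfaction check between an input and a requirement.
fn satisfies(input: &ToyPartitioning, required: &ToyDistribution) -> bool {
    match (input, required) {
        // Simplification: a hash requirement is met only by an exact match.
        (ToyPartitioning::Hash(exprs), ToyDistribution::HashPartitioned(keys)) => exprs == keys,
        // Key colocation holds when the partitioning expressions are a
        // non-empty subset of the required keys.
        (ToyPartitioning::Hash(exprs), ToyDistribution::KeyPartitioned(keys))
        | (ToyPartitioning::Range(exprs), ToyDistribution::KeyPartitioned(keys)) => {
            !exprs.is_empty() && exprs.iter().all(|e| keys.contains(e))
        }
        // Conservative fallback for everything else.
        _ => false,
    }
}

/// Models the decision of whether a repartition must be inserted.
fn needs_repartition(input: &ToyPartitioning, required: &ToyDistribution) -> bool {
    !satisfies(input, required)
}

/// Convenience constructor for column-name lists.
fn cols(names: &[&str]) -> Vec<String> {
    names.iter().map(|s| s.to_string()).collect()
}
```

In this model, a range-partitioned input on `[a]` still needs a repartition when the aggregate asks for `HashPartitioned([a])`, but not when it asks for `KeyPartitioned([a, b])`. That difference is why the requirement an aggregate mode expresses matters.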