msirek opened a new issue, #8101:
URL: https://github.com/apache/arrow-datafusion/issues/8101
### Is your feature request related to a problem or challenge?
#8038 adds support for pushing a LIMIT into a DISTINCT and/or GROUP BY
expression with no aggregate expressions.
For example, notice **lim=[5]** in this EXPLAIN, propagated from the LIMIT
clause:
```sql
EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;
----
logical_plan
Limit: skip=0, fetch=5
--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
------TableScan: aggregate_test_100 projection=[c3]
physical_plan
GlobalLimitExec: skip=0, fetch=5
--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
------------CoalescePartitionsExec
--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true
```
If there are multiple levels of aggregation with equivalent, but not identical, GROUP BY expressions, the limit currently cannot be pushed into all of the aggregations:
```sql
EXPLAIN SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c2, c3 limit 3 offset 10;
logical_plan
Limit: skip=10, fetch=3
--Aggregate: groupBy=[[aggregate_test_100.c3, aggregate_test_100.c2]], aggr=[[]]
----Projection: aggregate_test_100.c3, aggregate_test_100.c2
------Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], aggr=[[]]
--------TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
GlobalLimitExec: skip=10, fetch=3
--AggregateExec: mode=Final, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------ProjectionExec: expr=[c3@1 as c3, c2@0 as c2]
------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]  -- Limit could be pushed here, but isn't
--------------CoalescePartitionsExec
----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]  -- Limit could be pushed here, but isn't
------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
```
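In the plan above, the two `AggregateExec` levels group on the same columns, just in a different order (`[c3, c2]` vs. `[c2, c3]`), and the limit pushed into the outer pair is `skip + fetch = 10 + 3 = 13` rows. Below is a minimal, self-contained sketch of the order-insensitive comparison such a rewrite needs; the `GroupKey` alias and `group_keys_equivalent` function are hypothetical stand-ins, not DataFusion APIs:
```rust
/// Hypothetical stand-in for a single group-by key. In DataFusion this would
/// be an `Arc<dyn PhysicalExpr>` plus its output name; here the key is just
/// the resolved column name, ignoring aliases and projections for brevity.
type GroupKey = String;

/// Returns true if the two key lists contain the same keys, ignoring order,
/// by comparing them as bags (sorted multisets).
fn group_keys_equivalent(a: &[GroupKey], b: &[GroupKey]) -> bool {
    let mut a_sorted: Vec<&GroupKey> = a.iter().collect();
    let mut b_sorted: Vec<&GroupKey> = b.iter().collect();
    a_sorted.sort();
    b_sorted.sort();
    a_sorted == b_sorted
}

fn main() {
    // Outer aggregate groups by [c3, c2]; inner aggregate groups by [c2, c3].
    let outer = vec!["c3".to_string(), "c2".to_string()];
    let inner = vec!["c2".to_string(), "c3".to_string()];

    // The key bags match, so the limit already pushed into the outer
    // aggregation (skip + fetch = 10 + 3 = 13 rows) could also be pushed
    // into the inner aggregation.
    assert!(group_keys_equivalent(&outer, &inner));
    println!("group-by keys are equivalent up to ordering");
}
```
Note that in the actual plan a `ProjectionExec` reorders the columns between the two aggregates, so a real check would have to resolve the grouping expressions through that projection rather than compare raw column indices.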
### Describe the solution you'd like
This issue is opened to extend the `LimitedDistinctAggregation` rule to push the limit as deep as possible into nested `AggregateExec` operators whose `PhysicalGroupBy` expressions are equivalent but not identical.
This may also apply to GROUPING SETS, CUBE, and ROLLUP expressions; research is needed to determine whether the rewrite is legal for them. For example:
```sql
EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
----
logical_plan
Limit: skip=0, fetch=3
--Aggregate: groupBy=[[ROLLUP (aggregate_test_100.c2, aggregate_test_100.c3)]], aggr=[[]]
----TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
GlobalLimitExec: skip=0, fetch=3
--AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[3]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[], lim=[3]  -- Is it legal to push the limit down to this level?
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
```
### Describe alternatives you've considered
None
### Additional context
There are [equivalence classes](https://github.com/apache/arrow-datafusion/blob/15d8c9bf48a56ae9de34d18becab13fd1942dc4a/datafusion/physical-expr/src/equivalence.rs), which define a "set of [`Arc<dyn PhysicalExpr>`]s that are known to have the same value for all tuples in a relation", and there are related utility functions such as `physical_exprs_bag_equal` which may be useful here:
https://github.com/apache/arrow-datafusion/blob/c2e768052c43e4bab6705ee76befc19de383c2cb/datafusion/physical-expr/src/physical_expr.rs#L247-L252
A `PhysicalGroupBy` is similar, but has more than just a slice of
`PhysicalExpr`s. It also has corresponding null expressions and a null mask for
each group in a grouping set:
https://github.com/apache/arrow-datafusion/blob/6071ee436051c9de0858835dd6ea1d763d4a1c12/datafusion/physical-plan/src/aggregates/mod.rs#L153-L163
So the equivalence class interfaces probably cannot be used directly out of the box, and they capture a different notion of equivalence, since they are based on equality predicates in the query. However, their implementation could inform a similar technique: a new equivalence-checking method for `PhysicalGroupBy`.
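As a concrete starting point, such a check could compare the grouping expressions as a bag (in the same spirit as `physical_exprs_bag_equal`) and, at least initially, only accept plain single-grouping-set aggregates, leaving GROUPING SETS, ROLLUP, and CUBE to the research mentioned above. The sketch below uses a simplified, hypothetical `SimpleGroupBy` struct rather than DataFusion's `PhysicalGroupBy`; the field layout and the `is_equivalent_to` method are illustrative only:
```rust
/// Hypothetical, simplified model of a group-by clause. DataFusion's real
/// `PhysicalGroupBy` stores `Arc<dyn PhysicalExpr>`s plus null expressions
/// and a boolean null mask per grouping set; strings are used here for brevity.
#[derive(Debug, Clone)]
struct SimpleGroupBy {
    /// Group-by expressions (e.g. resolved column names).
    exprs: Vec<String>,
    /// One boolean mask per grouping set; `true` means the corresponding
    /// expression is NULL-ed out for that grouping set (as with ROLLUP/CUBE).
    groups: Vec<Vec<bool>>,
}

impl SimpleGroupBy {
    /// True when there is exactly one grouping set and nothing is NULL-ed out.
    fn is_single(&self) -> bool {
        self.groups.len() == 1 && self.groups[0].iter().all(|&is_null| !is_null)
    }

    /// Conservative equivalence: both sides are plain (single grouping set)
    /// and their expressions match as a bag, i.e. are equal up to reordering.
    fn is_equivalent_to(&self, other: &Self) -> bool {
        if !self.is_single() || !other.is_single() {
            return false; // leave GROUPING SETS / ROLLUP / CUBE alone for now
        }
        let mut a: Vec<&String> = self.exprs.iter().collect();
        let mut b: Vec<&String> = other.exprs.iter().collect();
        a.sort();
        b.sort();
        a == b
    }
}

fn main() {
    let outer = SimpleGroupBy {
        exprs: vec!["c3".into(), "c2".into()],
        groups: vec![vec![false, false]],
    };
    let inner = SimpleGroupBy {
        exprs: vec!["c2".into(), "c3".into()],
        groups: vec![vec![false, false]],
    };
    // Equivalent up to column order, so the limit could be pushed into both.
    assert!(outer.is_equivalent_to(&inner));
}
```
In the real rule, the comparison would be over `Arc<dyn PhysicalExpr>`s and their output names (where `physical_exprs_bag_equal` may help), and it would also need to handle the null expressions and group masks shown in the `PhysicalGroupBy` definition linked above.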