msirek opened a new issue, #8101:
URL: https://github.com/apache/arrow-datafusion/issues/8101

   ### Is your feature request related to a problem or challenge?
   
   #8038 adds support for pushing a LIMIT into a DISTINCT and/or GROUP BY expression with no aggregate expressions.
   For example, notice **lim=[5]** in this EXPLAIN, propagated from the LIMIT clause:
   ```sql
   EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;
   ----
   logical_plan
   Limit: skip=0, fetch=5
   --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
   ----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
   ------TableScan: aggregate_test_100 projection=[c3]
   physical_plan
   GlobalLimitExec: skip=0, fetch=5
   --AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
   ----CoalescePartitionsExec
   ------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
   --------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   ----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
   ------------CoalescePartitionsExec
   --------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
   ----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   ------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true
   ```
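
The effect of `lim=[5]` can be sketched in plain Rust. Because the query has no ORDER BY, any 5 distinct groups are a correct answer, so an aggregation with no aggregate expressions can stop reading input once it holds `limit` groups. The function below is a hypothetical, single-threaded illustration of that idea, not DataFusion's aggregation stream:

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Collects at most `limit` distinct values from `rows`, in first-seen order,
/// and reports how many input rows were consumed before stopping.
fn distinct_with_limit<T: Eq + Hash + Clone>(rows: &[T], limit: usize) -> (Vec<T>, usize) {
    let mut seen: HashSet<T> = HashSet::new();
    let mut out: Vec<T> = Vec::new();
    let mut consumed = 0;
    for row in rows {
        // Once `limit` groups exist, further input cannot make the answer
        // more correct (there is no ORDER BY), so stop reading.
        if out.len() >= limit {
            break;
        }
        consumed += 1;
        // `insert` returns true only for a value not seen before.
        if seen.insert(row.clone()) {
            out.push(row.clone());
        }
    }
    (out, consumed)
}
```

For example, `distinct_with_limit(&[1, 1, 2, 3, 3, 4, 5], 3)` returns `[1, 2, 3]` after consuming only four of the seven rows.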
   If there are multiple levels of aggregation with equivalent but not identical GROUP BY expressions, the limit cannot currently be pushed into all of them:
   ```sql
   EXPLAIN SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c2, c3 limit 3 offset 10;
   ----
   logical_plan
   Limit: skip=10, fetch=3
   --Aggregate: groupBy=[[aggregate_test_100.c3, aggregate_test_100.c2]], aggr=[[]]
   ----Projection: aggregate_test_100.c3, aggregate_test_100.c2
   ------Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], aggr=[[]]
   --------TableScan: aggregate_test_100 projection=[c2, c3]
   physical_plan
   GlobalLimitExec: skip=10, fetch=3
   --AggregateExec: mode=Final, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
   ----CoalescePartitionsExec
   ------AggregateExec: mode=Partial, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
   --------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   ----------ProjectionExec: expr=[c3@1 as c3, c2@0 as c2]
   ------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]  -- Limit could be pushed here, but isn't
   --------------CoalescePartitionsExec
   ----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]  -- Limit could be pushed here, but isn't
   ------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   --------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
   ```
   
   ### Describe the solution you'd like
   
   This issue proposes extending `LimitedDistinctAggregation` to push the limit as deep as possible into nested `AggregateExec` operators whose `PhysicalGroupBy` expressions are equivalent but not identical.
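
A minimal sketch of the proposed rewrite, over a hypothetical toy plan type rather than DataFusion's `ExecutionPlan`: starting from the topmost limited aggregation, it walks down through column-renaming projections and marks every aggregation whose grouping columns form the same set as its parent's. Column-name equality stands in for real expression equivalence, and the Final/Partial pairs, `CoalescePartitionsExec` and `RepartitionExec` are left out. The pushed value is `skip + fetch`, i.e. 13 for `limit 3 offset 10`, matching `lim=[13]` in the second EXPLAIN.

```rust
/// A toy plan tree (hypothetical; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Aggregation with grouping columns only, as produced for DISTINCT.
    Aggregate { gby: Vec<String>, limit: Option<usize>, input: Box<Plan> },
    /// Column renames as (output name, input name) pairs.
    Projection { exprs: Vec<(String, String)>, input: Box<Plan> },
    Scan,
}

/// True if both grouping lists contain the same columns, in any order.
fn same_groups(a: &[String], b: &[String]) -> bool {
    let (mut a, mut b) = (a.to_vec(), b.to_vec());
    a.sort();
    b.sort();
    a == b
}

/// Pushes `limit` into every aggregation reachable from `plan` whose grouping
/// columns are equivalent to `gby`, translating names through projections.
fn push_limit(plan: Plan, gby: &[String], limit: usize) -> Plan {
    match plan {
        // Equivalent grouping: this level may also stop after `limit` groups.
        Plan::Aggregate { gby: inner, input, .. } if same_groups(&inner, gby) => {
            let input = Box::new(push_limit(*input, &inner, limit));
            Plan::Aggregate { gby: inner, limit: Some(limit), input }
        }
        // Translate each grouping column to the projection's input name.
        Plan::Projection { exprs, input } => {
            let translated: Option<Vec<String>> = gby
                .iter()
                .map(|col| exprs.iter().find(|(out, _)| out == col).map(|(_, src)| src.clone()))
                .collect();
            let input = match translated {
                Some(cols) => Box::new(push_limit(*input, &cols, limit)),
                // A grouping column is not a plain rename: stop here.
                None => input,
            };
            Plan::Projection { exprs, input }
        }
        // Different grouping, or a leaf: leave the subtree unchanged.
        other => other,
    }
}
```

On a plan shaped like the second EXPLAIN, both aggregation levels end up with `Some(13)`. An inner aggregation that groups by a finer set of columns (say `c1, c2, c3`) is left alone, because a limited number of fine-grained groups could collapse into fewer outer groups.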
   
   This may also include GROUPING SETS, CUBE and ROLLUP expressions, which should be researched to determine whether this rewrite applies to them. For example:
   ```sql
   EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
   ----
   logical_plan
   Limit: skip=0, fetch=3
   --Aggregate: groupBy=[[ROLLUP (aggregate_test_100.c2, aggregate_test_100.c3)]], aggr=[[]]
   ----TableScan: aggregate_test_100 projection=[c2, c3]
   physical_plan
   GlobalLimitExec: skip=0, fetch=3
   --AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[3]
   ----CoalescePartitionsExec
   ------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[], lim=[3]  -- Is it legal to push the limit down to this level?
   --------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
   ----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
   ```
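
To help research the ROLLUP case, the grouping sets listed on the `Partial` line can be modeled as null masks (`true` means the column is replaced by NULL). The hypothetical helpers below build those masks for ROLLUP over `n` columns, ordered as in the plan above, and list the group keys that one input row contributes. In this toy model a single row already yields one key per grouping set, so a limit of 3 at the partial level could be reached after reading one row; whether stopping there is legal in the real operator is the open question.

```rust
/// Null masks for ROLLUP over `n` grouping columns, from the grand total to
/// the full grouping (`true` = the column is NULL in that grouping set).
fn rollup_masks(n: usize) -> Vec<Vec<bool>> {
    (0..=n)
        .map(|k| (0..n).map(|i| i >= k).collect::<Vec<bool>>())
        .collect()
}

/// The group keys a single input row contributes, one per grouping set.
fn row_keys(row: &[i64], masks: &[Vec<bool>]) -> Vec<Vec<Option<i64>>> {
    masks
        .iter()
        .map(|mask| {
            row.iter()
                .zip(mask)
                .map(|(&v, &is_null)| if is_null { None } else { Some(v) })
                .collect::<Vec<Option<i64>>>()
        })
        .collect()
}
```

`rollup_masks(2)` gives `[[true, true], [false, true], [false, false]]`, the same three sets as `(NULL, NULL), (c2, NULL), (c2, c3)` in the EXPLAIN output.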
   
   ### Describe alternatives you've considered
   
   None
   
   ### Additional context
   
   There are [equivalence classes](https://github.com/apache/arrow-datafusion/blob/15d8c9bf48a56ae9de34d18becab13fd1942dc4a/datafusion/physical-expr/src/equivalence.rs), which currently define a "set of [`Arc<dyn PhysicalExpr>`]s that are known to have the same value for all tuples in a relation", and there are related utility functions such as `physical_exprs_bag_equal` which seem possibly useful here:
   
   
https://github.com/apache/arrow-datafusion/blob/c2e768052c43e4bab6705ee76befc19de383c2cb/datafusion/physical-expr/src/physical_expr.rs#L247-L252
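
For reference, "bag" equality means the two lists hold the same elements with the same multiplicities, in any order. The generic function below is an illustrative sketch of that comparison over plain `PartialEq` values; it is not the code behind the link, whose exact behavior should be checked there. It treats `c2, c3` and `c3, c2` as equal, as in the nested example above, but it knows nothing about grouping sets:

```rust
/// Multiset ("bag") equality: same elements with the same multiplicities,
/// in any order. Only `PartialEq` is required, so this is O(n^2).
fn bag_equal<T: PartialEq>(lhs: &[T], rhs: &[T]) -> bool {
    if lhs.len() != rhs.len() {
        return false;
    }
    // Indices into `rhs` not yet matched to an element of `lhs`.
    let mut remaining: Vec<usize> = (0..rhs.len()).collect();
    for item in lhs {
        match remaining.iter().position(|&j| rhs[j] == *item) {
            // Consume the match so each element of `rhs` is used at most once.
            Some(pos) => {
                remaining.swap_remove(pos);
            }
            None => return false,
        }
    }
    true
}
```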
   
   A `PhysicalGroupBy` is similar, but it holds more than just a slice of `PhysicalExpr`s: it also has corresponding null expressions and a null mask for each group in a grouping set:
   
   
https://github.com/apache/arrow-datafusion/blob/6071ee436051c9de0858835dd6ea1d763d4a1c12/datafusion/physical-plan/src/aggregates/mod.rs#L153-L163
   
   So the equivalence class interfaces probably cannot be used directly out of the box, and they encode a different notion of equivalence: it is derived from equality predicates in the query. However, their implementation could inform a similar technique for a new equivalence-checking method for `PhysicalGroupBy`.
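
One possible shape for such a method, sketched over a hypothetical simplified struct that keeps only grouping column names (assumed distinct, standing in for expressions) and one null mask per grouping set; null expressions are ignored. Here two group-bys count as equivalent when they use the same columns and, after the first one's masks are rewritten into the second one's column order, both have the same multiset of grouping sets:

```rust
/// Simplified, hypothetical stand-in for a grouping specification.
struct GroupBy {
    /// Grouping column names (standing in for expressions); assumed distinct.
    expr: Vec<String>,
    /// One mask per grouping set; `true` means that column is NULL in the set.
    groups: Vec<Vec<bool>>,
}

/// True if `a` and `b` group by the same columns and, after reordering `a`'s
/// columns into `b`'s order, have the same grouping sets (as a multiset).
fn equivalent(a: &GroupBy, b: &GroupBy) -> bool {
    let n = a.expr.len();
    if b.expr.len() != n || a.groups.len() != b.groups.len() {
        return false;
    }
    if a.groups.iter().chain(&b.groups).any(|m| m.len() != n) {
        return false;
    }
    // perm[i] is the position in `b.expr` of the column `a.expr[i]`.
    let mut perm = Vec::with_capacity(n);
    for col in &a.expr {
        match b.expr.iter().position(|c| c == col) {
            Some(j) => perm.push(j),
            None => return false,
        }
    }
    // Duplicate names could map two columns to one position; reject that.
    let mut check = perm.clone();
    check.sort_unstable();
    check.dedup();
    if check.len() != n {
        return false;
    }
    // Rewrite each of `a`'s masks in `b`'s column order, then compare as multisets.
    let mut a_masks: Vec<Vec<bool>> = a
        .groups
        .iter()
        .map(|mask| {
            let mut out = vec![false; n];
            for (i, &is_null) in mask.iter().enumerate() {
                out[perm[i]] = is_null;
            }
            out
        })
        .collect();
    let mut b_masks = b.groups.clone();
    a_masks.sort();
    b_masks.sort();
    a_masks == b_masks
}
```

Under this rule `(c2, c3)` and `(c3, c2)` are equivalent, `ROLLUP(c2, c3)` and `ROLLUP(c3, c2)` are not (their middle grouping sets keep different columns), and `ROLLUP(c2, c3)` matches `GROUPING SETS ((c3, c2), (c2), ())`.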
   


-- 
This is an automated message from the Apache Git Service.