msirek commented on code in PR #8038:
URL: https://github.com/apache/arrow-datafusion/pull/8038#discussion_r1387165575


##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -2504,6 +2504,204 @@ NULL 0 0
 b 0 0
 c 1 1
 
+#
+# Push limit into distinct group-by aggregation tests
+#
+
+# Make results deterministic
+statement ok
+set datafusion.optimizer.repartition_aggregations = false;
+
+#
+query TT
+EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5;
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
+----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
+------TableScan: aggregate_test_100 projection=[c3]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
+------------CoalescePartitionsExec
+--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
+----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true
+
+query I
+SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5;
+----
+1
+-40
+29
+-85
+-82
+
+query TT
+EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5 offset 4;
+----
+logical_plan
+Limit: skip=4, fetch=5
+--Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], aggr=[[]]
+----TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+GlobalLimitExec: skip=4, fetch=5
+--AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
+
+query II
+SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5 offset 4;
+----
+5 -82
+4 -111
+3 104
+3 13
+1 38
+
+# The limit should only apply to the aggregations which group by c3
+query TT
+EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c2, c3 limit 4;
+----
+logical_plan
+Limit: skip=0, fetch=4
+--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
+----Projection: aggregate_test_100.c3
+------Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], aggr=[[]]
+--------Filter: aggregate_test_100.c3 >= Int16(10) AND aggregate_test_100.c3 <= Int16(20)
+----------TableScan: aggregate_test_100 projection=[c2, c3], partial_filters=[aggregate_test_100.c3 >= Int16(10), aggregate_test_100.c3 <= Int16(20)]
+physical_plan
+GlobalLimitExec: skip=0, fetch=4
+--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[4]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[4]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------ProjectionExec: expr=[c3@1 as c3]
+------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
+--------------CoalescePartitionsExec
+----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
+------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
+----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
+
+query I
+SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c2, c3 limit 4;
+----
+13
+17
+12
+14
+
+# An aggregate expression causes the limit to not be pushed to the aggregation
+query TT
+EXPLAIN SELECT max(c1), c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5;
+----
+logical_plan
+Projection: MAX(aggregate_test_100.c1), aggregate_test_100.c2, aggregate_test_100.c3
+--Limit: skip=0, fetch=5
+----Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], aggr=[[MAX(aggregate_test_100.c1)]]
+------TableScan: aggregate_test_100 projection=[c1, c2, c3]
+physical_plan
+ProjectionExec: expr=[MAX(aggregate_test_100.c1)@2 as MAX(aggregate_test_100.c1), c2@0 as c2, c3@1 as c3]
+--GlobalLimitExec: skip=0, fetch=5
+----AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[MAX(aggregate_test_100.c1)]
+------CoalescePartitionsExec
+--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3], aggr=[MAX(aggregate_test_100.c1)]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
+
+# TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings to ignore the order of columns

Review Comment:
   > BTW the `equivalence` module (recently worked on by @ozankabak and @mustafasrepo) https://github.com/apache/arrow-datafusion/blob/15d8c9bf48a56ae9de34d18becab13fd1942dc4a/datafusion/physical-expr/src/equivalence.rs has logic to perform this type of analysis
   
   Thanks, I will take a look.
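
For illustration only, the order-insensitive check that the TODO describes could be sketched roughly as below. This is not the `equivalence` module's API and not the code in this PR: the function name `same_grouping_ignoring_order` is hypothetical, and group-by columns are assumed to be representable as plain column-name strings.

```rust
use std::collections::HashSet;

/// Hypothetical helper (not a DataFusion API): returns true when two
/// GROUP BY column lists name the same set of columns, regardless of order.
/// Assumption: each grouping expression is a bare column name.
fn same_grouping_ignoring_order(a: &[&str], b: &[&str]) -> bool {
    // Collect each list into a set so that position no longer matters.
    let left: HashSet<&str> = a.iter().copied().collect();
    let right: HashSet<&str> = b.iter().copied().collect();
    left == right
}
```

Under this sketch, `["c2", "c3"]` and `["c3", "c2"]` compare equal, while `["c2", "c3"]` and `["c3"]` do not. A set comparison also treats a repeated column as a single column, which may or may not be the desired behavior for the real rule.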


