alamb commented on code in PR #12399:
URL: https://github.com/apache/datafusion/pull/12399#discussion_r1752696184


##########
datafusion/sqllogictest/test_files/limit.slt:
##########
@@ -565,3 +565,146 @@ physical_plan
 
 statement ok
 drop table data;
+
+
+####################
+# Test issue: limit pushdown with offsets

Review Comment:
   ```suggestion
   # Test issue: limit pushdown with offsets
   # Ensure the offset is not lost: https://github.com/apache/datafusion/issues/12423
   ```



##########
datafusion/sqllogictest/test_files/limit.slt:
##########
@@ -565,3 +565,146 @@ physical_plan
 
 statement ok
 drop table data;
+
+
+####################
+# Test issue: limit pushdown with offsets
+####################
+
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+  a0 INT,
+  a INT,
+  b INT,
+  c INT UNSIGNED,
+  d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+# all results
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
+----
+3 25
+2 25
+1 0
+0 0
+
+# limit only
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
+----
+3 25
+2 25
+1 0
+
+# offset only
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
+----
+2 25
+1 0
+0 0
+
+# offset + limit
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
+----
+2 25
+1 0
+
+# Applying offset & limit when multiple streams from groupby
+query TT
+EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
+----
+logical_plan
+01)Limit: skip=1, fetch=2
+02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
+03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
+04)------TableScan: ordered_table projection=[a, b]
+physical_plan
+01)GlobalLimitExec: skip=1, fetch=2
+02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
+03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
+04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
+07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
+08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true
+
+# Applying offset & limit when multiple streams from union

Review Comment:
   ```suggestion
   # Applying offset & limit when multiple streams from union
   # the plan must still have a global limit to apply the offset
   ```



##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -256,21 +265,24 @@ pub(crate) fn pushdown_limits(
     pushdown_plan: Arc<dyn ExecutionPlan>,
     global_state: GlobalRequirements,
 ) -> Result<Arc<dyn ExecutionPlan>> {
+    // Call pushdown_limit_helper.
+    // This will either extract the limit node (returning the child), or apply the limit pushdown.

Review Comment:
   this might be a good comment to add to the `pushdown_limit_helper` function as well
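   As a rough illustration of what that comment describes, here is a minimal, self-contained sketch: the `Plan`/`GlobalRequirements` types and the function signature below are stand-ins, not the actual DataFusion API, and the skip/fetch composition is deliberately simplified.
   ```rust
   // Illustrative sketch only -- names, types, and signature are stand-ins,
   // not the real DataFusion API.
   
   #[derive(Debug)]
   enum Plan {
       /// A LIMIT node: skip `skip` rows, then return at most `fetch` rows.
       Limit { skip: usize, fetch: usize, child: Box<Plan> },
       /// Any other operator (stand-in).
       Scan { table: String },
   }
   
   #[derive(Debug, Default)]
   struct GlobalRequirements {
       skip: usize,
       fetch: Option<usize>,
   }
   
   /// Either extracts the limit node at the top of `plan` (returning its child,
   /// with skip/fetch folded into the global state), or returns the plan
   /// unchanged so the caller can apply the accumulated limit to it.
   fn pushdown_limit_helper(plan: Plan, mut state: GlobalRequirements) -> (Plan, GlobalRequirements) {
       match plan {
           Plan::Limit { skip, fetch, child } => {
               // Extract: fold this limit into the global requirements and keep
               // going with the child. (Simplified: the real rule also has to
               // handle how nested skip/fetch values compose.)
               state.skip += skip;
               state.fetch = Some(state.fetch.map_or(fetch, |f| f.min(fetch)));
               (*child, state)
           }
           other => (other, state),
       }
   }
   
   fn main() {
       let plan = Plan::Limit {
           skip: 1,
           fetch: 2,
           child: Box::new(Plan::Scan { table: "ordered_table".into() }),
       };
       let (child, state) = pushdown_limit_helper(plan, GlobalRequirements::default());
       println!("child = {child:?}, global state = {state:?}");
   }
   ```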



##########
datafusion/sqllogictest/test_files/limit.slt:
##########
@@ -565,3 +565,146 @@ physical_plan
 
 statement ok
 drop table data;
+
+
+####################
+# Test issue: limit pushdown with offsets
+####################
+
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+  a0 INT,
+  a INT,
+  b INT,
+  c INT UNSIGNED,
+  d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+# all results
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
+----
+3 25
+2 25
+1 0
+0 0
+
+# limit only
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
+----
+3 25
+2 25
+1 0
+
+# offset only
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
+----
+2 25
+1 0
+0 0
+
+# offset + limit
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
+----
+2 25
+1 0
+
+# Applying offset & limit when multiple streams from groupby
+query TT
+EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
+----
+logical_plan
+01)Limit: skip=1, fetch=2
+02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
+03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
+04)------TableScan: ordered_table projection=[a, b]
+physical_plan
+01)GlobalLimitExec: skip=1, fetch=2
+02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
+03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
+04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
+07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
+08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true
+
+# Applying offset & limit when multiple streams from union
+query TT
+explain select * FROM (
+  select c FROM ordered_table
+  UNION ALL
+  select d FROM ordered_table
+) order by 1 desc LIMIT 10 OFFSET 4;
+----
+logical_plan
+01)Limit: skip=4, fetch=10
+02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
+03)----Union
+04)------Projection: CAST(ordered_table.c AS Int64) AS c
+05)--------TableScan: ordered_table projection=[c]
+06)------Projection: CAST(ordered_table.d AS Int64) AS c
+07)--------TableScan: ordered_table projection=[d]
+physical_plan
+01)GlobalLimitExec: skip=4, fetch=10
+02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
+03)----UnionExec
+04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
+05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true
+08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
+09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
+10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true
+
+# ApplyingmLIMIT & OFFSET to subquery.

Review Comment:
   ```suggestion
   # Applying LIMIT & OFFSET to subquery.
   ```



##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -199,10 +203,15 @@ pub fn pushdown_limit_helper(
             // This plan is combining input partitions, so we need to add the
            // fetch info to plan if possible. If not, we must add a `LimitExec`
             // with the information from the global state.
+            let mut new_plan = plan_with_fetch;
+            if global_state.skip > 0 {

Review Comment:
   is it worth a comment explaining *why* a skip means we can't apply this rule?
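   To sketch the reasoning (illustrative toy code, not DataFusion internals): each partition can safely be asked for at most `skip + fetch` rows, since any of them might contribute to the first `skip + fetch` rows of the merged output, but the skip itself can only be applied once, after the global merge -- which is why the plans above still keep a `GlobalLimitExec` with the skip on top.
   ```rust
   // Illustrative sketch only (not DataFusion code): why a non-zero skip keeps
   // a global limit node above the merge.
   
   /// Rows each partition must still produce so that the merged stream can
   /// satisfy `OFFSET skip LIMIT fetch`: any of the first `skip + fetch` merged
   /// rows could come from this partition, so it must not truncate earlier.
   fn per_partition_fetch(skip: usize, fetch: usize) -> usize {
       skip + fetch
   }
   
   fn main() {
       // OFFSET 1 LIMIT 2 from the tests above: each TopK sort keeps fetch=3
       // rows, while the skip is applied once, globally, by
       // `GlobalLimitExec: skip=1, fetch=2`, because which rows occupy
       // positions 1..=skip is only known after the merge.
       assert_eq!(per_partition_fetch(1, 2), 3);
   }
   ```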



##########
datafusion/sqllogictest/test_files/limit.slt:
##########
@@ -565,3 +565,146 @@ physical_plan
 
 statement ok
 drop table data;
+
+
+####################
+# Test issue: limit pushdown with offsets
+####################
+
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+  a0 INT,
+  a INT,
+  b INT,
+  c INT UNSIGNED,
+  d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+# all results
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
+----
+3 25
+2 25
+1 0
+0 0
+
+# limit only
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
+----
+3 25
+2 25
+1 0
+
+# offset only
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
+----
+2 25
+1 0
+0 0
+
+# offset + limit
+query II
+SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
+----
+2 25
+1 0
+
+# Applying offset & limit when multiple streams from groupby

Review Comment:
   For explain plans I think it helps to add a comment explaining what is being tested -- this helps future people understand if updates to the plans are acceptable
   
   ```suggestion
   # Applying offset & limit when multiple streams from groupby
   # the plan must still have a global limit to apply the offset
   ```


