mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1162386051
##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
-1114 -1927628110
15673 -1899175111
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+ c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum2
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6,
c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+ ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC
NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum2
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT
+ c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING) as sum2
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS sum2
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+ TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6,
c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+ ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@13 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@14 as sum2]
+ WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC
NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING) as sum2
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5
+----
+28774375 12665844451 12665844451
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 11924524414 11924524414
+145294611 6802765992 6802765992
+
+
+# test_window_agg_child_equivalence
+
+query TT
+EXPLAIN SELECT c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9 ASC ROWS BETWEEN 1
PRECEDING AND 5 FOLLOWING) as sum2
+ FROM (SELECT c1, c2, c9, c1 as c1_alias
+ FROM aggregate_test_100
+ ORDER BY c9) t1
+ LIMIT 5
+----
+logical_plan
+Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9)
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN
1 PRECEDING AND 5 FOLLOWING AS sum2
+ Limit: skip=0, fetch=5
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY
[t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ SubqueryAlias: t1
+ Sort: aggregate_test_100.c9 ASC NULLS LAST
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2,
aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+ TableScan: aggregate_test_100 projection=[c1, c2, c9]
+physical_plan
+ProjectionExec: expr=[c9@2 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1,
SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum2]
+ GlobalLimitExec: skip=0, fetch=5
+ BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@2 ASC NULLS
LAST]
+ ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@2 as c9, c1@0 as
c1_alias]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c9]
+
+
+query III
+SELECT c9,
Review Comment:
This test checks whether equivalent expressions are considered during sort
removal analysis. Since `c1` and `c1_alias` refer to the same column, we
shouldn't add an unnecessary `SortExec` between the two `WindowAggExec`s.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]