This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 72d7db299f fix: incorrect column pruning in sql with window operations (#6039)
72d7db299f is described below
commit 72d7db299f8f4e8c6f58b52c9ecf0b2021562deb
Author: yujie.zhang <[email protected]>
AuthorDate: Sun Apr 23 18:32:38 2023 +0800
fix: incorrect column pruning in sql with window operations (#6039)
---
datafusion/core/tests/sql/window.rs | 16 +-
.../core/tests/sqllogictests/test_files/window.slt | 275 +++++++++++----------
datafusion/optimizer/src/push_down_projection.rs | 76 +++++-
3 files changed, 229 insertions(+), 138 deletions(-)
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index ea3a8002a7..5d0fb37991 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -109,11 +109,11 @@ mod tests {
"ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2
as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7
as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1,
sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2,
minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3,
cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
" GlobalLimitExec: skip=0, fetch=5",
" SortExec: fetch=5, expr=[inc_col@24 DESC]",
- " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING@14 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts
ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@15 as sum2,
SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as sum3, MIN(annotated_data.inc_col)
ORDER BY [annotated_data.ts ASC NULLS L [...]
+ " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING@13 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts
ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2,
SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data.inc_col)
ORDER BY [annotated_data.ts ASC NULLS L [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) },
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units [...]
- " BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) },
SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), [...]
- " BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) },
SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), [...]
- ]
+ " ProjectionExec: expr=[inc_col@1 as inc_col,
desc_col@2 as desc_col, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts
DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as
SUM(annotated_data.inc_col), SUM(annotated_data.desc_col) ORDER BY
[annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8
FOLLOWING@4 as SUM(annotated_data.desc_col), SUM(annotated_data.desc_col) ORDER
BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECE [...]
+ " BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) },
SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }) [...]
+ " BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) },
SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} } [...]
};
let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -185,7 +185,7 @@ mod tests {
"ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as
lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2,
rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1,
dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as
lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1,
lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1,
leadr2@23 as leadr2]",
" GlobalLimitExec: skip=0, fetch=5",
" SortExec: fetch=5, expr=[ts@24 DESC]",
- " ProjectionExec:
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv1,
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as fv2,
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv1,
LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
+ " ProjectionExec:
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1,
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2,
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1,
LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1))
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_order [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10))
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ord [...]
]
@@ -245,7 +245,7 @@ mod tests {
"ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2
as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1,
count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]",
" GlobalLimitExec: skip=0, fetch=5",
" SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]",
- " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5
FOLLOWING@8 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts
DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum2,
MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@9 as min1,
MIN(annotated_data.inc_col) ORDER BY [annotate [...]
+ " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5
FOLLOWING@7 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts
DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2,
MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1,
MIN(annotated_data.inc_col) ORDER BY [annotate [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) },
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), fr [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) },
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), [...]
]
@@ -300,7 +300,7 @@ mod tests {
"ProjectionExec: expr=[first_value1@0 as first_value1,
first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as
last_value2, nth_value1@4 as nth_value1]",
" GlobalLimitExec: skip=0, fetch=5",
" SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]",
- " ProjectionExec:
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as first_value1,
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS
FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as first_value2,
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@6 as last_value1, LAS [...]
+ " ProjectionExec:
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1,
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS
FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2,
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, LAS [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_orde [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(3)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_or [...]
]
@@ -354,7 +354,7 @@ mod tests {
vec![
"ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2,
count1@2 as count1, count2@3 as count2]",
" GlobalLimitExec: skip=0, fetch=5",
- " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1
FOLLOWING@5 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum2,
COUNT(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS
BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@6 as count1,
COUNT(annotated_data.inc_col) ORDER BY [annotat [...]
+ " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1
FOLLOWING@4 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2,
COUNT(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS
BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1,
COUNT(annotated_data.inc_col) ORDER BY [annotat [...]
" BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col):
Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(1)) }, COUNT(annotated_data.inc_col): Ok(Field { name:
\"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) },
COUNT(annotated_data.inc_col): Ok(Field { name:
\"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} } [...]
]
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 0170eed929..04b1877e10 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -1209,14 +1209,16 @@ EXPLAIN SELECT
logical_plan
Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW AS sum2
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c8, c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]
+ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
- BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as
SUM(aggregate_test_100.c9)]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c8, c9]
# over_order_by_sort_keys_sorting_prefix_compacting
@@ -1229,14 +1231,14 @@ Projection: aggregate_test_100.c2,
MAX(aggregate_test_100.c9) ORDER BY [aggregat
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]
WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c2, c9]
physical_plan
-ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@14 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@15 as
SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as MIN(aggregat [...]
+ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as
SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_t [...]
WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name:
"MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name:
"MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c2, c9]
# FIXME: for now we are not detecting prefix of sorting keys in order to
re-arrange with global and save one SortExec
# over_order_by_sort_keys_sorting_global_order_compacting
@@ -1251,16 +1253,16 @@ Sort: aggregate_test_100.c2 ASC NULLS LAST
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]
WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6,
c7, c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c2, c9]
physical_plan
SortExec: expr=[c2@0 ASC NULLS LAST]
- ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9),
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PREC [...]
+ ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9),
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name:
"MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+ SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field {
name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c2, c9]
# test_window_partition_by_order_by
statement ok
@@ -1275,20 +1277,22 @@ EXPLAIN SELECT
logical_plan
Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING
WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2,
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+ TableScan: aggregate_test_100 projection=[c1, c2, c4]
physical_plan
-ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]
+ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as COUNT(UInt8(1))]
BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name:
"COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }],
2), input_partitions=2
- BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field {
name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
- CoalesceBatchesExec: target_batch_size=4096
- RepartitionExec: partitioning=Hash([Column { name: "c1",
index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
- RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ ProjectionExec: expr=[c1@0 as c1, c2@1 as c2,
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4)]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field {
name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+ CoalesceBatchesExec: target_batch_size=4096
+ RepartitionExec: partitioning=Hash([Column { name: "c1",
index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
+ RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c4]
# test_window_agg_sort_reversed_plan
# Only 1 SortExec was added
@@ -1305,14 +1309,14 @@ Projection: aggregate_test_100.c9,
SUM(aggregate_test_100.c9) ORDER BY [aggregat
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING]]
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@13 as sum2]
+ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c9@8 DESC]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
query III
SELECT
@@ -1346,14 +1350,14 @@ Projection: aggregate_test_100.c9,
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@16 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@13 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@17 as lag1, LAG(aggregate_test_100.c9,I [...]
+ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@5 as lag1, LAG(aggregate_test_100.c9,Int6 [...]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field {
name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name:
"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64,
nullable: true, dict_id: 0, dict_is_orde [...]
BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field
{ name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name:
"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64,
nullable: true, dict_id: 0, dict_is_or [...]
- SortExec: expr=[c9@8 DESC]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
query IIIIIII
SELECT
@@ -1389,15 +1393,15 @@ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER
BY [aggregate_test_100.c9
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as rn1,
ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1
PRECEDING AND 5 FOLLOWING@13 as rn2]
+ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER()
ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND
5 FOLLOWING@1 as rn2]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c9@8 ASC NULLS LAST]
+ SortExec: expr=[c9@0 ASC NULLS LAST]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c9@8 DESC]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
query III
@@ -1431,16 +1435,16 @@ Projection: aggregate_test_100.c9,
SUM(aggregate_test_100.c9) ORDER BY [aggregat
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST,
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6,
c7, c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c1, c2, c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST,
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECED [...]
+ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST,
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1
PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDIN [...]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS
LAST]
+ SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS
LAST]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c9@8 DESC,c1@0 DESC]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c9@2 DESC,c1@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c9]
query IIII
SELECT
@@ -1505,31 +1509,33 @@ logical_plan
Projection: SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE
BETWEEN 10 PRECEDING AND 11 FOLLOWING AS a, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS
b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN
10 PRECEDING AND 11 FOLLOWING AS c, SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS d,
SUM(null_cases.c1) ORDER BY [null_cases.c [...]
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC
NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1)
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED
FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC
NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW]]
- TableScan: null_cases projection=[c1, c2, c3]
+ Projection: null_cases.c1, null_cases.c3, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED [...]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW]]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: null_cases projection=[c1, c2, c3]
physical_plan
-ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1)
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11
FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST]
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12
as d, SUM(null_cases.c1) O [...]
+ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@18 as a, SUM(null_cases.c1)
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11
FOLLOWING@18 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST]
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@3 as c, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@11
as d, SUM(null_cases.c1) O [...]
GlobalLimitExec: skip=0, fetch=5
WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), [...]
- BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
- BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
- BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
- WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: CurrentRo [...]
- WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Precedi [...]
- SortExec: expr=[c3@2 DESC NULLS LAST]
- WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Pre [...]
- BoundedWindowAggExec: wdw=[SUM(null_cases.c1):
Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3]
+ ProjectionExec: expr=[c1@0 as c1, c3@2 as c3, SUM(null_cases.c1) ORDER
BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(null_cases.c1), SUM(null_cases.c1)
ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11
FOLLOWING@4 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
SUM(null_cases.c1), SUM(null [...]
+ BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+ BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
+ BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
+ WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Current [...]
+ WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Prece [...]
+ SortExec: expr=[c3@2 DESC NULLS LAST]
+ WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: P [...]
+ BoundedWindowAggExec: wdw=[SUM(null_cases.c1):
Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3]
query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
SELECT
@@ -1597,14 +1603,14 @@ Projection: aggregate_test_100.c9,
SUM(aggregate_test_100.c9) ORDER BY [aggregat
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c1, c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c9]
query III
@@ -1641,14 +1647,14 @@ Projection: aggregate_test_100.c9,
SUM(aggregate_test_100.c9) PARTITION BY [aggr
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c1, c9]
physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2]
+ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c9]
query III
SELECT
@@ -1684,15 +1690,17 @@ logical_plan
Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ Projection: aggregate_test_100.c3, aggregate_test_100.c4,
aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
physical_plan
-ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]
+ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
GlobalLimitExec: skip=0, fetch=5
WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]
- BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 + c4@3 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c2, c3, c4, c9]
query III
SELECT c3,
@@ -1776,20 +1784,22 @@ Limit: skip=0, fetch=5
Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6,
c7, c8, c9, c10, c11, c12, c13]
+ Projection: aggregate_test_100.c3, aggregate_test_100.c9,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c2, c3, c9]
physical_plan
GlobalLimitExec: skip=0, fetch=5
SortPreservingMergeExec: [c3@0 ASC NULLS LAST]
- ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]
+ ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 ASC NULLS LAST,c9@8 DESC]
+ SortExec: expr=[c3@0 ASC NULLS LAST,c9@1 DESC]
CoalesceBatchesExec: target_batch_size=4096
- RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2
}], 2), input_partitions=2
+ RepartitionExec: partitioning=Hash([Column { name: "c3", index: 0
}], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
- SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ ProjectionExec: expr=[c3@1 as c3, c9@2 as c9,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as
SUM(aggregate_test_100.c9)]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
+ SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c2, c3, c9]
query III
@@ -1818,16 +1828,16 @@ logical_plan
Sort: aggregate_test_100.c1 ASC NULLS LAST
Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING AS rn1
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1]
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c1]
physical_plan
SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
- ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@13 as rn1]
+ ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@1 as rn1]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
SortExec: expr=[c1@0 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0
}], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1]
query TI
SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100
ORDER BY c1 ASC
@@ -1944,17 +1954,17 @@ logical_plan
Sort: aggregate_test_100.c1 ASC NULLS LAST
Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING AS rn1
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1]
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c1]
physical_plan
SortExec: expr=[c1@0 ASC NULLS LAST]
CoalescePartitionsExec
- ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@13 as rn1]
+ ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@1 as rn1]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
SortExec: expr=[c1@0 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0
}], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1]
statement ok
set datafusion.optimizer.repartition_sorts = true;
@@ -1971,19 +1981,19 @@ Sort: aggregate_test_100.c1 ASC NULLS LAST
Projection: aggregate_test_100.c1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 3 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING AS sum2
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING]]
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 3 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7,
c8, c9, c10, c11, c12, c13]
+ TableScan: aggregate_test_100 projection=[c1, c9]
physical_plan
SortExec: expr=[c1@0 ASC NULLS LAST]
- ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@14 as sum2]
+ ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@3 as sum2]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- SortPreservingMergeExec: [c9@8 ASC NULLS LAST]
- SortExec: expr=[c9@8 ASC NULLS LAST]
+ SortPreservingMergeExec: [c9@1 ASC NULLS LAST]
+ SortExec: expr=[c9@1 ASC NULLS LAST]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1",
index: 0 }], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c9]
# test_window_agg_with_global_limit
statement ok
@@ -2058,20 +2068,22 @@ Limit: skip=0, fetch=5
Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1, aggregate_test_100. [...]
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5,
c6, c7, c8, c9, c10, c11, c12, c13]
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2,
aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM [...]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
physical_plan
GlobalLimitExec: skip=0, fetch=5
SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
- ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as sum2, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1, aggregate_test [...]
+ ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1, aggregate_test_1 [...]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
- BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC
NULLS LAST,c8@7 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@3 as c9,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST,
aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED
FOLLOWING@4 as SUM(aggregate_test_100.c9), SUM(aggregate_test_100.c9) PARTITION
BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLO
[...]
+ WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3
ASC NULLS LAST,c8@2 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c8, c9]
query IIIII
@@ -2108,23 +2120,28 @@ logical_plan
Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9)
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN
1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING AS sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAS [...]
Limit: skip=0, fetch=5
WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
- WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
- SubqueryAlias: t1
- Sort: aggregate_test_100.c9 ASC NULLS LAST
- Projection: aggregate_test_100.c1, aggregate_test_100.c2,
aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
- TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
+ Projection: t1.c2, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1,
t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM(t1.c9)
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ Projection: t1.c2, t1.c8, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION
BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1,
t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2]
ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ SubqueryAlias: t1
+ Sort: aggregate_test_100.c9 ASC NULLS LAST
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2,
aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+ TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
physical_plan
-ProjectionExec: expr=[c9@3 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum1,
SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@8 as sum2, SUM(t1.c9) PARTITION BY
[t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as sum3, SUM(t1.c9) PARTITION BY
[t1.c2, t1.c1_alias] ORDER BY [...]
+ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1,
SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(t1.c9) PARTITION BY
[t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum3, SUM(t1.c9) PARTITION BY
[t1.c2, t1.c1_alias] ORDER BY [...]
GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
- BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
- WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC
NULLS LAST,c8@2 ASC NULLS LAST]
- ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3
as c9, c1@0 as c1_alias]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c8, c9]
+ ProjectionExec: expr=[c2@0 as c2, c9@2 as c9, c1_alias@3 as c1_alias,
SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as
SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9)
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND [...]
+ WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ SortExec: expr=[c2@0 ASC NULLS LAST,c1_alias@3 ASC NULLS LAST,c9@2
ASC NULLS LAST,c8@1 ASC NULLS LAST]
+ ProjectionExec: expr=[c2@1 as c2, c8@2 as c8, c9@3 as c9,
c1_alias@4 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9
ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED
FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY
[t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(t1.c9)]
+ BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name:
"SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3
ASC NULLS LAST,c8@2 ASC NULLS LAST]
+ ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8,
c9@3 as c9, c1@0 as c1_alias]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c8, c9]
query IIIII
@@ -2157,17 +2174,19 @@ Projection: sum1, sum2
Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
Projection: SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1
ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1
PRECEDING AND 1 FOLLOWING AS sum1, SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3
PRECEDING AS sum2, aggregate_test_100.c9
WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3
PRECEDING]]
- WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
- TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6,
c7, c8, c9, c10, c11, c12, c13]
+ Projection: aggregate_test_100.c1, aggregate_test_100.c9,
aggregate_test_100.c12, SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+ TableScan: aggregate_test_100 projection=[c1, c2, c9, c12]
physical_plan
ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2]
GlobalLimitExec: skip=0, fetch=5
SortExec: fetch=5, expr=[c9@2 ASC NULLS LAST]
- ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as sum1,
SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST]
GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@14 as sum2, c9@8 as c9]
+ ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST]
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum1,
SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST]
GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@4 as sum2, c9@1 as c9]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field {
name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Groups, start_bound: Preceding(UInt64(5)), end_bound:
Preceding(UInt64(3)) }]
- BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field {
name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Groups, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }]
- SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
- CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+ ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12,
SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST,
aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1
FOLLOWING@4 as SUM(aggregate_test_100.c12)]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field {
name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Groups, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c9, c12]
query RR
SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1
FOLLOWING) as sum1,
diff --git a/datafusion/optimizer/src/push_down_projection.rs
b/datafusion/optimizer/src/push_down_projection.rs
index 97ba5a92d7..5c6f10825a 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -312,11 +312,28 @@ impl OptimizerRule for PushDownProjection {
if new_window_expr.is_empty() {
// none columns in window expr are needed, remove the
window expr
- let new_window = window.input.as_ref().clone();
+ let input = window.input.clone();
+ let new_window = restrict_outputs(input.clone(),
&required_columns)?
+ .unwrap_or((*input).clone());
generate_plan!(projection_is_empty, plan, new_window)
} else {
- let new_window =
LogicalPlanBuilder::from((*(window.input)).clone())
+ let mut referenced_inputs = HashSet::new();
+ exprlist_to_columns(&new_window_expr, &mut
referenced_inputs)?;
+ window
+ .input
+ .schema()
+ .fields()
+ .iter()
+ .filter(|f|
required_columns.contains(&f.qualified_column()))
+ .for_each(|f| {
+ referenced_inputs.insert(f.qualified_column());
+ });
+
+ let input = window.input.clone();
+ let new_input = restrict_outputs(input.clone(),
&referenced_inputs)?
+ .unwrap_or((*input).clone());
+ let new_window = LogicalPlanBuilder::from(new_input)
.window(new_window_expr)?
.build()?;
@@ -553,6 +570,21 @@ fn push_down_scan(
}))
}
+fn restrict_outputs(
+ plan: Arc<LogicalPlan>,
+ permitted_outputs: &HashSet<Column>,
+) -> Result<Option<LogicalPlan>> {
+ let schema = plan.schema();
+ if permitted_outputs.len() == schema.fields().len() {
+ return Ok(None);
+ }
+ Ok(Some(generate_projection(
+ permitted_outputs,
+ schema,
+ plan.clone(),
+ )?))
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -564,12 +596,15 @@ mod tests {
use datafusion_common::DFSchema;
use datafusion_expr::expr;
use datafusion_expr::expr::Cast;
+ use datafusion_expr::WindowFrame;
+ use datafusion_expr::WindowFunction;
use datafusion_expr::{
col, count, lit,
logical_plan::{builder::LogicalPlanBuilder, table_scan, JoinType},
max, min, AggregateFunction, Expr,
};
use std::collections::HashMap;
+ use std::vec;
#[test]
fn aggregate_no_group_by() -> Result<()> {
@@ -1053,6 +1088,43 @@ mod tests {
assert_optimized_plan_eq(&plan, expected)
}
+ #[test]
+ fn test_window() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let max1 = Expr::WindowFunction(expr::WindowFunction::new(
+ WindowFunction::AggregateFunction(AggregateFunction::Max),
+ vec![col("test.a")],
+ vec![col("test.b")],
+ vec![],
+ WindowFrame::new(false),
+ ));
+
+ let max2 = Expr::WindowFunction(expr::WindowFunction::new(
+ WindowFunction::AggregateFunction(AggregateFunction::Max),
+ vec![col("test.b")],
+ vec![],
+ vec![],
+ WindowFrame::new(false),
+ ));
+ let col1 = col(max1.display_name()?);
+ let col2 = col(max2.display_name()?);
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .window(vec![max1])?
+ .window(vec![max2])?
+ .project(vec![col1, col2])?
+ .build()?;
+
+ let expected = "Projection: MAX(test.a) PARTITION BY [test.b] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MAX(test.b) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(test.b) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n Projection: test.b, MAX(test.a) PARTITION BY [test.b] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(test.a) PARTITION BY [test.b]
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n TableScan: test projection=[a, b]";
+
+ assert_optimized_plan_eq(&plan, expected)
+ }
+
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
let optimized_plan = optimize(plan).expect("failed to optimize plan");
let formatted_plan = format!("{optimized_plan:?}");