This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a57e270088 Cache common referred expression at the window input (#9009)
a57e270088 is described below

commit a57e2700881dfdf2a5eb81d403860d3bcb824997
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jan 29 15:25:28 2024 +0300

    Cache common referred expression at the window input (#9009)
    
    * Initial commit
    
    * Update test
    
    * Minor changes
    
    * Tmp
    
    * Retract some changes
    
    * Add lias to window
    
    * Fix name change issue
    
    * Minor changes
    
    * Minor changes
    
    * Un comment new rule
    
    * Open up new rules
    
    * Minor changes
    
    * Change test
    
    * remove prints
    
    * Update slt tests
    
    * Remove leftover code
    
    * Resolve linter errors
    
    * Minor changes
    
    * Remove group window rule
    
    * Remove unnecessary changes
    
    * Minor changes
    
    * Update datafusion/optimizer/src/common_subexpr_eliminate.rs
    
    Co-authored-by: Huaijin <[email protected]>
    
    * Update comment, add new test
    
    ---------
    
    Co-authored-by: Huaijin <[email protected]>
---
 .../optimizer/src/common_subexpr_eliminate.rs      |  79 +++++--
 datafusion/sqllogictest/test_files/window.slt      | 259 ++++++++++++---------
 2 files changed, 211 insertions(+), 127 deletions(-)

diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index fe71171ce5..ae720bc689 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -166,25 +166,74 @@ impl CommonSubexprEliminate {
         window: &Window,
         config: &dyn OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let Window {
-            input,
-            window_expr,
-            schema,
-        } = window;
+        let mut window_exprs = vec![];
+        let mut arrays_per_window = vec![];
         let mut expr_set = ExprSet::new();
 
-        let input_schema = Arc::clone(input.schema());
-        let arrays =
-            to_arrays(window_expr, input_schema, &mut expr_set, 
ExprMask::Normal)?;
+        // Get all window expressions inside the consecutive window operators.
+        // Consecutive window expressions may refer to same complex expression.
+        // If same complex expression is referred more than once by subsequent 
`WindowAggr`s,
+        // we can cache complex expression by evaluating it with a projection 
before the
+        // first WindowAggr.
+        // This enables us to cache complex expression "c3+c4" for following 
plan:
+        // WindowAggr: windowExpr=[[SUM(c9) ORDER BY [c3 + c4] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW]]
+        // --WindowAggr: windowExpr=[[SUM(c9) ORDER BY [c3 + c4] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW]]
+        // where, it is referred once by each `WindowAggr` (total of 2) in the 
plan.
+        let mut plan = LogicalPlan::Window(window.clone());
+        while let LogicalPlan::Window(window) = plan {
+            let Window {
+                input, window_expr, ..
+            } = window;
+            plan = input.as_ref().clone();
+
+            let input_schema = Arc::clone(input.schema());
+            let arrays =
+                to_arrays(&window_expr, input_schema, &mut expr_set, 
ExprMask::Normal)?;
+
+            window_exprs.push(window_expr);
+            arrays_per_window.push(arrays);
+        }
 
-        let (mut new_expr, new_input) =
-            self.rewrite_expr(&[window_expr], &[&arrays], input, &expr_set, 
config)?;
+        let mut window_exprs = window_exprs
+            .iter()
+            .map(|expr| expr.as_slice())
+            .collect::<Vec<_>>();
+        let arrays_per_window = arrays_per_window
+            .iter()
+            .map(|arrays| arrays.as_slice())
+            .collect::<Vec<_>>();
 
-        Ok(LogicalPlan::Window(Window {
-            input: Arc::new(new_input),
-            window_expr: pop_expr(&mut new_expr)?,
-            schema: schema.clone(),
-        }))
+        assert_eq!(window_exprs.len(), arrays_per_window.len());
+        let (mut new_expr, new_input) = self.rewrite_expr(
+            &window_exprs,
+            &arrays_per_window,
+            &plan,
+            &expr_set,
+            config,
+        )?;
+        assert_eq!(window_exprs.len(), new_expr.len());
+
+        // Construct consecutive window operator, with their corresponding new 
window expressions.
+        plan = new_input;
+        while let Some(new_window_expr) = new_expr.pop() {
+            // Since `new_expr` and `window_exprs` length are same. We can 
safely `.unwrap` here.
+            let orig_window_expr = window_exprs.pop().unwrap();
+            assert_eq!(new_window_expr.len(), orig_window_expr.len());
+
+            // Rename new re-written window expressions with original name (by 
giving alias)
+            // Otherwise we may receive schema error, in subsequent operators.
+            let new_window_expr = new_window_expr
+                .into_iter()
+                .zip(orig_window_expr.iter())
+                .map(|(new_window_expr, window_expr)| {
+                    let original_name = window_expr.name_for_alias()?;
+                    new_window_expr.alias_if_changed(original_name)
+                })
+                .collect::<Result<Vec<_>>>()?;
+            plan = LogicalPlan::Window(Window::try_new(new_window_expr, 
Arc::new(plan))?);
+        }
+
+        Ok(plan)
     }
 
     fn try_optimize_aggregate(
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index aec2fed738..afdd4a9b48 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1687,18 +1687,22 @@ EXPLAIN SELECT c3,
 logical_plan
 Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
 --Limit: skip=0, fetch=5
-----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
-------Projection: aggregate_test_100.c3, aggregate_test_100.c4, 
aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-----------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS 
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------Projection: aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, 
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS 
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FI [...]
+----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, 
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
+------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
 physical_plan
-ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
+ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
 --GlobalLimitExec: skip=0, fetch=5
 ----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, me [...]
-------ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN U [...]
+------ProjectionExec: expr=[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 as 
aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, c3@2 as c3, 
c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(aggregate_test_100.c [...]
 --------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
-----------SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
-------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
+----------SortPreservingMergeExec: [aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 DESC,c9@3 
DESC,c2@1 ASC NULLS LAST]
+------------SortExec: expr=[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 DESC,c9@3 
DESC,c2@1 ASC NULLS LAST]
+--------------ProjectionExec: expr=[c3@1 + c4@2 as aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, c2@0 as c2, 
c3@1 as c3, c9@3 as c9]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
 
 
 query III
@@ -2540,25 +2544,23 @@ Projection: sum1, sum2, sum3, min1, min2, min3, max1, 
max2, max3, cnt1, cnt2, su
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 ------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_f [...]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.desc_col AS 
Int64)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 
8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING]]
-----------Projection: annotated_data_finite.inc_col, 
annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated [...]
-------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS Int64)) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite. [...]
---------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FO 
[...]
-----------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS Int64)) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_d [...]
-------------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col
---------------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING 
AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+----------Projection: CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col, annotated_data_finite.inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] ROWS BETWE [...]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.inc_col  [...]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col 
AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(CAST(annotated_data_finite.des [...]
+----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
CAST(annotated_data_finite.inc_col AS Int64) AS 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col
+------------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 
as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 
as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, 
sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, 
maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, 
cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: TopK(fetch=5), expr=[inc_col@24 DESC]
-------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
+------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)) }, COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "COUNT(*) ROWS BETWEEN  [...]
-----------ProjectionExec: expr=[inc_col@2 as inc_col, desc_col@3 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@4 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@5 as SUM(annotate [...]
+----------ProjectionExec: expr=[CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col@0 as CAST(annotated_data_finite.desc_col 
AS Int64)annotated_data_finite.desc_col, inc_col@3 as inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_ [...]
 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follow [...]
---------------ProjectionExec: expr=[CAST(inc_col@2 AS Int64) as 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, ts@1 
as ts, inc_col@2 as inc_col, desc_col@3 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@4 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.des [...]
-----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: F [...]
-------------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
---------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fol [...]
+----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
CAST(inc_col@1 AS Int64) as CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col, ts@0 as ts, inc_col@1 as inc_col, 
desc_col@2 as desc_col]
+------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -2706,17 +2708,19 @@ Projection: sum1, sum2, min1, min2, max1, max2, count1, 
count2, avg1, avg2
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) OR [...]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, COUNT(annotated_data_finite.inc_c [...]
-----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 
PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_fini [...]
-------------TableScan: annotated_data_finite projection=[ts, inc_col]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDE [...]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN  [...]
+------------Projection: CAST(annotated_data_finite.inc_col AS Float64) AS 
CAST(annotated_data_finite.inc_col AS Float64)annotated_data_finite.inc_col, 
CAST(annotated_data_finite.inc_col AS Int64) AS 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col
+--------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 
as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as 
count2, avg1@8 as avg1, avg2@9 as avg2]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: TopK(fetch=5), expr=[inc_col@10 ASC NULLS LAST]
-------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@7 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@8 as min1, MIN(annotated_data_fi [...]
+------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@9 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@10 as min1, MIN(annotated_data_f [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), end_b [...]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)), [...]
-------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+------------ProjectionExec: expr=[CAST(inc_col@1 AS Float64) as 
CAST(annotated_data_finite.inc_col AS Float64)annotated_data_finite.inc_col, 
CAST(inc_col@1 AS Int64) as CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col, ts@0 as ts, inc_col@1 as inc_col]
+--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIRR
 SELECT
@@ -2805,17 +2809,18 @@ Projection: sum1, sum2, count1, count2
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN 
UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 
PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
-------------TableScan: annotated_data_infinite projection=[ts, inc_col]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
annotated_data_infinite.ts, annotated_data_infinite.inc_col
+--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
 --GlobalLimitExec: skip=0, fetch=5
-----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(anno [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), e [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NUL [...]
-----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
-
+----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
ts@0 as ts, inc_col@1 as inc_col]
+------------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 query IIII
 SELECT
@@ -2851,16 +2856,18 @@ Projection: sum1, sum2, count1, count2
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN 
UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 
PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
-------------TableScan: annotated_data_infinite projection=[ts, inc_col]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
annotated_data_infinite.ts, annotated_data_infinite.inc_col
+--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
 --GlobalLimitExec: skip=0, fetch=5
-----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(anno [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), e [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NUL [...]
-----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
+----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
ts@0 as ts, inc_col@1 as inc_col]
+------------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
 query IIII
@@ -2947,34 +2954,24 @@ logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS [...]
 --Limit: skip=0, fetch=5
 ----WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infin [...]
-------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PARTITION BY [annot [...]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_inf [...]
-----------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PARTITION BY [a [...]
-------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [anno [...]
---------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PARTITION B [...]
-----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c 
AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite [...]
-------------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PARTITI [...]
---------------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annota 
[...]
-----------------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PAR [...]
-------------------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] OR [...]
---------------------------Projection: CAST(annotated_data_infinite2.c AS 
Int64) AS CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
-----------------------------TableScan: annotated_data_infinite2 projection=[a, 
b, c, d]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infin [...]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotate 
[...]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data [...]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [an [...]
+----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+------------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL 
[...]
 --GlobalLimitExec: skip=0, fetch=5
 ----BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b 
ASC NULLS LAST, annotated_data_inf [...]
-------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@1 as a, 
b@2 as b, c@3 as c, d@4 as d, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infin [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1  
[...]
-----------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@1 as a, 
b@2 as b, c@3 as c, d@4 as d, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_i [...]
-------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_i [...]
---------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@1 as a, 
b@2 as b, c@3 as c, d@4 as d, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_da 
[...]
-----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS [...]
-------------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@1 as a, 
b@2 as b, c@3 as c, d@4 as d, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotate 
[...]
---------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PREC [...]
-----------------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@1 as a, 
b@2 as b, c@3 as c, d@4 as d, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [anno [...]
-------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable:  
[...]
---------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
-----------------------------StreamingTableExec: partition_sizes=1, 
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FO 
[...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0 [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] [...]
+------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AN [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict [...]
+----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 query IIIIIIIIIIIIIII
 SELECT a, b, c,
@@ -3026,40 +3023,30 @@ Limit: skip=0, fetch=5
 --Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
 ----Projection: annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_dat [...]
 ------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [an 
[...]
---------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, a [...]
-----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_da [...]
-------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2. 
[...]
---------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c [...]
-----------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_fini [...]
-------------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c 
AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_ [...]
---------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_ [...]
-----------------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b,  [...]
-------------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_d [...]
---------------------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_da [...]
-----------------------------Projection: CAST(annotated_data_finite2.c AS 
Int64) AS CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
-------------------------------TableScan: annotated_data_finite2 projection=[a, 
b, c, d]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data [...]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC [...]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite [...]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotate [...]
+----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2 [...]
+------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
+--------------------TableScan: annotated_data_finite2 projection=[a, b, c, d]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: TopK(fetch=5), expr=[c@2 ASC NULLS LAST]
 ----ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BET [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS 
LAST, annotated_data_finite2.c ASC NULLS [...]
 --------SortExec: expr=[d@4 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,c@3 ASC NULLS LAST]
-----------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@1 as a, b@2 
as b, c@3 as c, d@4 as d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS [...]
-------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION 
BY [annotated_data_finite2.b, annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data [...]
---------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,d@4 ASC 
NULLS LAST,c@3 ASC NULLS LAST]
-----------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@1 as a, b@2 
as b, c@3 as c, d@4 as d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC [...]
-------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict [...]
---------------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,c@3 
ASC NULLS LAST]
-----------------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@1 as a, b@2 
as b, c@3 as c, d@4 as d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2 [...]
-------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS  [...]
---------------------------SortExec: expr=[a@1 ASC NULLS LAST,d@4 ASC NULLS 
LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
-----------------------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@1 as a, b@2 
as b, c@3 as c, d@4 as d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_f 
[...]
-------------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
 [...]
---------------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST]
-----------------------------------ProjectionExec: expr=[CAST(c@3 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@1 as a, b@2 
as b, c@3 as c, d@4 as d, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@5 as SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_ [...]
-------------------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true 
[...]
---------------------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) 
as CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
-----------------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION 
BY [annotated_data_finite2.b, annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_t [...]
+------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,d@4 ASC 
NULLS LAST,c@3 ASC NULLS LAST]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ [...]
+----------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,c@3 ASC 
NULLS LAST]
+------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEE [...]
+--------------------SortExec: expr=[a@1 ASC NULLS LAST,d@4 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST]
+----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOW 
[...]
+------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST]
+--------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: [...]
+----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@0 as a, b@1 
as b, c@2 as c, d@3 as d]
+------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
 
 query IIIIIIIIIIIIIII
 SELECT a, b, c,
@@ -3224,20 +3211,22 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annota 
[...]
---WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(anno [...]
-------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
-----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] 
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
-------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NU [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
-ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
+ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
 --BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), f [...]
-----ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a [...]
+----ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infin [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullable:  [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullable [...]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullab [...]
-------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, 
c@2 ASC NULLS LAST]
+------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+--------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, 
c@2 ASC NULLS LAST]
 
 statement ok
 set datafusion.execution.target_partitions = 2;
@@ -3253,29 +3242,31 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annota 
[...]
---WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(anno [...]
-------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
---------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
-----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] 
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
-------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NU [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
-ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
+ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
 --BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), f [...]
 ----CoalesceBatchesExec: target_batch_size=4096
-------RepartitionExec: partitioning=Hash([d@1], 2), input_partitions=2, 
preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
---------ProjectionExec: expr=[a@0 as a, d@3 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinit [...]
+------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs=CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST
+--------ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_i [...]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullab [...]
 ------------CoalesceBatchesExec: target_batch_size=4096
---------------RepartitionExec: partitioning=Hash([b@1, a@0], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC 
NULLS LAST,c@2 ASC NULLS LAST
+--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, 
sort_exprs=CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST
 ----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64,  [...]
 ------------------CoalesceBatchesExec: target_batch_size=4096
---------------------RepartitionExec: partitioning=Hash([a@0, d@3], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC 
NULLS LAST,c@2 ASC NULLS LAST
+--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, 
sort_exprs=CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST
 ----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: I [...]
 ------------------------CoalesceBatchesExec: target_batch_size=4096
---------------------------RepartitionExec: partitioning=Hash([a@0, b@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC 
NULLS LAST,c@2 ASC NULLS LAST
-----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-------------------------------StreamingTableExec: partition_sizes=1, 
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
+--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 2), 
input_partitions=2, preserve_order=true, 
sort_exprs=CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST
+----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
+--------------------------------StreamingTableExec: partition_sizes=1, 
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 # reset the partition number 1 again
 statement ok
@@ -4018,3 +4009,47 @@ select lag(a, 1, 'default') over (order by a) from 
(select '1' a union all selec
 ----
 default
 1
+
+query TT
+explain SELECT c3,
+    SUM(c9) OVER(ORDER BY c3+c4 ASC) as sum2,
+    sum1
+    FROM (
+      SELECT c3, c4, c9,
+      SUM(c9) OVER(ORDER BY c3+c4 DESC) as sum1
+      FROM aggregate_test_100
+    )
+    limit 5
+----
+logical_plan
+Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW AS sum2, sum1
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW]]
+------Projection: aggregate_test_100.c3, aggregate_test_100.c4, 
aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW AS sum1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c3, c4, c9]
+physical_plan
+ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2, sum1@3 as sum1]
+--GlobalLimitExec: skip=0, fetch=5
+----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
CurrentRow, e [...]
+------ProjectionExec: expr=[c3@0 as c3, c4@1 as c4, c9@2 as c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as sum1]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound [...]
+----------SortExec: expr=[c3@0 + c4@1 DESC]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, 
c4, c9], has_header=true
+
+query III
+SELECT c3,
+    SUM(c9) OVER(ORDER BY c3+c4 ASC) as sum2,
+    sum1
+    FROM (
+      SELECT c3, c4, c9,
+      SUM(c9) OVER(ORDER BY c3+c4 DESC) as sum1
+      FROM aggregate_test_100
+    )
+    limit 5
+----
+-86 222089770060 2861911482
+13 219227858578 5075947208
+125 217013822852 8701233618
+123 213388536442 11293564174
+97 210796205886 14767488750

Reply via email to