This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 975ea5e6c3 Revert 10333 / changes to aliasing in 
CommonSubExprEliminate (#10436)
975ea5e6c3 is described below

commit 975ea5e6c3e0a377b3b33080f20178528b889b3d
Author: Mohamed Abdeen <[email protected]>
AuthorDate: Fri May 10 14:45:35 2024 +0300

    Revert 10333 / changes to aliasing in CommonSubExprEliminate (#10436)
    
    * Revert "make common expression alias human-readable (#10333)"
    
    This reverts commit 2a15614c6aeb0b7c22d0a7aac895d17f5aeaef99.
    
    * keep some ok parts
---
 .../optimizer/src/common_subexpr_eliminate.rs      |  89 ++++++++-------
 datafusion/sqllogictest/test_files/group_by.slt    |   8 +-
 datafusion/sqllogictest/test_files/select.slt      |  16 +--
 datafusion/sqllogictest/test_files/subquery.slt    |   8 +-
 .../sqllogictest/test_files/tpch/q1.slt.part       |   6 +-
 datafusion/sqllogictest/test_files/window.slt      | 126 ++++++++++-----------
 6 files changed, 130 insertions(+), 123 deletions(-)

diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index cf5c242526..0704fabea2 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -113,8 +113,8 @@ type CommonExprs = IndexMap<Identifier, Expr>;
 /// refer to that new column.
 ///
 /// ```text
-/// ProjectionExec(exprs=[extract (day from #{new_col}), extract (year from 
#{new_col})]) <-- reuse here
-///   ProjectionExec(exprs=[to_date(c1) as #{new_col}]) <-- compute to_date 
once
+/// ProjectionExec(exprs=[extract (day from new_col), extract (year from 
new_col)]) <-- reuse here
+///   ProjectionExec(exprs=[to_date(c1) as new_col]) <-- compute to_date once
 /// ```
 pub struct CommonSubexprEliminate {}
 
@@ -347,7 +347,7 @@ impl CommonSubexprEliminate {
                         agg_exprs.push(expr.alias(&name));
                         proj_exprs.push(Expr::Column(Column::from_name(name)));
                     } else {
-                        let id = expr_identifier(&expr_rewritten);
+                        let id = expr_identifier(&expr_rewritten, 
"".to_string());
                         let (qualifier, field) =
                             expr_rewritten.to_field(&new_input_schema)?;
                         let out_name = qualified_name(qualifier.as_ref(), 
field.name());
@@ -656,16 +656,24 @@ enum VisitRecord {
     EnterMark(usize),
     /// the node's children were skipped => jump to f_up on same node
     JumpMark,
+    /// Accumulated identifier of sub expression.
+    ExprItem(Identifier),
 }
 
 impl ExprIdentifierVisitor<'_> {
     /// Find the first `EnterMark` in the stack, and accumulates every 
`ExprItem`
     /// before it.
-    fn pop_enter_mark(&mut self) -> Option<usize> {
-        if let Some(item) = self.visit_stack.pop() {
+    fn pop_enter_mark(&mut self) -> Option<(usize, Identifier)> {
+        let mut desc = String::new();
+
+        while let Some(item) = self.visit_stack.pop() {
             match item {
                 VisitRecord::EnterMark(idx) => {
-                    return Some(idx);
+                    return Some((idx, desc));
+                }
+                VisitRecord::ExprItem(id) => {
+                    desc.push('|');
+                    desc.push_str(&id);
                 }
                 VisitRecord::JumpMark => return None,
             }
@@ -697,11 +705,11 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
     }
 
     fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        let Some(down_index) = self.pop_enter_mark() else {
+        let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
             return Ok(TreeNodeRecursion::Continue);
         };
 
-        let expr_id = expr_identifier(expr);
+        let expr_id = expr_identifier(expr, sub_expr_id);
 
         self.id_array[down_index].0 = self.up_index;
         if !self.expr_mask.ignores(expr) {
@@ -716,14 +724,15 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
                 .or_insert((0, data_type));
             *count += 1;
         }
+        self.visit_stack.push(VisitRecord::ExprItem(expr_id));
         self.up_index += 1;
 
         Ok(TreeNodeRecursion::Continue)
     }
 }
 
-fn expr_identifier(expr: &Expr) -> Identifier {
-    format!("#{{{expr}}}")
+fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier 
{
+    format!("{{{expr}{sub_expr_identifier}}}")
 }
 
 /// Go through an expression tree and generate identifier for every node in 
this tree.
@@ -876,15 +885,15 @@ mod test {
         )?;
 
         let expected = vec![
-            (8, "#{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)}"),
-            (6, "#{SUM(a + Int32(1)) - AVG(c)}"),
+            (8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a + 
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + 
Int32(1)|{Int32(1)}|{a}}}}}"),
+            (6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + 
Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
             (3, ""),
-            (2, "#{a + Int32(1)}"),
+            (2, "{a + Int32(1)|{Int32(1)}|{a}}"),
             (0, ""),
             (1, ""),
             (5, ""),
             (4, ""),
-            (7, ""),
+            (7, "")
         ]
         .into_iter()
         .map(|(number, id)| (number, id.into()))
@@ -902,15 +911,15 @@ mod test {
         )?;
 
         let expected = vec![
-            (8, "#{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)}"),
-            (6, "#{SUM(a + Int32(1)) - AVG(c)}"),
-            (3, "#{SUM(a + Int32(1))}"),
-            (2, "#{a + Int32(1)}"),
+            (8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a + 
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + 
Int32(1)|{Int32(1)}|{a}}}}}"),
+            (6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + 
Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
+            (3, "{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}"),
+            (2, "{a + Int32(1)|{Int32(1)}|{a}}"),
             (0, ""),
             (1, ""),
-            (5, "#{AVG(c)}"),
+            (5, "{AVG(c)|{c}}"),
             (4, ""),
-            (7, ""),
+            (7, "")
         ]
         .into_iter()
         .map(|(number, id)| (number, id.into()))
@@ -942,8 +951,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[SUM(#{test.a * 
(Int32(1) - test.b)} AS test.a * Int32(1) - test.b), SUM(#{test.a * (Int32(1) - 
test.b)} AS test.a * Int32(1) - test.b * (Int32(1) + test.c))]]\
-        \n  Projection: test.a * (Int32(1) - test.b) AS #{test.a * (Int32(1) - 
test.b)}, test.a, test.b, test.c\
+        let expected = "Aggregate: groupBy=[[]], aggr=[[SUM({test.a * 
(Int32(1) - test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a 
* Int32(1) - test.b), SUM({test.a * (Int32(1) - test.b)|{Int32(1) - 
test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a * Int32(1) - test.b * (Int32(1) 
+ test.c))]]\
+        \n  Projection: test.a * (Int32(1) - test.b) AS {test.a * (Int32(1) - 
test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}}, test.a, test.b, 
test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -995,8 +1004,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: #{AVG(test.a)} AS AVG(test.a) AS col1, 
#{AVG(test.a)} AS AVG(test.a) AS col2, col3, #{AVG(test.c)} AS AVG(test.c), 
#{my_agg(test.a)} AS my_agg(test.a) AS col4, #{my_agg(test.a)} AS 
my_agg(test.a) AS col5, col6, #{my_agg(test.c)} AS my_agg(test.c)\
-        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS #{AVG(test.a)}, 
my_agg(test.a) AS #{my_agg(test.a)}, AVG(test.b) AS col3, AVG(test.c) AS 
#{AVG(test.c)}, my_agg(test.b) AS col6, my_agg(test.c) AS #{my_agg(test.c)}]]\
+        let expected = "Projection: {AVG(test.a)|{test.a}} AS AVG(test.a) AS 
col1, {AVG(test.a)|{test.a}} AS AVG(test.a) AS col2, col3, {AVG(test.c)} AS 
AVG(test.c), {my_agg(test.a)|{test.a}} AS my_agg(test.a) AS col4, 
{my_agg(test.a)|{test.a}} AS my_agg(test.a) AS col5, col6, {my_agg(test.c)} AS 
my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS 
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}, 
AVG(test.b) AS col3, AVG(test.c) AS {AVG(test.c)}, my_agg(test.b) AS col6, 
my_agg(test.c) AS {my_agg(test.c)}]]\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1014,8 +1023,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: Int32(1) + #{AVG(test.a)} AS AVG(test.a), 
Int32(1) - #{AVG(test.a)} AS AVG(test.a), Int32(1) + #{my_agg(test.a)} AS 
my_agg(test.a), Int32(1) - #{my_agg(test.a)} AS my_agg(test.a)\
-        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS #{AVG(test.a)}, 
my_agg(test.a) AS #{my_agg(test.a)}]]\
+        let expected = "Projection: Int32(1) + {AVG(test.a)|{test.a}} AS 
AVG(test.a), Int32(1) - {AVG(test.a)|{test.a}} AS AVG(test.a), Int32(1) + 
{my_agg(test.a)|{test.a}} AS my_agg(test.a), Int32(1) - 
{my_agg(test.a)|{test.a}} AS my_agg(test.a)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS 
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}]]\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1031,9 +1040,7 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[AVG(#{UInt32(1) + 
test.a} AS UInt32(1) + test.a) AS col1, my_agg(#{UInt32(1) + test.a} AS 
UInt32(1) + test.a) AS col2]]\
-        \n  Projection: UInt32(1) + test.a AS #{UInt32(1) + test.a}, test.a, 
test.b, test.c\
-        \n    TableScan: test";
+        let expected = "Aggregate: groupBy=[[]], aggr=[[AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col1, my_agg({UInt32(1) 
+ test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col2]]\n  Projection: 
UInt32(1) + test.a AS {UInt32(1) + test.a|{test.a}|{UInt32(1)}}, test.a, 
test.b, test.c\n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
 
@@ -1048,8 +1055,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[#{UInt32(1) + test.a} AS 
UInt32(1) + test.a]], aggr=[[AVG(#{UInt32(1) + test.a} AS UInt32(1) + test.a) 
AS col1, my_agg(#{UInt32(1) + test.a} AS UInt32(1) + test.a) AS col2]]\
-        \n  Projection: UInt32(1) + test.a AS #{UInt32(1) + test.a}, test.a, 
test.b, test.c\
+        let expected = "Aggregate: groupBy=[[{UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col1, my_agg({UInt32(1) 
+ test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col2]]\
+        \n  Projection: UInt32(1) + test.a AS {UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1069,9 +1076,9 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: UInt32(1) + test.a, UInt32(1) + 
#{AVG(#{UInt32(1) + test.a} AS UInt32(1) + test.a)} AS AVG(UInt32(1) + test.a) 
AS col1, UInt32(1) - #{AVG(#{UInt32(1) + test.a} AS UInt32(1) + test.a)} AS 
AVG(UInt32(1) + test.a) AS col2, #{AVG(#{UInt32(1) + test.a} AS UInt32(1) + 
test.a)} AS AVG(UInt32(1) + test.a), UInt32(1) + #{my_agg(#{UInt32(1) + test.a} 
AS UInt32(1) + test.a)} AS my_agg(UInt32(1) + test.a) AS col3, UInt32(1) - 
#{my_agg(#{UInt32(1) + test.a} AS UInt [...]
-        \n  Aggregate: groupBy=[[#{UInt32(1) + test.a} AS UInt32(1) + 
test.a]], aggr=[[AVG(#{UInt32(1) + test.a} AS UInt32(1) + test.a) AS 
#{AVG(#{UInt32(1) + test.a} AS UInt32(1) + test.a)}, my_agg(#{UInt32(1) + 
test.a} AS UInt32(1) + test.a) AS #{my_agg(#{UInt32(1) + test.a} AS UInt32(1) + 
test.a)}]]\
-        \n    Projection: UInt32(1) + test.a AS #{UInt32(1) + test.a}, test.a, 
test.b, test.c\
+        let expected = "Projection: UInt32(1) + test.a, UInt32(1) + 
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) + test.a) 
AS col1, UInt32(1) - {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}} AS AVG(UInt32( [...]
+        \n  Aggregate: groupBy=[[{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a) AS {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}}, my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS {my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UIn [...]
+        \n    Projection: UInt32(1) + test.a AS {UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
         \n      TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1096,9 +1103,9 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: table.test.col.a, UInt32(1) + 
#{AVG(#{UInt32(1) + table.test.col.a} AS UInt32(1) + table.test.col.a)} AS 
AVG(UInt32(1) + table.test.col.a), #{AVG(#{UInt32(1) + table.test.col.a} AS 
UInt32(1) + table.test.col.a)} AS AVG(UInt32(1) + table.test.col.a)\
-        \n  Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG(#{UInt32(1) + 
table.test.col.a} AS UInt32(1) + table.test.col.a) AS #{AVG(#{UInt32(1) + 
table.test.col.a} AS UInt32(1) + table.test.col.a)}]]\
-        \n    Projection: UInt32(1) + table.test.col.a AS #{UInt32(1) + 
table.test.col.a}, table.test.col.a\
+        let expected = "Projection: table.test.col.a, UInt32(1) + 
{AVG({UInt32(1) + table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) 
+ table.test.col.a)|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) + 
table.test.col.a), {AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a)|{{UInt32(1) + table.test.co [...]
+        \n  Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a) AS {AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a)|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}}]]\
+        \n    Projection: UInt32(1) + table.test.col.a AS {UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}, table.test.col.a\
         \n      TableScan: table.test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1117,8 +1124,8 @@ mod test {
             ])?
             .build()?;
 
-        let expected = "Projection: #{Int32(1) + test.a} AS Int32(1) + test.a 
AS first, #{Int32(1) + test.a} AS Int32(1) + test.a AS second\
-        \n  Projection: Int32(1) + test.a AS #{Int32(1) + test.a}, test.a, 
test.b, test.c\
+        let expected = "Projection: {Int32(1) + test.a|{test.a}|{Int32(1)}} AS 
Int32(1) + test.a AS first, {Int32(1) + test.a|{test.a}|{Int32(1)}} AS Int32(1) 
+ test.a AS second\
+        \n  Projection: Int32(1) + test.a AS {Int32(1) + 
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1293,8 +1300,8 @@ mod test {
             .build()?;
 
         let expected = "Projection: test.a, test.b, test.c\
-        \n  Filter: #{Int32(1) + test.a} - Int32(10) > #{Int32(1) + test.a}\
-        \n    Projection: Int32(1) + test.a AS #{Int32(1) + test.a}, test.a, 
test.b, test.c\
+        \n  Filter: {Int32(1) + test.a|{test.a}|{Int32(1)}} - Int32(10) > 
{Int32(1) + test.a|{test.a}|{Int32(1)}}\
+        \n    Projection: Int32(1) + test.a AS {Int32(1) + 
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
         \n      TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index 609b63a5c6..5a605ea58b 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4187,8 +4187,8 @@ EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), 
MAX(DISTINCT CAST(x AS DOUBLE))
 logical_plan
 01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT 
t1.x)
 02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
-03)----Aggregate: groupBy=[[t1.y, #{CAST(t1.x AS Float64)} AS t1.x AS 
alias1]], aggr=[[]]
-04)------Projection: CAST(t1.x AS Float64) AS #{CAST(t1.x AS Float64)}, t1.y
+03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS 
alias1]], aggr=[[]]
+04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, 
t1.y
 05)--------TableScan: t1 projection=[x, y]
 physical_plan
 01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as 
MAX(DISTINCT t1.x)]
@@ -4200,8 +4200,8 @@ physical_plan
 07)------------CoalesceBatchesExec: target_batch_size=2
 08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), 
input_partitions=8
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
-10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, #{CAST(t1.x 
AS Float64)}@0 as alias1], aggr=[]
-11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as 
#{CAST(t1.x AS Float64)}, y@1 as y]
+10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS 
Float64)|{t1.x}}@0 as alias1], aggr=[]
+11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as 
{CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
 12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # create an unbounded table that contains ordered timestamp.
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index 7244bb19ba..24163e37de 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1426,12 +1426,12 @@ query TT
 EXPLAIN SELECT x/2, x/2+1 FROM t;
 ----
 logical_plan
-01)Projection: #{t.x / Int64(2)} AS t.x / Int64(2), #{t.x / Int64(2)} AS t.x / 
Int64(2) + Int64(1)
-02)--Projection: t.x / Int64(2) AS #{t.x / Int64(2)}
+01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / 
Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
+02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
 03)----TableScan: t projection=[x]
 physical_plan
-01)ProjectionExec: expr=[#{t.x / Int64(2)}@0 as t.x / Int64(2), #{t.x / 
Int64(2)}@0 + 1 as t.x / Int64(2) + Int64(1)]
-02)--ProjectionExec: expr=[x@0 / 2 as #{t.x / Int64(2)}]
+01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / 
Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
+02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
@@ -1444,12 +1444,12 @@ query TT
 EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
 ----
 logical_plan
-01)Projection: #{abs(t.x)} AS abs(t.x), #{abs(t.x)} AS abs(t.x) + abs(t.y)
-02)--Projection: abs(t.x) AS #{abs(t.x)}, t.y
+01)Projection: {abs(t.x)|{t.x}} AS abs(t.x), {abs(t.x)|{t.x}} AS abs(t.x) + 
abs(t.y)
+02)--Projection: abs(t.x) AS {abs(t.x)|{t.x}}, t.y
 03)----TableScan: t projection=[x, y]
 physical_plan
-01)ProjectionExec: expr=[#{abs(t.x)}@0 as abs(t.x), #{abs(t.x)}@0 + abs(y@1) 
as abs(t.x) + abs(t.y)]
-02)--ProjectionExec: expr=[abs(x@0) as #{abs(t.x)}, y@1 as y]
+01)ProjectionExec: expr=[{abs(t.x)|{t.x}}@0 as abs(t.x), {abs(t.x)|{t.x}}@0 + 
abs(y@1) as abs(t.x) + abs(t.y)]
+02)--ProjectionExec: expr=[abs(x@0) as {abs(t.x)|{t.x}}, y@1 as y]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 095d8714b7..085f192a5d 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1072,8 +1072,8 @@ query TT
 explain select a/2, a/2 + 1 from t
 ----
 logical_plan
-01)Projection: #{t.a / Int64(2)} AS t.a / Int64(2), #{t.a / Int64(2)} AS t.a / 
Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS #{t.a / Int64(2)}
+01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / 
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
 03)----TableScan: t projection=[a]
 
 statement ok
@@ -1083,8 +1083,8 @@ query TT
 explain select a/2, a/2 + 1 from t
 ----
 logical_plan
-01)Projection: #{t.a / Int64(2)} AS t.a / Int64(2), #{t.a / Int64(2)} AS t.a / 
Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS #{t.a / Int64(2)}
+01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / 
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
 03)----TableScan: t projection=[a]
 
 ###
diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
index 4a0d4a2e6b..f2c14f2628 100644
--- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
@@ -42,8 +42,8 @@ explain select
 logical_plan
 01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS 
LAST
 02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, 
SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS 
sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, 
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS 
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), 
SUM(#{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(#{lineitem.l_extendedprice * 
(Decimal128(Some(1),20,0) - lineitem.l_discount)} AS lineitem.l_extendedprice * 
Decimal128( [...]
-04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS #{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) 
- lineitem.l_discount)}, lineitem.l_quantity, lineitem.l_extendedprice, 
lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, 
lineitem.l_linestatus
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), 
SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}
 AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) 
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount),  [...]
+04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
 lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
 05)--------Filter: lineitem.l_shipdate <= Date32("10471")
 06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, 
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], 
partial_filters=[lineitem.l_shipdate <= Date32("10471")]
 physical_plan
@@ -54,7 +54,7 @@ physical_plan
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([l_returnflag@0, 
l_linestatus@1], 4), input_partitions=4
 07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as 
l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), 
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
-08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as #{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)}, l_quantity@0 as l_quantity, l_extendedprice@1 as 
l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 
as l_returnflag, l_linestatus@5 as l_linestatus]
+08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
 09)----------------CoalesceBatchesExec: target_batch_size=8192
 10)------------------FilterExec: l_shipdate@6 <= 10471
 11)--------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l [...]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index daf6f17afe..af09e644c9 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1687,20 +1687,20 @@ EXPLAIN SELECT c3,
 logical_plan
 01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[#{aggregate_test_100.c3 + aggregate_test_100.c4} AS aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
-04)------Projection: #{aggregate_test_100.c3 + aggregate_test_100.c4}, 
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[#{aggregate_test_100.c3 + aggregate_test_100.c4} AS aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS [...]
-06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
#{aggregate_test_100.c3 + aggregate_test_100.c4}, aggregate_test_100.c2, 
aggregate_test_100.c3, aggregate_test_100.c9
+03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING  [...]
+04)------Projection: {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DE [...]
+06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
 07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
 physical_plan
 01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, [...]
-04)------ProjectionExec: expr=[#{aggregate_test_100.c3 + 
aggregate_test_100.c4}@0 as #{aggregate_test_100.c3 + aggregate_test_100.c4}, 
c3@2 as c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST [...]
+04)------ProjectionExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as 
c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(a [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRE [...]
-06)----------SortPreservingMergeExec: [#{aggregate_test_100.c3 + 
aggregate_test_100.c4}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
-07)------------SortExec: expr=[#{aggregate_test_100.c3 + 
aggregate_test_100.c4}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST], 
preserve_partitioning=[true]
-08)--------------ProjectionExec: expr=[c3@1 + c4@2 as #{aggregate_test_100.c3 
+ aggregate_test_100.c4}, c2@0 as c2, c3@1 as c3, c9@3 as c9]
+06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
+07)------------SortExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
+08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as 
c2, c3@1 as c3, c9@3 as c9]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
 
@@ -2544,11 +2544,11 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 04)------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_dat 
[...]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite.desc_col 
AS Int64)} AS annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) 
ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-06)----------Projection: #{CAST(annotated_data_finite.desc_col AS Int64)}, 
annotated_data_finite.inc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOL 
[...]
-07)------------WindowAggr: 
windowExpr=[[SUM(#{CAST(annotated_data_finite.inc_col AS Int64)} AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_finite.desc_col AS Int64)} AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_finite.inc_col AS Int64)} AS 
annotated_data_finite.inc_col) ORD [...]
-08)--------------WindowAggr: 
windowExpr=[[SUM(#{CAST(annotated_data_finite.inc_col AS Int64)} AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(#{CAST(annotated_data_finite.desc_col AS Int64)} AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
SUM(#{CAST(annotated_data_finite.desc_col AS Int64)} AS 
annotated_data_finite.desc_c [...]
-09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) 
AS #{CAST(annotated_data_finite.desc_col AS Int64)}, 
CAST(annotated_data_finite.inc_col AS Int64) AS 
#{CAST(annotated_data_finite.inc_col AS Int64)}, annotated_data_finite.ts, 
annotated_data_finite.inc_col, annotated_data_finite.desc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.desc_col 
AS Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+06)----------Projection: {CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, annotated_data_finite.inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RO [...]
+07)------------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING 
AND 1 FOLLOWING, SUM({CAST(annotated_data_f [...]
+08)--------------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM({CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 8 FOLLOWING, SUM({CAST(annotated_d [...]
+09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) 
AS {CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, CAST(annotated_data_finite.inc_col AS 
Int64) AS {CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts, 
annotated_data_finite.inc_col, annotated_data_finite.desc_col
 10)------------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, 
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, 
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as 
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as 
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as 
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
@@ -2556,10 +2556,10 @@ physical_plan
 03)----SortExec: TopK(fetch=5), expr=[inc_col@24 DESC], 
preserve_partitioning=[false]
 04)------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDE [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "C [...]
-06)----------ProjectionExec: expr=[#{CAST(annotated_data_finite.desc_col AS 
Int64)}@0 as #{CAST(annotated_data_finite.desc_col AS Int64)}, inc_col@3 as 
inc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_fini [...]
+06)----------ProjectionExec: expr=[{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}@0 as 
{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, inc_col@3 as inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, S [...]
 07)------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Fol [...]
 08)--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound:  [...]
-09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
#{CAST(annotated_data_finite.desc_col AS Int64)}, CAST(inc_col@1 AS Int64) as 
#{CAST(annotated_data_finite.inc_col AS Int64)}, ts@0 as ts, inc_col@1 as 
inc_col, desc_col@2 as desc_col]
+09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
 10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2708,9 +2708,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
 04)------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) [...]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite.inc_col 
AS Int64)} AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, 
MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, 
MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLO [...]
-06)----------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite.inc_col 
AS Int64)} AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUND [...]
-07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
#{CAST(annotated_data_finite.inc_col AS Int64)}, 
CAST(annotated_data_finite.inc_col AS Float64) AS 
#{CAST(annotated_data_finite.inc_col AS Float64)}, annotated_data_finite.ts, 
annotated_data_finite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN  [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.inc_col 
AS Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE  [...]
+07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
CAST(annotated_data_finite.inc_col AS Float64) AS 
{CAST(annotated_data_finite.inc_col AS 
Float64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts, 
annotated_data_finite.inc_col
 08)--------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, 
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as 
count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2719,7 +2719,7 @@ physical_plan
 04)------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@9 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@10 as min1, MIN(annotated_dat [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), en [...]
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL [...]
-07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
#{CAST(annotated_data_finite.inc_col AS Int64)}, CAST(inc_col@1 AS Float64) as 
#{CAST(annotated_data_finite.inc_col AS Float64)}, ts@0 as ts, inc_col@1 as 
inc_col]
+07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
CAST(inc_col@1 AS Float64) as {CAST(annotated_data_finite.inc_col AS 
Float64)|{annotated_data_finite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIRR
@@ -2809,9 +2809,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 04)------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite.inc_col 
AS Int64)} AS annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
-06)----------WindowAggr: 
windowExpr=[[SUM(#{CAST(annotated_data_infinite.inc_col AS Int64)} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC 
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts 
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
#{CAST(annotated_data_infinite.inc_col AS Int64)}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col 
AS Int64)|{annotated_data_infinite.inc_col}} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
 08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2819,7 +2819,7 @@ physical_plan
 03)----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
 04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
#{CAST(annotated_data_infinite.inc_col AS Int64)}, ts@0 as ts, inc_col@1 as 
inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 query IIII
@@ -2856,9 +2856,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 04)------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite.inc_col 
AS Int64)} AS annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
-06)----------WindowAggr: 
windowExpr=[[SUM(#{CAST(annotated_data_infinite.inc_col AS Int64)} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC 
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts 
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
#{CAST(annotated_data_infinite.inc_col AS Int64)}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col 
AS Int64)|{annotated_data_infinite.inc_col}} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
 08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2866,7 +2866,7 @@ physical_plan
 03)----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
 04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
#{CAST(annotated_data_infinite.inc_col AS Int64)}, ts@0 as ts, inc_col@1 as 
inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
@@ -2953,13 +2953,13 @@ EXPLAIN SELECT a, b, c,
 logical_plan
 01)Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NU [...]
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.c AS 
Int64)} AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2. 
[...]
-04)------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.c AS 
Int64)} AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER B [...]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.c AS 
Int64)} AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BET [...]
-06)----------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.c AS 
Int64)} AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annota [...]
-07)------------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.c 
AS Int64)} AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] O [...]
-08)--------------WindowAggr: 
windowExpr=[[SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(#{CAST(annotated_data_infinite2.c AS Int64)} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RO [...]
-09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
#{CAST(annotated_data_infinite2.c AS Int64)}, annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.c, 
annotated_data_infinite2.d
+03)----WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotat [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotat 
[...]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_i [...]
+07)------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c 
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
+08)--------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c 
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] [...]
+09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 10)------------------TableScan: annotated_data_infinite2 projection=[a, b, c, 
d]
 physical_plan
 01)ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC N 
[...]
@@ -2970,7 +2970,7 @@ physical_plan
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LA [...]
 07)------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING [...]
 08)--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, d [...]
-09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
#{CAST(annotated_data_infinite2.c AS Int64)}, a@0 as a, b@1 as b, c@2 as c, d@3 
as d]
+09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
 10)------------------StreamingTableExec: partition_sizes=1, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 query IIIIIIIIIIIIIII
@@ -3022,13 +3022,13 @@ logical_plan
 01)Limit: skip=0, fetch=5
 02)--Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
 03)----Projection: annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_ [...]
-04)------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite2.c AS 
Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC 
NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING 
AND 1 FOLLOWING, SUM(#{CAST(annotated_data_finite2.c AS Int64)} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.a ASC NULLS LAST,  [...]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite2.c AS 
Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(#{CAST(annotated_data_finite2.c AS Int64)} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_fi 
[...]
-06)----------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite2.c AS 
Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(#{CAST(annotated_data_finite2.c 
AS Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 5 PRECEDING A [...]
-07)------------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite2.c AS 
Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(#{CAST(annotated_data_finite2.c AS Int64)} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b [...]
-08)--------------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_finite2.c 
AS Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(#{CAST(annotated_data_finite2.c AS Int64)} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_d [...]
-09)----------------WindowAggr: 
windowExpr=[[SUM(#{CAST(annotated_data_finite2.c AS Int64)} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(#{CAST(annotated_data_finite2.c 
AS Int64)} AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 5 PRECE [...]
-10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
#{CAST(annotated_data_finite2.c AS Int64)}, annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.c, annotated_data_finite2.d
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] [...]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, a [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_ 
[...]
+07)------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotate [...]
+08)--------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c 
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite [...]
+09)----------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c 
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated [...]
+10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
 11)--------------------TableScan: annotated_data_finite2 projection=[a, b, c, 
d]
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=5
@@ -3045,7 +3045,7 @@ physical_plan
 12)----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOL [...]
 13)------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST], preserve_partitioning=[false]
 14)--------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_ [...]
-15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
#{CAST(annotated_data_finite2.c AS Int64)}, a@0 as a, b@1 as b, c@2 as c, d@3 
as d]
+15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
 16)------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
 
 query IIIIIIIIIIIIIII
@@ -3211,21 +3211,21 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 01)Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS Int64)} 
AS annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER 
BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
-03)----Projection: #{CAST(annotated_data_infinite2.a AS Int64)}, 
annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETW [...]
-04)------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS 
Int64)} AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS 
Int64)} AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS 
Int64)} AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
#{CAST(annotated_data_infinite2.a AS Int64)}, annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.c, 
annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----Projection: {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2. [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 01)ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PAR [...]
 02)--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }) [...]
-03)----ProjectionExec: expr=[#{CAST(annotated_data_infinite2.a AS Int64)}@0 as 
#{CAST(annotated_data_infinite2.a AS Int64)}, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2. [...]
+03)----ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotat [...]
 04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullabl [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nulla [...]
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nul [...]
-07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
#{CAST(annotated_data_infinite2.a AS Int64)}, a@0 as a, b@1 as b, c@2 as c, d@3 
as d]
+07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
 08)--------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 statement ok
@@ -3242,29 +3242,29 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 01)Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS Int64)} 
AS annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER 
BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
-03)----Projection: #{CAST(annotated_data_infinite2.a AS Int64)}, 
annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETW [...]
-04)------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS 
Int64)} AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS 
Int64)} AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[SUM(#{CAST(annotated_data_infinite2.a AS 
Int64)} AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
#{CAST(annotated_data_infinite2.a AS Int64)}, annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.c, 
annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----Projection: {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2. [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 01)ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PAR [...]
 02)--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }) [...]
 03)----CoalesceBatchesExec: target_batch_size=4096
-04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs=#{CAST(annotated_data_infinite2.a AS Int64)}@0 
ASC NULLS LAST,a@1 ASC NULLS LAST
-05)--------ProjectionExec: expr=[#{CAST(annotated_data_infinite2.a AS 
Int64)}@0 as #{CAST(annotated_data_infinite2.a AS Int64)}, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infini [...]
+04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs={CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST,a@1 ASC NULLS LAST
+05)--------ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, ann 
[...]
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nul [...]
 07)------------CoalesceBatchesExec: target_batch_size=4096
-08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,#{CAST(annotated_data_infinite2.a AS Int64)}@0 
ASC NULLS LAST
+08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
 09)----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int6 [...]
 10)------------------CoalesceBatchesExec: target_batch_size=4096
-11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,#{CAST(annotated_data_infinite2.a AS Int64)}@0 
ASC NULLS LAST
+11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
 12)----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type [...]
 13)------------------------CoalesceBatchesExec: target_batch_size=4096
-14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST,#{CAST(annotated_data_infinite2.a AS 
Int64)}@0 ASC NULLS LAST
-15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
#{CAST(annotated_data_infinite2.a AS Int64)}, a@0 as a, b@1 as b, c@2 as c, d@3 
as d]
+14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
 16)------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
 17)--------------------------------StreamingTableExec: partition_sizes=1, 
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to