This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d283a49f5 Fix and improve `CommonSubexprEliminate` rule (#10396)
0d283a49f5 is described below

commit 0d283a49f52d91a58c4e88e3e5e76fe0d59c1260
Author: Peter Toth <[email protected]>
AuthorDate: Wed May 8 19:21:27 2024 +0200

    Fix and improve `CommonSubexprEliminate` rule (#10396)
    
    * Revert "fix(9870): common expression elimination optimization, should 
always re-find the correct expression during re-write. (#9871)"
    
    This reverts commit cd7a00b08309f7229073e4bba686d6271726ab1c.
    
    * expr id should always contain the full expr structure, cleaner expr ids, 
better JumpMark handling, better variable names, code cleanup, some new todos
    
    * move `Expr` from `expr_set`s to `affected_id`s
    
    * better naming, docs fixes
    
    * introduce `CommonExprs` type alias, minor todo fix
    
    * add test
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/expr/src/logical_plan/plan.rs           |   2 +-
 .../optimizer/src/common_subexpr_eliminate.rs      | 711 ++++++++++++---------
 datafusion/sqllogictest/test_files/group_by.slt    |   8 +-
 datafusion/sqllogictest/test_files/select.slt      |  24 +-
 datafusion/sqllogictest/test_files/subquery.slt    |   8 +-
 .../sqllogictest/test_files/tpch/q1.slt.part       |   6 +-
 datafusion/sqllogictest/test_files/window.slt      | 126 ++--
 7 files changed, 500 insertions(+), 385 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index c608b51e08..1b54e76a17 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -2245,7 +2245,7 @@ impl DistinctOn {
 
 /// Aggregates its input based on a set of grouping and aggregate
 /// expressions (e.g. SUM).
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 // mark non_exhaustive to encourage use of try_new/new()
 #[non_exhaustive]
 pub struct Aggregate {
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index cb3b4accf3..0475edd659 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -17,7 +17,6 @@
 
 //! [`CommonSubexprEliminate`] to avoid redundant computation of common 
sub-expressions
 
-use std::collections::hash_map::Entry;
 use std::collections::{BTreeSet, HashMap};
 use std::sync::Arc;
 
@@ -34,90 +33,63 @@ use datafusion_common::{
 use datafusion_expr::expr::Alias;
 use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, 
Window};
 use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;
 
-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
-    /// A map from expression's identifier (stringified expr) to tuple 
including:
-    /// - the expression itself (cloned)
-    /// - counter
-    /// - DataType of this expression.
-    /// - symbol used as the identifier in the alias.
-    map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
-    fn expr_identifier(expr: &Expr) -> Identifier {
-        format!("{expr}")
-    }
-
-    fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType, 
Identifier)> {
-        self.map.get(key)
-    }
-
-    fn entry(
-        &mut self,
-        key: Identifier,
-    ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
-        self.map.entry(key)
-    }
-
-    fn populate_expr_set(
-        &mut self,
-        expr: &[Expr],
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.iter().try_for_each(|e| {
-            self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
-            Ok(())
-        })
-    }
-
-    /// Go through an expression tree and generate identifier for every node 
in this tree.
-    fn expr_to_identifier(
-        &mut self,
-        expr: &Expr,
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.visit(&mut ExprIdentifierVisitor {
-            expr_set: self,
-            input_schema,
-            visit_stack: vec![],
-            node_count: 0,
-            expr_mask,
-        })?;
-
-        Ok(())
-    }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
-    fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>) 
-> Self {
-        let mut expr_set = Self::default();
-        entries.into_iter().for_each(|(k, v)| {
-            expr_set.map.insert(k, v);
-        });
-        expr_set
-    }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
 ///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the children of the expression
+/// concatenated.
 ///
 /// An identifier should (ideally) be able to "hash", "accumulate", "equal" 
and "have no
 /// collision (as low as possible)"
 ///
 /// Since an identifier is likely to be copied many times, it is better that 
an identifier
-/// is small or "copy". otherwise some kinds of reference count is needed. 
String description
-/// here is not such a good choose.
+/// is small or "copy". Otherwise some kind of reference counting is needed. 
String
+/// description here is not such a good choice.
 type Identifier = String;
 
+/// A cache that contains the postorder index and the identifier of expression 
tree nodes
+/// by the preorder index of the nodes.
+///
+/// This cache is filled by `ExprIdentifierVisitor` during the first traversal 
and is used
+/// by `CommonSubexprRewriter` during the second traversal.
+///
+/// The purpose of this cache is to quickly find the identifier of a node 
during the
+/// second traversal.
+///
+/// Elements in this array are added during `f_down` so the indexes represent 
the preorder
+/// index of expression nodes and thus element 0 belongs to the root of the 
expression
+/// tree.
+/// The elements of the array are tuples that contain:
+/// - Postorder index that belongs to the preorder index. Assigned during 
`f_up`, start
+///   from 0.
+/// - Identifier of the expression. If empty (`""`), expr should not be 
considered for
+///   CSE.
+///
+/// # Example
+/// An expression like `(a + b)` would have the following `IdArray`:
+/// ```text
+/// [
+///   (2, "a + b"),
+///   (1, "a"),
+///   (0, "b")
+/// ]
+/// ```
+type IdArray = Vec<(usize, Identifier)>;
+
+/// A map that contains statistics of expressions by their identifiers.
+/// It contains:
+/// - The number of occurrences and
+/// - The DataType
+/// of an expression.
+type ExprStats = HashMap<Identifier, (usize, DataType)>;
+
+/// A map that contains the common expressions extracted during the second, 
rewriting
+/// traversal.
+type CommonExprs = IndexMap<Identifier, Expr>;
+
 /// Performs Common Sub-expression Elimination optimization.
 ///
 /// This optimization improves query performance by computing expressions that
@@ -150,22 +122,27 @@ impl CommonSubexprEliminate {
     /// Rewrites `exprs_list` with common sub-expressions replaced with a new
     /// column.
     ///
-    /// `affected_id` is updated with any sub expressions that were replaced.
+    /// `common_exprs` is updated with any sub expressions that were replaced.
     ///
     /// Returns the rewritten expressions
     fn rewrite_exprs_list(
         &self,
         exprs_list: &[&[Expr]],
-        expr_set: &ExprSet,
-        affected_id: &mut BTreeSet<Identifier>,
+        arrays_list: &[&[Vec<(usize, String)>]],
+        expr_stats: &ExprStats,
+        common_exprs: &mut CommonExprs,
     ) -> Result<Vec<Vec<Expr>>> {
         exprs_list
             .iter()
-            .map(|exprs| {
+            .zip(arrays_list.iter())
+            .map(|(exprs, arrays)| {
                 exprs
                     .iter()
                     .cloned()
-                    .map(|expr| replace_common_expr(expr, expr_set, 
affected_id))
+                    .zip(arrays.iter())
+                    .map(|(expr, id_array)| {
+                        replace_common_expr(expr, id_array, expr_stats, 
common_exprs)
+                    })
                     .collect::<Result<Vec<_>>>()
             })
             .collect::<Result<Vec<_>>>()
@@ -182,20 +159,26 @@ impl CommonSubexprEliminate {
     fn rewrite_expr(
         &self,
         exprs_list: &[&[Expr]],
+        arrays_list: &[&[Vec<(usize, String)>]],
         input: &LogicalPlan,
-        expr_set: &ExprSet,
+        expr_stats: &ExprStats,
         config: &dyn OptimizerConfig,
     ) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
-        let mut affected_id = BTreeSet::<Identifier>::new();
+        let mut common_exprs = IndexMap::new();
 
-        let rewrite_exprs =
-            self.rewrite_exprs_list(exprs_list, expr_set, &mut affected_id)?;
+        let rewrite_exprs = self.rewrite_exprs_list(
+            exprs_list,
+            arrays_list,
+            expr_stats,
+            &mut common_exprs,
+        )?;
 
         let mut new_input = self
             .try_optimize(input, config)?
             .unwrap_or_else(|| input.clone());
-        if !affected_id.is_empty() {
-            new_input = build_common_expr_project_plan(new_input, affected_id, 
expr_set)?;
+        if !common_exprs.is_empty() {
+            new_input =
+                build_common_expr_project_plan(new_input, common_exprs, 
expr_stats)?;
         }
 
         Ok((rewrite_exprs, new_input))
@@ -207,7 +190,8 @@ impl CommonSubexprEliminate {
         config: &dyn OptimizerConfig,
     ) -> Result<LogicalPlan> {
         let mut window_exprs = vec![];
-        let mut expr_set = ExprSet::default();
+        let mut arrays_per_window = vec![];
+        let mut expr_stats = ExprStats::new();
 
         // Get all window expressions inside the consecutive window operators.
         // Consecutive window expressions may refer to same complex expression.
@@ -226,18 +210,34 @@ impl CommonSubexprEliminate {
             plan = input.as_ref().clone();
 
             let input_schema = Arc::clone(input.schema());
-            expr_set.populate_expr_set(&window_expr, input_schema, 
ExprMask::Normal)?;
+            let arrays = to_arrays(
+                &window_expr,
+                input_schema,
+                &mut expr_stats,
+                ExprMask::Normal,
+            )?;
 
             window_exprs.push(window_expr);
+            arrays_per_window.push(arrays);
         }
 
         let mut window_exprs = window_exprs
             .iter()
             .map(|expr| expr.as_slice())
             .collect::<Vec<_>>();
+        let arrays_per_window = arrays_per_window
+            .iter()
+            .map(|arrays| arrays.as_slice())
+            .collect::<Vec<_>>();
 
-        let (mut new_expr, new_input) =
-            self.rewrite_expr(&window_exprs, &plan, &expr_set, config)?;
+        assert_eq!(window_exprs.len(), arrays_per_window.len());
+        let (mut new_expr, new_input) = self.rewrite_expr(
+            &window_exprs,
+            &arrays_per_window,
+            &plan,
+            &expr_stats,
+            config,
+        )?;
         assert_eq!(window_exprs.len(), new_expr.len());
 
         // Construct consecutive window operator, with their corresponding new 
window expressions.
@@ -274,39 +274,49 @@ impl CommonSubexprEliminate {
             input,
             ..
         } = aggregate;
-        let mut expr_set = ExprSet::default();
+        let mut expr_stats = ExprStats::new();
 
-        // build expr_set, with groupby and aggr
+        // rewrite inputs
         let input_schema = Arc::clone(input.schema());
-        expr_set.populate_expr_set(
+        let group_arrays = to_arrays(
             group_expr,
             Arc::clone(&input_schema),
+            &mut expr_stats,
             ExprMask::Normal,
         )?;
-        expr_set.populate_expr_set(aggr_expr, input_schema, ExprMask::Normal)?;
+        let aggr_arrays =
+            to_arrays(aggr_expr, input_schema, &mut expr_stats, 
ExprMask::Normal)?;
 
-        // rewrite inputs
-        let (mut new_expr, new_input) =
-            self.rewrite_expr(&[group_expr, aggr_expr], input, &expr_set, 
config)?;
+        let (mut new_expr, new_input) = self.rewrite_expr(
+            &[group_expr, aggr_expr],
+            &[&group_arrays, &aggr_arrays],
+            input,
+            &expr_stats,
+            config,
+        )?;
         // note the reversed pop order.
         let new_aggr_expr = pop_expr(&mut new_expr)?;
         let new_group_expr = pop_expr(&mut new_expr)?;
 
         // create potential projection on top
-        let mut expr_set = ExprSet::default();
+        let mut expr_stats = ExprStats::new();
         let new_input_schema = Arc::clone(new_input.schema());
-        expr_set.populate_expr_set(
+        let aggr_arrays = to_arrays(
             &new_aggr_expr,
             new_input_schema.clone(),
+            &mut expr_stats,
             ExprMask::NormalAndAggregates,
         )?;
-
-        let mut affected_id = BTreeSet::<Identifier>::new();
-        let mut rewritten =
-            self.rewrite_exprs_list(&[&new_aggr_expr], &expr_set, &mut 
affected_id)?;
+        let mut common_exprs = IndexMap::new();
+        let mut rewritten = self.rewrite_exprs_list(
+            &[&new_aggr_expr],
+            &[&aggr_arrays],
+            &expr_stats,
+            &mut common_exprs,
+        )?;
         let rewritten = pop_expr(&mut rewritten)?;
 
-        if affected_id.is_empty() {
+        if common_exprs.is_empty() {
             // Alias aggregation expressions if they have changed
             let new_aggr_expr = new_aggr_expr
                 .iter()
@@ -319,19 +329,13 @@ impl CommonSubexprEliminate {
             Aggregate::try_new(Arc::new(new_input), new_group_expr, 
new_aggr_expr)
                 .map(LogicalPlan::Aggregate)
         } else {
-            let mut agg_exprs = vec![];
-
-            for id in affected_id {
-                match expr_set.get(&id) {
-                    Some((expr, _, _, symbol)) => {
-                        // todo: check `nullable`
-                        agg_exprs.push(expr.clone().alias(symbol.as_str()));
-                    }
-                    _ => {
-                        return internal_err!("expr_set invalid state");
-                    }
-                }
-            }
+            let mut agg_exprs = common_exprs
+                .into_iter()
+                .map(|(expr_id, expr)| {
+                    // todo: check `nullable`
+                    expr.alias(expr_id)
+                })
+                .collect::<Vec<_>>();
 
             let mut proj_exprs = vec![];
             for expr in &new_group_expr {
@@ -343,7 +347,7 @@ impl CommonSubexprEliminate {
                         agg_exprs.push(expr.alias(&name));
                         proj_exprs.push(Expr::Column(Column::from_name(name)));
                     } else {
-                        let id = ExprSet::expr_identifier(&expr_rewritten);
+                        let id = expr_identifier(&expr_rewritten, 
"".to_string());
                         let (qualifier, field) =
                             expr_rewritten.to_field(&new_input_schema)?;
                         let out_name = qualified_name(qualifier.as_ref(), 
field.name());
@@ -379,13 +383,13 @@ impl CommonSubexprEliminate {
         let inputs = plan.inputs();
         let input = inputs[0];
         let input_schema = Arc::clone(input.schema());
-        let mut expr_set = ExprSet::default();
+        let mut expr_stats = ExprStats::new();
 
-        // Visit expr list and build expr identifier to occuring count map 
(`expr_set`).
-        expr_set.populate_expr_set(&expr, input_schema, ExprMask::Normal)?;
+        // Visit expr list and build expr identifier to occurring count map 
(`expr_stats`).
+        let arrays = to_arrays(&expr, input_schema, &mut expr_stats, 
ExprMask::Normal)?;
 
         let (mut new_expr, new_input) =
-            self.rewrite_expr(&[&expr], input, &expr_set, config)?;
+            self.rewrite_expr(&[&expr], &[&arrays], input, &expr_stats, 
config)?;
 
         plan.with_new_exprs(pop_expr(&mut new_expr)?, vec![new_input])
     }
@@ -471,37 +475,56 @@ fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) -> 
Result<Vec<Expr>> {
         .ok_or_else(|| DataFusionError::Internal("Failed to pop 
expression".to_string()))
 }
 
+fn to_arrays(
+    expr: &[Expr],
+    input_schema: DFSchemaRef,
+    expr_stats: &mut ExprStats,
+    expr_mask: ExprMask,
+) -> Result<Vec<Vec<(usize, String)>>> {
+    expr.iter()
+        .map(|e| {
+            let mut id_array = vec![];
+            expr_to_identifier(
+                e,
+                expr_stats,
+                &mut id_array,
+                Arc::clone(&input_schema),
+                expr_mask,
+            )?;
+
+            Ok(id_array)
+        })
+        .collect::<Result<Vec<_>>>()
+}
+
 /// Build the "intermediate" projection plan that evaluates the extracted 
common
 /// expressions.
 ///
 /// # Arguments
 /// input: the input plan
 ///
-/// affected_id: which common subexpressions were used (and thus are added to
+/// common_exprs: which common subexpressions were used (and thus are added to
 /// intermediate projection)
 ///
-/// expr_set: the set of common subexpressions
+/// expr_stats: statistics (occurrence count and data type) of expressions, keyed by identifier
 fn build_common_expr_project_plan(
     input: LogicalPlan,
-    affected_id: BTreeSet<Identifier>,
-    expr_set: &ExprSet,
+    common_exprs: CommonExprs,
+    expr_stats: &ExprStats,
 ) -> Result<LogicalPlan> {
-    let mut project_exprs = vec![];
     let mut fields_set = BTreeSet::new();
-
-    for id in affected_id {
-        match expr_set.get(&id) {
-            Some((expr, _, data_type, symbol)) => {
-                // todo: check `nullable`
-                let field = Field::new(&id, data_type.clone(), true);
-                fields_set.insert(field.name().to_owned());
-                project_exprs.push(expr.clone().alias(symbol.as_str()));
-            }
-            _ => {
-                return internal_err!("expr_set invalid state");
-            }
-        }
-    }
+    let mut project_exprs = common_exprs
+        .into_iter()
+        .map(|(expr_id, expr)| {
+            let Some((_, data_type)) = expr_stats.get(&expr_id) else {
+                return internal_err!("expr_stats invalid state");
+            };
+            // todo: check `nullable`
+            let field = Field::new(&expr_id, data_type.clone(), true);
+            fields_set.insert(field.name().to_owned());
+            Ok(expr.alias(expr_id))
+        })
+        .collect::<Result<Vec<_>>>()?;
 
     for (qualifier, field) in input.schema().iter() {
         if fields_set.insert(qualified_name(qualifier, field.name())) {
@@ -610,26 +633,29 @@ impl ExprMask {
 /// `Expr` without sub-expr (column, literal etc.) will not have identifier
 /// because they should not be recognized as common sub-expr.
 struct ExprIdentifierVisitor<'a> {
-    // param
-    expr_set: &'a mut ExprSet,
-    /// input schema for the node that we're optimizing, so we can determine 
the correct datatype
-    /// for each subexpression
+    // statistics of expressions
+    expr_stats: &'a mut ExprStats,
+    // cache to speed up second traversal
+    id_array: &'a mut IdArray,
+    // input schema for the node that we're optimizing, so we can determine 
the correct datatype
+    // for each subexpression
     input_schema: DFSchemaRef,
     // inner states
     visit_stack: Vec<VisitRecord>,
-    /// increased in fn_down, start from 0.
-    node_count: usize,
-    /// which expression should be skipped?
+    // preorder index, start from 0.
+    down_index: usize,
+    // postorder index, start from 0.
+    up_index: usize,
+    // which expression should be skipped?
     expr_mask: ExprMask,
 }
 
 /// Record item that used when traversing a expression tree.
 enum VisitRecord {
-    /// `usize` is the monotone increasing series number assigned in 
pre_visit().
-    /// Starts from 0. Is used to index the identifier array `id_array` in 
post_visit().
+    /// `usize` preorder index assigned in `f_down()`. Starts from 0.
     EnterMark(usize),
     /// the node's children were skipped => jump to f_up on same node
-    JumpMark(usize),
+    JumpMark,
     /// Accumulated identifier of sub expression.
     ExprItem(Identifier),
 }
@@ -637,17 +663,19 @@ enum VisitRecord {
 impl ExprIdentifierVisitor<'_> {
     /// Find the first `EnterMark` in the stack, and accumulates every 
`ExprItem`
     /// before it.
-    fn pop_enter_mark(&mut self) -> (usize, Identifier) {
+    fn pop_enter_mark(&mut self) -> Option<(usize, Identifier)> {
         let mut desc = String::new();
 
         while let Some(item) = self.visit_stack.pop() {
             match item {
-                VisitRecord::EnterMark(idx) | VisitRecord::JumpMark(idx) => {
-                    return (idx, desc);
+                VisitRecord::EnterMark(idx) => {
+                    return Some((idx, desc));
                 }
                 VisitRecord::ExprItem(id) => {
+                    desc.push('|');
                     desc.push_str(&id);
                 }
+                VisitRecord::JumpMark => return None,
             }
         }
         unreachable!("Enter mark should paired with node number");
@@ -658,55 +686,89 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
     type Node = Expr;
 
     fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        // related to https://github.com/apache/datafusion/issues/8814
+        // related to https://github.com/apache/arrow-datafusion/issues/8814
         // If the expr contain volatile expression or is a short-circuit 
expression, skip it.
+        // TODO: propagate is_volatile state bottom-up + consider non-volatile 
sub-expressions for CSE
+        // TODO: consider surely executed children of "short circuited"s for 
CSE
         if expr.short_circuits() || expr.is_volatile()? {
-            self.visit_stack
-                .push(VisitRecord::JumpMark(self.node_count));
-            return Ok(TreeNodeRecursion::Jump); // go to f_up
+            self.visit_stack.push(VisitRecord::JumpMark);
+
+            return Ok(TreeNodeRecursion::Jump);
         }
 
+        self.id_array.push((0, "".to_string()));
         self.visit_stack
-            .push(VisitRecord::EnterMark(self.node_count));
-        self.node_count += 1;
+            .push(VisitRecord::EnterMark(self.down_index));
+        self.down_index += 1;
 
         Ok(TreeNodeRecursion::Continue)
     }
 
     fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        let (_idx, sub_expr_identifier) = self.pop_enter_mark();
-
-        // skip exprs should not be recognize.
-        if self.expr_mask.ignores(expr) {
-            let curr_expr_identifier = ExprSet::expr_identifier(expr);
-            self.visit_stack
-                .push(VisitRecord::ExprItem(curr_expr_identifier));
+        let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
             return Ok(TreeNodeRecursion::Continue);
-        }
-        let curr_expr_identifier = ExprSet::expr_identifier(expr);
-        let alias_symbol = 
format!("{curr_expr_identifier}{sub_expr_identifier}");
+        };
 
-        self.visit_stack
-            .push(VisitRecord::ExprItem(alias_symbol.clone()));
+        let expr_id = expr_identifier(expr, sub_expr_id);
 
-        let data_type = expr.get_type(&self.input_schema)?;
+        self.id_array[down_index].0 = self.up_index;
+        if !self.expr_mask.ignores(expr) {
+            self.id_array[down_index].1.clone_from(&expr_id);
+
+            // TODO: can we capture the data type in the second traversal only 
for
+            //  replaced expressions?
+            let data_type = expr.get_type(&self.input_schema)?;
+            let (count, _) = self
+                .expr_stats
+                .entry(expr_id.clone())
+                .or_insert((0, data_type));
+            *count += 1;
+        }
+        self.visit_stack.push(VisitRecord::ExprItem(expr_id));
+        self.up_index += 1;
 
-        self.expr_set
-            .entry(curr_expr_identifier)
-            .or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol))
-            .1 += 1;
         Ok(TreeNodeRecursion::Continue)
     }
 }
 
-/// Rewrite expression by common sub-expression with a corresponding temporary
-/// column name that will compute the subexpression.
-///
-/// `affected_id` is updated with any sub expressions that were replaced
+fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier 
{
+    format!("{{{expr}{sub_expr_identifier}}}")
+}
+
+/// Go through an expression tree and generate identifier for every node in 
this tree.
+fn expr_to_identifier(
+    expr: &Expr,
+    expr_stats: &mut ExprStats,
+    id_array: &mut Vec<(usize, Identifier)>,
+    input_schema: DFSchemaRef,
+    expr_mask: ExprMask,
+) -> Result<()> {
+    expr.visit(&mut ExprIdentifierVisitor {
+        expr_stats,
+        id_array,
+        input_schema,
+        visit_stack: vec![],
+        down_index: 0,
+        up_index: 0,
+        expr_mask,
+    })?;
+
+    Ok(())
+}
+
+/// Rewrite an expression by replacing each detected common sub-expression with
+/// the corresponding temporary column name. That column contains the
+/// evaluated result of the replaced expression.
 struct CommonSubexprRewriter<'a> {
-    expr_set: &'a ExprSet,
-    /// Which identifier is replaced.
-    affected_id: &'a mut BTreeSet<Identifier>,
+    // statistics of expressions
+    expr_stats: &'a ExprStats,
+    // cache to speed up second traversal
+    id_array: &'a IdArray,
+    // common expression, that are replaced during the second traversal, are 
collected to
+    // this map
+    common_exprs: &'a mut CommonExprs,
+    // preorder index, starts from 0.
+    down_index: usize,
 }
 
 impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
@@ -720,43 +782,53 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
             return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
         }
 
-        let curr_id = &ExprSet::expr_identifier(&expr);
-
-        // lookup previously visited expression
-        match self.expr_set.get(curr_id) {
-            Some((_, counter, _, symbol)) => {
-                // if has a commonly used (a.k.a. 1+ use) expr
-                if *counter > 1 {
-                    self.affected_id.insert(curr_id.clone());
-
-                    let expr_name = expr.display_name()?;
-                    // Alias this `Column` expr to it original "expr name",
-                    // `projection_push_down` optimizer use "expr name" to 
eliminate useless
-                    // projections.
-                    Ok(Transformed::new(
-                        col(symbol).alias(expr_name),
-                        true,
-                        TreeNodeRecursion::Jump,
-                    ))
-                } else {
-                    Ok(Transformed::no(expr))
-                }
+        let (up_index, expr_id) = &self.id_array[self.down_index];
+        self.down_index += 1;
+
+        // skip `Expr`s without identifier (empty identifier).
+        if expr_id.is_empty() {
+            return Ok(Transformed::no(expr));
+        }
+
+        let (counter, _) = self.expr_stats.get(expr_id).unwrap();
+        if *counter > 1 {
+            // step index to skip all sub-node (which has smaller series 
number).
+            while self.down_index < self.id_array.len()
+                && self.id_array[self.down_index].0 < *up_index
+            {
+                self.down_index += 1;
             }
-            None => Ok(Transformed::no(expr)),
+
+            let expr_name = expr.display_name()?;
+            self.common_exprs.insert(expr_id.clone(), expr);
+            // Alias this `Column` expr to it original "expr name",
+            // `projection_push_down` optimizer use "expr name" to eliminate 
useless
+            // projections.
+            // TODO: do we really need to alias here?
+            Ok(Transformed::new(
+                col(expr_id).alias(expr_name),
+                true,
+                TreeNodeRecursion::Jump,
+            ))
+        } else {
+            Ok(Transformed::no(expr))
         }
     }
 }
 
 /// Replace common sub-expression in `expr` with the corresponding temporary
-/// column name, updating `affected_id` with any replaced expressions
+/// column name, updating `common_exprs` with any replaced expressions
 fn replace_common_expr(
     expr: Expr,
-    expr_set: &ExprSet,
-    affected_id: &mut BTreeSet<Identifier>,
+    id_array: &IdArray,
+    expr_stats: &ExprStats,
+    common_exprs: &mut CommonExprs,
 ) -> Result<Expr> {
     expr.rewrite(&mut CommonSubexprRewriter {
-        expr_set,
-        affected_id,
+        expr_stats,
+        id_array,
+        common_exprs,
+        down_index: 0,
     })
     .data()
 }
@@ -789,6 +861,74 @@ mod test {
         assert_eq!(expected, formatted_plan);
     }
 
+    #[test]
+    fn id_array_visitor() -> Result<()> {
+        let expr = ((sum(col("a") + lit(1))) - avg(col("c"))) * lit(2);
+
+        let schema = Arc::new(DFSchema::from_unqualifed_fields(
+            vec![
+                Field::new("a", DataType::Int64, false),
+                Field::new("c", DataType::Int64, false),
+            ]
+            .into(),
+            Default::default(),
+        )?);
+
+        // skip aggregates
+        let mut id_array = vec![];
+        expr_to_identifier(
+            &expr,
+            &mut HashMap::new(),
+            &mut id_array,
+            Arc::clone(&schema),
+            ExprMask::Normal,
+        )?;
+
+        let expected = vec![
+            (8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a + 
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + 
Int32(1)|{Int32(1)}|{a}}}}}"),
+            (6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + 
Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
+            (3, ""),
+            (2, "{a + Int32(1)|{Int32(1)}|{a}}"),
+            (0, ""),
+            (1, ""),
+            (5, ""),
+            (4, ""),
+            (7, "")
+        ]
+        .into_iter()
+        .map(|(number, id)| (number, id.into()))
+        .collect::<Vec<_>>();
+        assert_eq!(expected, id_array);
+
+        // include aggregates
+        let mut id_array = vec![];
+        expr_to_identifier(
+            &expr,
+            &mut HashMap::new(),
+            &mut id_array,
+            Arc::clone(&schema),
+            ExprMask::NormalAndAggregates,
+        )?;
+
+        let expected = vec![
+            (8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a + 
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + 
Int32(1)|{Int32(1)}|{a}}}}}"),
+            (6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + 
Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
+            (3, "{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}"),
+            (2, "{a + Int32(1)|{Int32(1)}|{a}}"),
+            (0, ""),
+            (1, ""),
+            (5, "{AVG(c)|{c}}"),
+            (4, ""),
+            (7, "")
+        ]
+        .into_iter()
+        .map(|(number, id)| (number, id.into()))
+        .collect::<Vec<_>>();
+        assert_eq!(expected, id_array);
+
+        Ok(())
+    }
+
     #[test]
     fn tpch_q1_simplified() -> Result<()> {
         // SQL:
@@ -811,8 +951,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[SUM(test.a * (Int32(1) 
- test.b)Int32(1) - test.btest.bInt32(1)test.a AS test.a * Int32(1) - test.b), 
SUM(test.a * (Int32(1) - test.b)Int32(1) - test.btest.bInt32(1)test.a AS test.a 
* Int32(1) - test.b * (Int32(1) + test.c))]]\
-        \n  Projection: test.a * (Int32(1) - test.b) AS test.a * (Int32(1) - 
test.b)Int32(1) - test.btest.bInt32(1)test.a, test.a, test.b, test.c\
+        let expected = "Aggregate: groupBy=[[]], aggr=[[SUM({test.a * 
(Int32(1) - test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a 
* Int32(1) - test.b), SUM({test.a * (Int32(1) - test.b)|{Int32(1) - 
test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a * Int32(1) - test.b * (Int32(1) 
+ test.c))]]\
+        \n  Projection: test.a * (Int32(1) - test.b) AS {test.a * (Int32(1) - 
test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}}, test.a, test.b, 
test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -864,8 +1004,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1, 
AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), 
my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS 
my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
-        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, 
my_agg(test.a) AS my_agg(test.a)test.a, AVG(test.b) AS col3, AVG(test.c) AS 
AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
+        let expected = "Projection: {AVG(test.a)|{test.a}} AS AVG(test.a) AS 
col1, {AVG(test.a)|{test.a}} AS AVG(test.a) AS col2, col3, {AVG(test.c)} AS 
AVG(test.c), {my_agg(test.a)|{test.a}} AS my_agg(test.a) AS col4, 
{my_agg(test.a)|{test.a}} AS my_agg(test.a) AS col5, col6, {my_agg(test.c)} AS 
my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS 
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}, 
AVG(test.b) AS col3, AVG(test.c) AS {AVG(test.c)}, my_agg(test.b) AS col6, 
my_agg(test.c) AS {my_agg(test.c)}]]\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -883,8 +1023,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: Int32(1) + AVG(test.a)test.a AS 
AVG(test.a), Int32(1) - AVG(test.a)test.a AS AVG(test.a), Int32(1) + 
my_agg(test.a)test.a AS my_agg(test.a), Int32(1) - my_agg(test.a)test.a AS 
my_agg(test.a)\
-        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, 
my_agg(test.a) AS my_agg(test.a)test.a]]\
+        let expected = "Projection: Int32(1) + {AVG(test.a)|{test.a}} AS 
AVG(test.a), Int32(1) - {AVG(test.a)|{test.a}} AS AVG(test.a), Int32(1) + 
{my_agg(test.a)|{test.a}} AS my_agg(test.a), Int32(1) - 
{my_agg(test.a)|{test.a}} AS my_agg(test.a)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS 
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}]]\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -900,9 +1040,7 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[AVG(UInt32(1) + 
test.atest.aUInt32(1) AS UInt32(1) + test.a) AS col1, my_agg(UInt32(1) + 
test.atest.aUInt32(1) AS UInt32(1) + test.a) AS col2]]\
-        \n  Projection: UInt32(1) + test.a AS UInt32(1) + 
test.atest.aUInt32(1), test.a, test.b, test.c\
-        \n    TableScan: test";
+        let expected = "Aggregate: groupBy=[[]], aggr=[[AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col1, my_agg({UInt32(1) 
+ test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col2]]\n  Projection: 
UInt32(1) + test.a AS {UInt32(1) + test.a|{test.a}|{UInt32(1)}}, test.a, 
test.b, test.c\n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
 
@@ -917,8 +1055,8 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[UInt32(1) + test.atest.aUInt32(1) 
AS UInt32(1) + test.a]], aggr=[[AVG(UInt32(1) + test.atest.aUInt32(1) AS 
UInt32(1) + test.a) AS col1, my_agg(UInt32(1) + test.atest.aUInt32(1) AS 
UInt32(1) + test.a) AS col2]]\
-        \n  Projection: UInt32(1) + test.a AS UInt32(1) + 
test.atest.aUInt32(1), test.a, test.b, test.c\
+        let expected = "Aggregate: groupBy=[[{UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col1, my_agg({UInt32(1) 
+ test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col2]]\
+        \n  Projection: UInt32(1) + test.a AS {UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -938,9 +1076,9 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: UInt32(1) + test.a, UInt32(1) + 
AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a)UInt32(1) + 
test.atest.aUInt32(1) AS UInt32(1) + test.a AS AVG(UInt32(1) + test.a) AS col1, 
UInt32(1) - AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + 
test.a)UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a AS AVG(UInt32(1) 
+ test.a) AS col2, AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + 
test.a)UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + [...]
-        \n  Aggregate: groupBy=[[UInt32(1) + test.atest.aUInt32(1) AS 
UInt32(1) + test.a]], aggr=[[AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) 
+ test.a) AS AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + 
test.a)UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a, 
my_agg(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a) AS 
my_agg(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a)UInt32(1) + 
test.atest.aUInt32(1) AS UInt32(1) + test.a]]\
-        \n    Projection: UInt32(1) + test.a AS UInt32(1) + 
test.atest.aUInt32(1), test.a, test.b, test.c\
+        let expected = "Projection: UInt32(1) + test.a, UInt32(1) + 
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) + test.a) 
AS col1, UInt32(1) - {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}} AS AVG(UInt32( [...]
+        \n  Aggregate: groupBy=[[{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a) AS {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + 
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}}, my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS {my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UIn [...]
+        \n    Projection: UInt32(1) + test.a AS {UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
         \n      TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -965,9 +1103,9 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: table.test.col.a, UInt32(1) + 
AVG(UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) + 
table.test.col.a)UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS 
UInt32(1) + table.test.col.a AS AVG(UInt32(1) + table.test.col.a), 
AVG(UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) + 
table.test.col.a)UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS 
UInt32(1) + table.test.col.a AS AVG(UInt32(1) + table.test.col.a)\
-        \n  Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG(UInt32(1) + 
table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) + table.test.col.a) AS 
AVG(UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) + 
table.test.col.a)UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS 
UInt32(1) + table.test.col.a]]\
-        \n    Projection: UInt32(1) + table.test.col.a AS UInt32(1) + 
table.test.col.atable.test.col.aUInt32(1), table.test.col.a\
+        let expected = "Projection: table.test.col.a, UInt32(1) + 
{AVG({UInt32(1) + table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) 
+ table.test.col.a)|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) + 
table.test.col.a), {AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a)|{{UInt32(1) + table.test.co [...]
+        \n  Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a) AS {AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a)|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}}]]\
+        \n    Projection: UInt32(1) + table.test.col.a AS {UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}, table.test.col.a\
         \n      TableScan: table.test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -986,8 +1124,8 @@ mod test {
             ])?
             .build()?;
 
-        let expected = "Projection: Int32(1) + test.atest.aInt32(1) AS 
Int32(1) + test.a AS first, Int32(1) + test.atest.aInt32(1) AS Int32(1) + 
test.a AS second\
-        \n  Projection: Int32(1) + test.a AS Int32(1) + test.atest.aInt32(1), 
test.a, test.b, test.c\
+        let expected = "Projection: {Int32(1) + test.a|{test.a}|{Int32(1)}} AS 
Int32(1) + test.a AS first, {Int32(1) + test.a|{test.a}|{Int32(1)}} AS Int32(1) 
+ test.a AS second\
+        \n  Projection: Int32(1) + test.a AS {Int32(1) + 
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
@@ -1031,35 +1169,28 @@ mod test {
     #[test]
     fn redundant_project_fields() {
         let table_scan = test_table_scan().unwrap();
-        let affected_id: BTreeSet<Identifier> =
-            ["c+a".to_string(), "b+a".to_string()].into_iter().collect();
-        let expr_set_1 = vec![
-            (
-                "c+a".to_string(),
-                (col("c") + col("a"), 1, DataType::UInt32, "c+a".to_string()),
-            ),
-            (
-                "b+a".to_string(),
-                (col("b") + col("a"), 1, DataType::UInt32, "b+a".to_string()),
-            ),
-        ]
-        .into();
-        let expr_set_2 = vec![
-            (
-                "c+a".to_string(),
-                (col("c+a"), 1, DataType::UInt32, "c+a".to_string()),
-            ),
-            (
-                "b+a".to_string(),
-                (col("b+a"), 1, DataType::UInt32, "b+a".to_string()),
-            ),
-        ]
-        .into();
+        let expr_stats_1 = ExprStats::from([
+            ("c+a".to_string(), (1, DataType::UInt32)),
+            ("b+a".to_string(), (1, DataType::UInt32)),
+        ]);
+        let common_exprs_1 = IndexMap::from([
+            ("c+a".to_string(), col("c") + col("a")),
+            ("b+a".to_string(), col("b") + col("a")),
+        ]);
+        let exprs_stats_2 = ExprStats::from([
+            ("c+a".to_string(), (1, DataType::UInt32)),
+            ("b+a".to_string(), (1, DataType::UInt32)),
+        ]);
+        let common_exprs_2 = IndexMap::from([
+            ("c+a".to_string(), col("c+a")),
+            ("b+a".to_string(), col("b+a")),
+        ]);
         let project =
-            build_common_expr_project_plan(table_scan, affected_id.clone(), 
&expr_set_1)
+            build_common_expr_project_plan(table_scan, common_exprs_1, 
&expr_stats_1)
                 .unwrap();
         let project_2 =
-            build_common_expr_project_plan(project, affected_id, 
&expr_set_2).unwrap();
+            build_common_expr_project_plan(project, common_exprs_2, 
&exprs_stats_2)
+                .unwrap();
 
         let mut field_set = BTreeSet::new();
         for name in project_2.schema().field_names() {
@@ -1076,57 +1207,33 @@ mod test {
             .unwrap()
             .build()
             .unwrap();
-        let affected_id: BTreeSet<Identifier> =
-            ["test1.c+test1.a".to_string(), "test1.b+test1.a".to_string()]
-                .into_iter()
-                .collect();
-        let expr_set_1 = vec![
-            (
-                "test1.c+test1.a".to_string(),
-                (
-                    col("test1.c") + col("test1.a"),
-                    1,
-                    DataType::UInt32,
-                    "test1.c+test1.a".to_string(),
-                ),
-            ),
-            (
-                "test1.b+test1.a".to_string(),
-                (
-                    col("test1.b") + col("test1.a"),
-                    1,
-                    DataType::UInt32,
-                    "test1.b+test1.a".to_string(),
-                ),
-            ),
-        ]
-        .into();
-        let expr_set_2 = vec![
+        let expr_stats_1 = ExprStats::from([
+            ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
+            ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
+        ]);
+        let common_exprs_1 = IndexMap::from([
             (
                 "test1.c+test1.a".to_string(),
-                (
-                    col("test1.c+test1.a"),
-                    1,
-                    DataType::UInt32,
-                    "test1.c+test1.a".to_string(),
-                ),
+                col("test1.c") + col("test1.a"),
             ),
             (
                 "test1.b+test1.a".to_string(),
-                (
-                    col("test1.b+test1.a"),
-                    1,
-                    DataType::UInt32,
-                    "test1.b+test1.a".to_string(),
-                ),
+                col("test1.b") + col("test1.a"),
             ),
-        ]
-        .into();
+        ]);
+        let expr_stats_2 = ExprStats::from([
+            ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
+            ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
+        ]);
+        let common_exprs_2 = IndexMap::from([
+            ("test1.c+test1.a".to_string(), col("test1.c+test1.a")),
+            ("test1.b+test1.a".to_string(), col("test1.b+test1.a")),
+        ]);
         let project =
-            build_common_expr_project_plan(join, affected_id.clone(), 
&expr_set_1)
-                .unwrap();
+            build_common_expr_project_plan(join, common_exprs_1, 
&expr_stats_1).unwrap();
         let project_2 =
-            build_common_expr_project_plan(project, affected_id, 
&expr_set_2).unwrap();
+            build_common_expr_project_plan(project, common_exprs_2, 
&expr_stats_2)
+                .unwrap();
 
         let mut field_set = BTreeSet::new();
         for name in project_2.schema().field_names() {
@@ -1193,8 +1300,8 @@ mod test {
             .build()?;
 
         let expected = "Projection: test.a, test.b, test.c\
-        \n  Filter: Int32(1) + test.atest.aInt32(1) - Int32(10) > Int32(1) + 
test.atest.aInt32(1)\
-        \n    Projection: Int32(1) + test.a AS Int32(1) + 
test.atest.aInt32(1), test.a, test.b, test.c\
+        \n  Filter: {Int32(1) + test.a|{test.a}|{Int32(1)}} - Int32(10) > 
{Int32(1) + test.a|{test.a}|{Int32(1)}}\
+        \n    Projection: Int32(1) + test.a AS {Int32(1) + 
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
         \n      TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index 986a36944f..5a605ea58b 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4187,8 +4187,8 @@ EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), 
MAX(DISTINCT CAST(x AS DOUBLE))
 logical_plan
 01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT 
t1.x)
 02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
-03)----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS 
alias1]], aggr=[[]]
-04)------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y
+03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS 
alias1]], aggr=[[]]
+04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, 
t1.y
 05)--------TableScan: t1 projection=[x, y]
 physical_plan
 01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as 
MAX(DISTINCT t1.x)]
@@ -4200,8 +4200,8 @@ physical_plan
 07)------------CoalesceBatchesExec: target_batch_size=2
 08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), 
input_partitions=8
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
-10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS 
Float64)t1.x@0 as alias1], aggr=[]
-11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x 
AS Float64)t1.x, y@1 as y]
+10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS 
Float64)|{t1.x}}@0 as alias1], aggr=[]
+11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as 
{CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
 12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # create an unbounded table that contains ordered timestamp.
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index f91cfd6be6..24163e37de 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1426,12 +1426,12 @@ query TT
 EXPLAIN SELECT x/2, x/2+1 FROM t;
 ----
 logical_plan
-01)Projection: t.x / Int64(2)Int64(2)t.x AS t.x / Int64(2), t.x / 
Int64(2)Int64(2)t.x AS t.x / Int64(2) + Int64(1)
-02)--Projection: t.x / Int64(2) AS t.x / Int64(2)Int64(2)t.x
+01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / 
Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
+02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
 03)----TableScan: t projection=[x]
 physical_plan
-01)ProjectionExec: expr=[t.x / Int64(2)Int64(2)t.x@0 as t.x / Int64(2), t.x / 
Int64(2)Int64(2)t.x@0 + 1 as t.x / Int64(2) + Int64(1)]
-02)--ProjectionExec: expr=[x@0 / 2 as t.x / Int64(2)Int64(2)t.x]
+01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / 
Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
+02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
@@ -1444,12 +1444,12 @@ query TT
 EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
 ----
 logical_plan
-01)Projection: abs(t.x)t.x AS abs(t.x), abs(t.x)t.x AS abs(t.x) + abs(t.y)
-02)--Projection: abs(t.x) AS abs(t.x)t.x, t.y
+01)Projection: {abs(t.x)|{t.x}} AS abs(t.x), {abs(t.x)|{t.x}} AS abs(t.x) + 
abs(t.y)
+02)--Projection: abs(t.x) AS {abs(t.x)|{t.x}}, t.y
 03)----TableScan: t projection=[x, y]
 physical_plan
-01)ProjectionExec: expr=[abs(t.x)t.x@0 as abs(t.x), abs(t.x)t.x@0 + abs(y@1) 
as abs(t.x) + abs(t.y)]
-02)--ProjectionExec: expr=[abs(x@0) as abs(t.x)t.x, y@1 as y]
+01)ProjectionExec: expr=[{abs(t.x)|{t.x}}@0 as abs(t.x), {abs(t.x)|{t.x}}@0 + 
abs(y@1) as abs(t.x) + abs(t.y)]
+02)--ProjectionExec: expr=[abs(x@0) as {abs(t.x)|{t.x}}, y@1 as y]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
@@ -1613,6 +1613,14 @@ select count(1) from v;
 ----
 1
 
+# Ensure CSE resolves columns correctly.
+# Known bug: the expected output below records the current result (1); the correct result is 3.
+# See https://github.com/apache/datafusion/issues/10413
+query I
+select a + b from (select 1 as a, 2 as b, 1 as "a + b");
+----
+1
+
 # run below query without logical optimizations
 statement ok
 set datafusion.optimizer.max_passes=0;
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 7196418af1..085f192a5d 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1072,8 +1072,8 @@ query TT
 explain select a/2, a/2 + 1 from t
 ----
 logical_plan
-01)Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a / 
Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a
+01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / 
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
 03)----TableScan: t projection=[a]
 
 statement ok
@@ -1083,8 +1083,8 @@ query TT
 explain select a/2, a/2 + 1 from t
 ----
 logical_plan
-01)Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a / 
Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a
+01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / 
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
 03)----TableScan: t projection=[a]
 
 ###
diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
index cc71167537..f2c14f2628 100644
--- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
@@ -42,8 +42,8 @@ explain select
 logical_plan
 01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS 
LAST
 02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, 
SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS 
sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, 
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS 
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), 
SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice
 AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) 
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), 
SUM(lineitem.l [...]
-04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
 lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), 
SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}
 AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) 
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount),  [...]
+04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
 lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
 05)--------Filter: lineitem.l_shipdate <= Date32("10471")
 06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, 
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], 
partial_filters=[lineitem.l_shipdate <= Date32("10471")]
 physical_plan
@@ -54,7 +54,7 @@ physical_plan
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([l_returnflag@0, 
l_linestatus@1], 4), input_partitions=4
 07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as 
l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), 
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
-08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
+08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
 09)----------------CoalesceBatchesExec: target_batch_size=8192
 10)------------------FilterExec: l_shipdate@6 <= 10471
 11)--------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l [...]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 924bc16348..af09e644c9 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1687,20 +1687,20 @@ EXPLAIN SELECT c3,
 logical_plan
 01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS 
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURR [...]
-04)------Projection: aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, 
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS 
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS [...]
-06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, 
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
+03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING  [...]
+04)------Projection: {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DE [...]
+06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
 07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
 physical_plan
 01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, [...]
-04)------ProjectionExec: expr=[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 as 
aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, c3@2 as c3, 
c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(aggregate_test_10 [...]
+04)------ProjectionExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as 
c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(a [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRE [...]
-06)----------SortPreservingMergeExec: [aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 DESC,c9@3 
DESC,c2@1 ASC NULLS LAST]
-07)------------SortExec: expr=[aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 DESC,c9@3 
DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
-08)--------------ProjectionExec: expr=[c3@1 + c4@2 as aggregate_test_100.c3 + 
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, c2@0 as c2, 
c3@1 as c3, c9@3 as c9]
+06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
+07)------------SortExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
+08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as 
c2, c3@1 as c3, c9@3 as c9]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
 
@@ -2544,11 +2544,11 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 04)------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_dat 
[...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8 PRECEDING 
AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-06)----------Projection: CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col, annotated_data_finite.inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] ROWS BE [...]
-07)------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col 
AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.inc_c [...]
-08)--------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(CAST(annotated_data_finite. [...]
-09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) 
AS CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
CAST(annotated_data_finite.inc_col AS Int64) AS 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.desc_col 
AS Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+06)----------Projection: {CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, annotated_data_finite.inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RO [...]
+07)------------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING 
AND 1 FOLLOWING, SUM({CAST(annotated_data_f [...]
+08)--------------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM({CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 8 FOLLOWING, SUM({CAST(annotated_d [...]
+09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) 
AS {CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, CAST(annotated_data_finite.inc_col AS 
Int64) AS {CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts, 
annotated_data_finite.inc_col, annotated_data_finite.desc_col
 10)------------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, 
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, 
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as 
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as 
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as 
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
@@ -2556,10 +2556,10 @@ physical_plan
 03)----SortExec: TopK(fetch=5), expr=[inc_col@24 DESC], 
preserve_partitioning=[false]
 04)------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDE [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "C [...]
-06)----------ProjectionExec: expr=[CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col@0 as CAST(annotated_data_finite.desc_col 
AS Int64)annotated_data_finite.desc_col, inc_col@3 as inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotat [...]
+06)----------ProjectionExec: expr=[{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}@0 as 
{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, inc_col@3 as inc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, S [...]
 07)------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Fol [...]
 08)--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound:  [...]
-09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
CAST(inc_col@1 AS Int64) as CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col, ts@0 as ts, inc_col@1 as inc_col, 
desc_col@2 as desc_col]
+09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
 10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2708,9 +2708,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
 04)------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOU [...]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col 
AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWE [...]
-07)------------Projection: CAST(annotated_data_finite.inc_col AS Float64) AS 
CAST(annotated_data_finite.inc_col AS Float64)annotated_data_finite.inc_col, 
CAST(annotated_data_finite.inc_col AS Int64) AS 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN  [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.inc_col 
AS Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE  [...]
+07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
CAST(annotated_data_finite.inc_col AS Float64) AS 
{CAST(annotated_data_finite.inc_col AS 
Float64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts, 
annotated_data_finite.inc_col
 08)--------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, 
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as 
count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2719,7 +2719,7 @@ physical_plan
 04)------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@9 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@10 as min1, MIN(annotated_dat [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), en [...]
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL [...]
-07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Float64) as 
CAST(annotated_data_finite.inc_col AS Float64)annotated_data_finite.inc_col, 
CAST(inc_col@1 AS Int64) as CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col, ts@0 as ts, inc_col@1 as inc_col]
+07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
CAST(inc_col@1 AS Float64) as {CAST(annotated_data_finite.inc_col AS 
Float64)|{annotated_data_finite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIRR
@@ -2809,9 +2809,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 04)------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col 
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col 
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
annotated_data_infinite.ts, annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col 
AS Int64)|{annotated_data_infinite.inc_col}} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
 08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2819,7 +2819,7 @@ physical_plan
 03)----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
 04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 query IIII
@@ -2856,9 +2856,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 04)------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col 
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col 
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
annotated_data_infinite.ts, annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col 
AS Int64)|{annotated_data_infinite.inc_col}} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: 
windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
 08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2866,7 +2866,7 @@ physical_plan
 03)----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
 04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col, 
ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
@@ -2953,13 +2953,13 @@ EXPLAIN SELECT a, b, c,
 logical_plan
 01)Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NU [...]
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_in [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_in [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annot [...]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a [...]
-07)------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_d [...]
-08)--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c 
AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY  [...]
-09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+03)----WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotat [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotat 
[...]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_i [...]
+07)------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c 
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
+08)--------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c 
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] [...]
+09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 10)------------------TableScan: annotated_data_infinite2 projection=[a, b, c, 
d]
 physical_plan
 01)ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC N 
[...]
@@ -2970,7 +2970,7 @@ physical_plan
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LA [...]
 07)------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING [...]
 08)--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, d [...]
-09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
 10)------------------StreamingTableExec: partition_sizes=1, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 query IIIIIIIIIIIIIII
@@ -3022,13 +3022,13 @@ logical_plan
 01)Limit: skip=0, fetch=5
 02)--Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
 03)----Projection: annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_ [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY  
[...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_d [...]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c  [...]
-07)------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_fin [...]
-08)--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annot [...]
-09)----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c 
AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_fini [...]
-10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] [...]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, a [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_ 
[...]
+07)------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotate [...]
+08)--------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c 
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite [...]
+09)----------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c 
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated [...]
+10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
 11)--------------------TableScan: annotated_data_finite2 projection=[a, b, c, 
d]
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=5
@@ -3045,7 +3045,7 @@ physical_plan
 12)----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOL [...]
 13)------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST], preserve_partitioning=[false]
 14)--------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_ [...]
-15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@0 as a, b@1 
as b, c@2 as c, d@3 as d]
+15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
 16)------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
 
 query IIIIIIIIIIIIIII
@@ -3211,21 +3211,21 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 01)Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----Projection: {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2. [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 01)ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PAR [...]
 02)--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }) [...]
-03)----ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_in [...]
+03)----ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotat [...]
 04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullabl [...]
 05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nulla [...]
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nul [...]
-07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
 08)--------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 statement ok
@@ -3242,29 +3242,29 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 01)Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----Projection: {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a, 
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2. [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 01)ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PAR [...]
 02)--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }) [...]
 03)----CoalesceBatchesExec: target_batch_size=4096
-04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs=CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST
-05)--------ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_dat [...]
+04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs={CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST,a@1 ASC NULLS LAST
+05)--------ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, ann 
[...]
 06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nul [...]
 07)------------CoalesceBatchesExec: target_batch_size=4096
-08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST
+08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
 09)----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int6 [...]
 10)------------------CoalesceBatchesExec: target_batch_size=4096
-11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST
+11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
 12)----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type [...]
 13)------------------------CoalesceBatchesExec: target_batch_size=4096
-14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS 
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST
-15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
 16)------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
 17)--------------------------------StreamingTableExec: partition_sizes=1, 
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to