This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 870857a0b3 Make `SUM` and `AVG` Aggregate Type Coercion Explicit (#7369)
870857a0b3 is described below

commit 870857a0b32e66cc6e2fd14a4cb4c127d392aa0f
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Aug 22 15:16:51 2023 +0100

    Make `SUM` and `AVG` Aggregate Type Coercion Explicit (#7369)
    
    * Make Aggregate Type Coercion Explicit
    
    * Clippy
---
 .../core/src/physical_plan/aggregates/mod.rs       |  37 +----
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |  11 +-
 datafusion/expr/src/aggregate_function.rs          |  24 ++--
 datafusion/expr/src/type_coercion/aggregates.rs    | 111 +++++++++------
 datafusion/optimizer/src/analyzer/type_coercion.rs |  21 ++-
 .../optimizer/tests/optimizer_integration.rs       |   2 +-
 datafusion/physical-expr/src/aggregate/average.rs  | 152 +++++++++------------
 datafusion/physical-expr/src/aggregate/build_in.rs | 112 +++++++--------
 datafusion/physical-expr/src/aggregate/count.rs    |   8 +-
 datafusion/physical-expr/src/aggregate/grouping.rs |   8 +-
 datafusion/physical-expr/src/aggregate/sum.rs      | 137 +++++++------------
 datafusion/physical-expr/src/expressions/mod.rs    |  35 +++++
 datafusion/proto/src/physical_plan/mod.rs          |  15 +-
 datafusion/sqllogictest/test_files/decimal.slt     |   2 +-
 datafusion/sqllogictest/test_files/groupby.slt     |  24 ++--
 datafusion/sqllogictest/test_files/insert.slt      |   6 +-
 .../sqllogictest/test_files/insert_to_external.slt |   4 +-
 datafusion/sqllogictest/test_files/subquery.slt    |  10 +-
 datafusion/sqllogictest/test_files/window.slt      |  82 ++++++-----
 19 files changed, 385 insertions(+), 416 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index d6c40508d4..8338da8ed6 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -35,7 +35,7 @@ use datafusion_execution::TaskContext;
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
     equivalence::project_equivalence_properties,
-    expressions::{Avg, CastExpr, Column, Sum},
+    expressions::Column,
     normalize_out_expr_with_columns_map, reverse_order_bys,
     utils::{convert_to_expr, get_indices_of_matching_exprs},
     AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
@@ -1010,40 +1010,7 @@ fn aggregate_expressions(
         | AggregateMode::SinglePartitioned => Ok(aggr_expr
             .iter()
             .map(|agg| {
-                let pre_cast_type = if let Some(Sum {
-                    data_type,
-                    pre_cast_to_sum_type,
-                    ..
-                }) = agg.as_any().downcast_ref::<Sum>()
-                {
-                    if *pre_cast_to_sum_type {
-                        Some(data_type.clone())
-                    } else {
-                        None
-                    }
-                } else if let Some(Avg {
-                    sum_data_type,
-                    pre_cast_to_sum_type,
-                    ..
-                }) = agg.as_any().downcast_ref::<Avg>()
-                {
-                    if *pre_cast_to_sum_type {
-                        Some(sum_data_type.clone())
-                    } else {
-                        None
-                    }
-                } else {
-                    None
-                };
-                let mut result = agg
-                    .expressions()
-                    .into_iter()
-                    .map(|expr| {
-                        pre_cast_type.clone().map_or(expr.clone(), |cast_type| {
-                            Arc::new(CastExpr::new(expr, cast_type, None))
-                        })
-                    })
-                    .collect::<Vec<_>>();
+                let mut result = agg.expressions().clone();
                 // In partial mode, append ordering requirements to expressions' results.
                 // Ordering requirements are used by subsequent executors to satisfy the required
                 // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index c28b3440d7..1d9c4a9d02 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -39,7 +39,8 @@ use datafusion_expr::{
 
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::{Result, ScalarValue};
-use datafusion_physical_expr::expressions::{col, lit};
+use datafusion_expr::type_coercion::aggregates::coerce_types;
+use datafusion_physical_expr::expressions::{cast, col, lit};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use test_utils::add_empty_batches;
 
@@ -261,6 +262,14 @@ fn get_random_function(
     let rand_fn_idx = rng.gen_range(0..window_fn_map.len());
     let fn_name = window_fn_map.keys().collect::<Vec<_>>()[rand_fn_idx];
     let (window_fn, new_args) = window_fn_map.values().collect::<Vec<_>>()[rand_fn_idx];
+    if let WindowFunction::AggregateFunction(f) = window_fn {
+        let a = args[0].clone();
+        let dt = a.data_type(schema.as_ref()).unwrap();
+        let sig = f.signature();
+        let coerced = coerce_types(f, &[dt], &sig).unwrap();
+        args[0] = cast(a, schema, coerced[0].clone()).unwrap();
+    }
+
     for new_arg in new_args {
         args.push(new_arg.clone());
     }
diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index 3debe21800..9412c9247b 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -228,20 +228,16 @@ impl AggregateFunction {
         // Note that this function *must* return the same type that the respective physical expression returns
         // or the execution panics.
 
-        let coerced_data_types = crate::type_coercion::aggregates::coerce_types(
-            self,
-            input_expr_types,
-            &self.signature(),
-        )
-        // original errors are all related to wrong function signature
-        // aggregate them for better error message
-        .map_err(|_| {
-            DataFusionError::Plan(utils::generate_signature_error_msg(
-                &format!("{self}"),
-                self.signature(),
-                input_expr_types,
-            ))
-        })?;
+        let coerced_data_types = coerce_types(self, input_expr_types, &self.signature())
+            // original errors are all related to wrong function signature
+            // aggregate them for better error message
+            .map_err(|_| {
+                DataFusionError::Plan(utils::generate_signature_error_msg(
+                    &format!("{self}"),
+                    self.signature(),
+                    input_expr_types,
+                ))
+            })?;
 
         match self {
             AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs
index 094b6e9da4..c47c04195d 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -19,6 +19,7 @@ use arrow::datatypes::{
     DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
     DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE,
 };
+
 use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
 use std::ops::Deref;
 
@@ -89,6 +90,7 @@ pub fn coerce_types(
     input_types: &[DataType],
     signature: &Signature,
 ) -> Result<Vec<DataType>> {
+    use DataType::*;
     // Validate input_types matches (at least one of) the func signature.
     check_arg_count(agg_fun, input_types, &signature.type_signature)?;
 
@@ -105,26 +107,44 @@ pub fn coerce_types(
         AggregateFunction::Sum => {
             // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
             // smallint, int, bigint, real, double precision, decimal, or interval.
-            if !is_sum_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(input_types.to_vec())
+            let v = match &input_types[0] {
+                Decimal128(p, s) => Decimal128(*p, *s),
+                Decimal256(p, s) => Decimal256(*p, *s),
+                d if d.is_signed_integer() => Int64,
+                d if d.is_unsigned_integer() => UInt64,
+                d if d.is_floating() => Float64,
+                Dictionary(_, v) => {
+                    return coerce_types(agg_fun, &[v.as_ref().clone()], signature)
+                }
+                _ => {
+                    return plan_err!(
+                        "The function {:?} does not support inputs of type {:?}.",
+                        agg_fun,
+                        input_types[0]
+                    )
+                }
+            };
+            Ok(vec![v])
         }
         AggregateFunction::Avg => {
             // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
             // smallint, int, bigint, real, double precision, decimal, or interval
-            if !is_avg_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(input_types.to_vec())
+            let v = match &input_types[0] {
+                Decimal128(p, s) => Decimal128(*p, *s),
+                Decimal256(p, s) => Decimal256(*p, *s),
+                d if d.is_numeric() => Float64,
+                Dictionary(_, v) => {
+                    return coerce_types(agg_fun, &[v.as_ref().clone()], signature)
+                }
+                _ => {
+                    return plan_err!(
+                        "The function {:?} does not support inputs of type {:?}.",
+                        agg_fun,
+                        input_types[0]
+                    )
+                }
+            };
+            Ok(vec![v])
         }
         AggregateFunction::BitAnd
         | AggregateFunction::BitOr
@@ -160,7 +180,7 @@ pub fn coerce_types(
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::Covariance | AggregateFunction::CovariancePop => {
             if !is_covariance_support_arg_type(&input_types[0]) {
@@ -170,7 +190,7 @@ pub fn coerce_types(
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::Stddev | AggregateFunction::StddevPop => {
             if !is_stddev_support_arg_type(&input_types[0]) {
@@ -180,7 +200,7 @@ pub fn coerce_types(
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64])
         }
         AggregateFunction::Correlation => {
             if !is_correlation_support_arg_type(&input_types[0]) {
@@ -190,7 +210,7 @@ pub fn coerce_types(
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::RegrSlope
         | AggregateFunction::RegrIntercept
@@ -211,7 +231,7 @@ pub fn coerce_types(
                     input_types[0]
                 );
             }
-            Ok(input_types.to_vec())
+            Ok(vec![Float64, Float64])
         }
         AggregateFunction::ApproxPercentileCont => {
             if !is_approx_percentile_cont_supported_arg_type(&input_types[0]) {
@@ -357,11 +377,9 @@ fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
 /// function return type of a sum
 pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
     match arg_type {
-        arg_type if SIGNED_INTEGERS.contains(arg_type) => Ok(DataType::Int64),
-        arg_type if UNSIGNED_INTEGERS.contains(arg_type) => Ok(DataType::UInt64),
-        // In the https://www.postgresql.org/docs/current/functions-aggregate.html doc,
-        // the result type of floating-point is FLOAT64 with the double precision.
-        DataType::Float64 | DataType::Float32 => Ok(DataType::Float64),
+        DataType::Int64 => Ok(DataType::Int64),
+        DataType::UInt64 => Ok(DataType::UInt64),
+        DataType::Float64 => Ok(DataType::Float64),
         DataType::Decimal128(precision, scale) => {
             // in the spark, the result type is DECIMAL(min(38,precision+10), s)
             // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66
@@ -374,9 +392,6 @@ pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
             let new_precision = DECIMAL256_MAX_PRECISION.min(*precision + 10);
             Ok(DataType::Decimal256(new_precision, *scale))
         }
-        DataType::Dictionary(_, dict_value_type) => {
-            sum_return_type(dict_value_type.as_ref())
-        }
         other => plan_err!("SUM does not support type \"{other:?}\""),
     }
 }
@@ -601,21 +616,29 @@ mod tests {
                 assert_eq!(*input_type, result.unwrap());
             }
         }
-        // test sum, avg
-        let funs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
-        let input_types = vec![
-            vec![DataType::Int32],
-            vec![DataType::Float32],
-            vec![DataType::Decimal128(20, 3)],
-            vec![DataType::Decimal256(20, 3)],
-        ];
-        for fun in funs {
-            for input_type in &input_types {
-                let signature = fun.signature();
-                let result = coerce_types(&fun, input_type, &signature);
-                assert_eq!(*input_type, result.unwrap());
-            }
-        }
+        // test sum
+        let fun = AggregateFunction::Sum;
+        let signature = fun.signature();
+        let r = coerce_types(&fun, &[DataType::Int32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Int64);
+        let r = coerce_types(&fun, &[DataType::Float32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Float64);
+        let r = coerce_types(&fun, &[DataType::Decimal128(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal128(20, 3));
+        let r = coerce_types(&fun, &[DataType::Decimal256(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal256(20, 3));
+
+        // test avg
+        let fun = AggregateFunction::Avg;
+        let signature = fun.signature();
+        let r = coerce_types(&fun, &[DataType::Int32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Float64);
+        let r = coerce_types(&fun, &[DataType::Float32], &signature).unwrap();
+        assert_eq!(r[0], DataType::Float64);
+        let r = coerce_types(&fun, &[DataType::Decimal128(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal128(20, 3));
+        let r = coerce_types(&fun, &[DataType::Decimal256(20, 3)], &signature).unwrap();
+        assert_eq!(r[0], DataType::Decimal256(20, 3));
 
         // ApproxPercentileCont input types
         let input_types = vec![
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index e9d155d5d2..2a6e08cd52 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -44,8 +44,8 @@ use datafusion_expr::type_coercion::{is_datetime, is_numeric, is_utf8_or_large_u
 use datafusion_expr::utils::from_plan;
 use datafusion_expr::{
     is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
-    type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, LogicalPlan, Operator,
-    Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
+    type_coercion, window_function, AggregateFunction, BuiltinScalarFunction, Expr,
+    LogicalPlan, Operator, Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
 use datafusion_expr::{ExprSchemable, Signature};
 
@@ -381,6 +381,19 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
             }) => {
                 let window_frame =
                     coerce_window_frame(window_frame, &self.schema, &order_by)?;
+
+                let args = match &fun {
+                    window_function::WindowFunction::AggregateFunction(fun) => {
+                        coerce_agg_exprs_for_signature(
+                            fun,
+                            &args,
+                            &self.schema,
+                            &fun.signature(),
+                        )?
+                    }
+                    _ => args,
+                };
+
                 let expr = Expr::WindowFunction(WindowFunction::new(
                     fun,
                     args,
@@ -961,7 +974,7 @@ mod test {
             None,
         ));
         let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
-        let expected = "Projection: AVG(Int64(12))\n  EmptyRelation";
+        let expected = "Projection: AVG(CAST(Int64(12) AS Float64))\n  EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected)?;
 
         let empty = empty_with_type(DataType::Int32);
@@ -974,7 +987,7 @@ mod test {
             None,
         ));
         let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
-        let expected = "Projection: AVG(a)\n  EmptyRelation";
+        let expected = "Projection: AVG(CAST(a AS Float64))\n  EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected)?;
         Ok(())
     }
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs
index 142ae870d4..5c71ea08ef 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -70,7 +70,7 @@ fn subquery_filter_with_cast() -> Result<()> {
     \n  Inner Join:  Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.AVG(test.col_int32)\
     \n    TableScan: test projection=[col_int32]\
     \n    SubqueryAlias: __scalar_sq_1\
-    \n      Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\
+    \n      Aggregate: groupBy=[[]], aggr=[[AVG(CAST(test.col_int32 AS Float64))]]\
     \n        Projection: test.col_int32\
     \n          Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\
     \n            TableScan: test projection=[col_int32, col_utf8]";
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 932081c4eb..59f72b6e3e 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -43,6 +43,7 @@ use arrow_array::{
 use datafusion_common::{
     downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
 };
+use datafusion_expr::type_coercion::aggregates::avg_return_type;
 use datafusion_expr::Accumulator;
 
 use super::groups_accumulator::EmitTo;
@@ -53,9 +54,8 @@ use super::utils::{adjust_output_array, Decimal128Averager};
 pub struct Avg {
     name: String,
     expr: Arc<dyn PhysicalExpr>,
-    pub sum_data_type: DataType,
-    rt_data_type: DataType,
-    pub pre_cast_to_sum_type: bool,
+    input_data_type: DataType,
+    result_data_type: DataType,
 }
 
 impl Avg {
@@ -63,34 +63,21 @@ impl Avg {
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
-        sum_data_type: DataType,
-    ) -> Self {
-        Self::new_with_pre_cast(expr, name, sum_data_type.clone(), sum_data_type, false)
-    }
-
-    pub fn new_with_pre_cast(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        sum_data_type: DataType,
-        rt_data_type: DataType,
-        cast_to_sum_type: bool,
+        data_type: DataType,
     ) -> Self {
         // the internal sum data type of avg just support FLOAT64 and Decimal data type.
         assert!(matches!(
-            sum_data_type,
-            DataType::Float64 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
-        ));
-        // the result of avg just support FLOAT64 and Decimal data type.
-        assert!(matches!(
-            rt_data_type,
+            data_type,
             DataType::Float64 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _)
         ));
+
+        let result_data_type = avg_return_type(&data_type).unwrap();
+
         Self {
             name: name.into(),
             expr,
-            sum_data_type,
-            rt_data_type,
-            pre_cast_to_sum_type: cast_to_sum_type,
+            input_data_type: data_type,
+            result_data_type,
         }
     }
 }
@@ -102,14 +89,14 @@ impl AggregateExpr for Avg {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.rt_data_type.clone(), true))
+        Ok(Field::new(&self.name, self.result_data_type.clone(), true))
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(AvgAccumulator::try_new(
             // avg is f64 or decimal
-            &self.sum_data_type,
-            &self.rt_data_type,
+            &self.input_data_type,
+            &self.result_data_type,
         )?))
     }
 
@@ -122,7 +109,7 @@ impl AggregateExpr for Avg {
             ),
             Field::new(
                 format_state_name(&self.name, "sum"),
-                self.sum_data_type.clone(),
+                self.input_data_type.clone(),
                 true,
             ),
         ])
@@ -142,25 +129,25 @@ impl AggregateExpr for Avg {
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(AvgAccumulator::try_new(
-            &self.sum_data_type,
-            &self.rt_data_type,
+            &self.input_data_type,
+            &self.result_data_type,
         )?))
     }
 
     fn groups_accumulator_supported(&self) -> bool {
         use DataType::*;
 
-        matches!(&self.rt_data_type, Float64 | Decimal128(_, _))
+        matches!(&self.result_data_type, Float64 | Decimal128(_, _))
     }
 
     fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
         use DataType::*;
         // instantiate specialized accumulator based for the type
-        match (&self.sum_data_type, &self.rt_data_type) {
+        match (&self.input_data_type, &self.result_data_type) {
             (Float64, Float64) => {
                 Ok(Box::new(AvgGroupsAccumulator::<Float64Type, _>::new(
-                    &self.sum_data_type,
-                    &self.rt_data_type,
+                    &self.input_data_type,
+                    &self.result_data_type,
                     |sum: f64, count: u64| Ok(sum / count as f64),
                 )))
             }
@@ -178,16 +165,16 @@ impl AggregateExpr for Avg {
                     move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128);
 
                 Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type, _>::new(
-                    &self.sum_data_type,
-                    &self.rt_data_type,
+                    &self.input_data_type,
+                    &self.result_data_type,
                     avg_fn,
                 )))
             }
 
             _ => not_impl_err!(
                 "AvgGroupsAccumulator for ({} --> {})",
-                self.sum_data_type,
-                self.rt_data_type
+                self.input_data_type,
+                self.result_data_type
             ),
         }
     }
@@ -199,8 +186,8 @@ impl PartialEq<dyn Any> for Avg {
             .downcast_ref::<Self>()
             .map(|x| {
                 self.name == x.name
-                    && self.sum_data_type == x.sum_data_type
-                    && self.rt_data_type == x.rt_data_type
+                    && self.input_data_type == x.input_data_type
+                    && self.result_data_type == x.result_data_type
                     && self.expr.eq(&x.expr)
             })
             .unwrap_or(false)
@@ -212,7 +199,6 @@ impl PartialEq<dyn Any> for Avg {
 pub struct AvgAccumulator {
     // sum is used for null
     sum: ScalarValue,
-    sum_data_type: DataType,
     return_data_type: DataType,
     count: u64,
 }
@@ -222,7 +208,6 @@ impl AvgAccumulator {
     pub fn try_new(datatype: &DataType, return_data_type: &DataType) -> Result<Self> {
         Ok(Self {
             sum: ScalarValue::try_from(datatype)?,
-            sum_data_type: datatype.clone(),
             return_data_type: return_data_type.clone(),
             count: 0,
         })
@@ -238,16 +223,14 @@ impl Accumulator for AvgAccumulator {
         let values = &values[0];
 
         self.count += (values.len() - values.null_count()) as u64;
-        self.sum = self
-            .sum
-            .add(&sum::sum_batch(values, &self.sum_data_type)?)?;
+        self.sum = self.sum.add(&sum::sum_batch(values)?)?;
         Ok(())
     }
 
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
         self.count -= (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.sub(&delta)?;
         Ok(())
     }
@@ -258,9 +241,7 @@ impl Accumulator for AvgAccumulator {
         self.count += compute::sum(counts).unwrap_or(0);
 
         // sums are summed
-        self.sum = self
-            .sum
-            .add(&sum::sum_batch(&states[1], &self.sum_data_type)?)?;
+        self.sum = self.sum.add(&sum::sum_batch(&states[1])?)?;
         Ok(())
     }
 
@@ -491,72 +472,69 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
-    use arrow::record_batch::RecordBatch;
-    use arrow::{array::*, datatypes::*};
-    use datafusion_common::Result;
+    use crate::expressions::tests::assert_aggregate;
+    use arrow::array::*;
+    use datafusion_expr::AggregateFunction;
 
     #[test]
-    fn avg_decimal() -> Result<()> {
+    fn avg_decimal() {
         // test agg
         let array: ArrayRef = Arc::new(
             (1..7)
                 .map(Some)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
 
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Avg,
-            ScalarValue::Decimal128(Some(35000), 14, 4)
-        )
+            AggregateFunction::Avg,
+            ScalarValue::Decimal128(Some(35000), 14, 4),
+        );
     }
 
     #[test]
-    fn avg_decimal_with_nulls() -> Result<()> {
+    fn avg_decimal_with_nulls() {
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Avg,
-            ScalarValue::Decimal128(Some(32500), 14, 4)
-        )
+            AggregateFunction::Avg,
+            ScalarValue::Decimal128(Some(32500), 14, 4),
+        );
     }
 
     #[test]
-    fn avg_decimal_all_nulls() -> Result<()> {
+    fn avg_decimal_all_nulls() {
         // test agg
         let array: ArrayRef = Arc::new(
             std::iter::repeat::<Option<i128>>(None)
                 .take(6)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Avg,
-            ScalarValue::Decimal128(None, 14, 4)
-        )
+            AggregateFunction::Avg,
+            ScalarValue::Decimal128(None, 14, 4),
+        );
     }
 
     #[test]
-    fn avg_i32() -> Result<()> {
+    fn avg_i32() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        generic_test_op!(a, DataType::Int32, Avg, ScalarValue::from(3_f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
     }
 
     #[test]
-    fn avg_i32_with_nulls() -> Result<()> {
+    fn avg_i32_with_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![
             Some(1),
             None,
@@ -564,33 +542,33 @@ mod tests {
             Some(4),
             Some(5),
         ]));
-        generic_test_op!(a, DataType::Int32, Avg, ScalarValue::from(3.25f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3.25f64));
     }
 
     #[test]
-    fn avg_i32_all_nulls() -> Result<()> {
+    fn avg_i32_all_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        generic_test_op!(a, DataType::Int32, Avg, ScalarValue::Float64(None))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::Float64(None));
     }
 
     #[test]
-    fn avg_u32() -> Result<()> {
+    fn avg_u32() {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32]));
-        generic_test_op!(a, DataType::UInt32, Avg, ScalarValue::from(3.0f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3.0f64));
     }
 
     #[test]
-    fn avg_f32() -> Result<()> {
+    fn avg_f32() {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32]));
-        generic_test_op!(a, DataType::Float32, Avg, ScalarValue::from(3_f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
     }
 
     #[test]
-    fn avg_f64() -> Result<()> {
+    fn avg_f64() {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
-        generic_test_op!(a, DataType::Float64, Avg, ScalarValue::from(3_f64))
+        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
     }
 }
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 2ef9b31fe3..a38dfdcc13 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -30,7 +30,6 @@ use crate::aggregate::regr::RegrType;
 use crate::{expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr};
 use arrow::datatypes::Schema;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_expr::aggregate_function::sum_type_of_avg;
 pub use datafusion_expr::AggregateFunction;
 use std::sync::Arc;
 
@@ -50,7 +49,7 @@ pub fn create_aggregate_expr(
         .iter()
         .map(|e| e.data_type(input_schema))
         .collect::<Result<Vec<_>>>()?;
-    let rt_type = fun.return_type(&input_phy_types)?;
+    let data_type = input_phy_types[0].clone();
     let ordering_types = ordering_req
         .iter()
         .map(|e| e.expr.data_type(input_schema))
@@ -58,72 +57,63 @@ pub fn create_aggregate_expr(
     let input_phy_exprs = input_phy_exprs.to_vec();
     Ok(match (fun, distinct) {
         (AggregateFunction::Count, false) => Arc::new(
-            expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, 
rt_type),
+            expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, 
data_type),
         ),
         (AggregateFunction::Count, true) => 
Arc::new(expressions::DistinctCount::new(
-            input_phy_types[0].clone(),
+            data_type,
             input_phy_exprs[0].clone(),
             name,
         )),
         (AggregateFunction::Grouping, _) => 
Arc::new(expressions::Grouping::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitAnd, _) => Arc::new(expressions::BitAnd::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitOr, _) => Arc::new(expressions::BitOr::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitXor, false) => 
Arc::new(expressions::BitXor::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BitXor, true) => 
Arc::new(expressions::DistinctBitXor::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::BoolOr, _) => Arc::new(expressions::BoolOr::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
+        )),
+        (AggregateFunction::Sum, false) => Arc::new(expressions::Sum::new(
+            input_phy_exprs[0].clone(),
+            name,
+            input_phy_types[0].clone(),
         )),
-        (AggregateFunction::Sum, false) => {
-            let cast_to_sum_type = rt_type != input_phy_types[0];
-            Arc::new(expressions::Sum::new_with_pre_cast(
-                input_phy_exprs[0].clone(),
-                name,
-                rt_type,
-                cast_to_sum_type,
-            ))
-        }
         (AggregateFunction::Sum, true) => 
Arc::new(expressions::DistinctSum::new(
             vec![input_phy_exprs[0].clone()],
             name,
-            rt_type,
+            data_type,
         )),
-        (AggregateFunction::ApproxDistinct, _) => {
-            Arc::new(expressions::ApproxDistinct::new(
-                input_phy_exprs[0].clone(),
-                name,
-                input_phy_types[0].clone(),
-            ))
-        }
+        (AggregateFunction::ApproxDistinct, _) => Arc::new(
+            expressions::ApproxDistinct::new(input_phy_exprs[0].clone(), name, 
data_type),
+        ),
         (AggregateFunction::ArrayAgg, false) => {
             let expr = input_phy_exprs[0].clone();
-            let data_type = input_phy_types[0].clone();
             if ordering_req.is_empty() {
                 Arc::new(expressions::ArrayAgg::new(expr, name, data_type))
             } else {
@@ -145,43 +135,37 @@ pub fn create_aggregate_expr(
             Arc::new(expressions::DistinctArrayAgg::new(
                 input_phy_exprs[0].clone(),
                 name,
-                input_phy_types[0].clone(),
+                data_type,
             ))
         }
         (AggregateFunction::Min, _) => Arc::new(expressions::Min::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Max, _) => Arc::new(expressions::Max::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
+        )),
+        (AggregateFunction::Avg, false) => Arc::new(expressions::Avg::new(
+            input_phy_exprs[0].clone(),
+            name,
+            data_type,
         )),
-        (AggregateFunction::Avg, false) => {
-            let sum_type = sum_type_of_avg(&input_phy_types)?;
-            let cast_to_sum_type = sum_type != input_phy_types[0];
-            Arc::new(expressions::Avg::new_with_pre_cast(
-                input_phy_exprs[0].clone(),
-                name,
-                sum_type,
-                rt_type,
-                cast_to_sum_type,
-            ))
-        }
         (AggregateFunction::Avg, true) => {
             return not_impl_err!("AVG(DISTINCT) aggregations are not 
available");
         }
         (AggregateFunction::Variance, false) => 
Arc::new(expressions::Variance::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Variance, true) => {
             return not_impl_err!("VAR(DISTINCT) aggregations are not 
available");
         }
         (AggregateFunction::VariancePop, false) => Arc::new(
-            expressions::VariancePop::new(input_phy_exprs[0].clone(), name, 
rt_type),
+            expressions::VariancePop::new(input_phy_exprs[0].clone(), name, 
data_type),
         ),
         (AggregateFunction::VariancePop, true) => {
             return not_impl_err!("VAR_POP(DISTINCT) aggregations are not 
available");
@@ -190,7 +174,7 @@ pub fn create_aggregate_expr(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Covariance, true) => {
             return not_impl_err!("COVAR(DISTINCT) aggregations are not 
available");
@@ -200,7 +184,7 @@ pub fn create_aggregate_expr(
                 input_phy_exprs[0].clone(),
                 input_phy_exprs[1].clone(),
                 name,
-                rt_type,
+                data_type,
             ))
         }
         (AggregateFunction::CovariancePop, true) => {
@@ -209,7 +193,7 @@ pub fn create_aggregate_expr(
         (AggregateFunction::Stddev, false) => 
Arc::new(expressions::Stddev::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Stddev, true) => {
             return not_impl_err!("STDDEV(DISTINCT) aggregations are not 
available");
@@ -217,7 +201,7 @@ pub fn create_aggregate_expr(
         (AggregateFunction::StddevPop, false) => 
Arc::new(expressions::StddevPop::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::StddevPop, true) => {
             return not_impl_err!("STDDEV_POP(DISTINCT) aggregations are not 
available");
@@ -227,7 +211,7 @@ pub fn create_aggregate_expr(
                 input_phy_exprs[0].clone(),
                 input_phy_exprs[1].clone(),
                 name,
-                rt_type,
+                data_type,
             ))
         }
         (AggregateFunction::Correlation, true) => {
@@ -238,63 +222,63 @@ pub fn create_aggregate_expr(
             input_phy_exprs[1].clone(),
             name,
             RegrType::Slope,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrIntercept, false) => 
Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::Intercept,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrCount, false) => 
Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::Count,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrR2, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::R2,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrAvgx, false) => 
Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::AvgX,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrAvgy, false) => 
Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::AvgY,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrSXX, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::SXX,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrSYY, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::SYY,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::RegrSXY, false) => Arc::new(expressions::Regr::new(
             input_phy_exprs[0].clone(),
             input_phy_exprs[1].clone(),
             name,
             RegrType::SXY,
-            rt_type,
+            data_type,
         )),
         (
             AggregateFunction::RegrSlope
@@ -316,14 +300,14 @@ pub fn create_aggregate_expr(
                     // Pass in the desired percentile expr
                     input_phy_exprs,
                     name,
-                    rt_type,
+                    data_type,
                 )?)
             } else {
                 Arc::new(expressions::ApproxPercentileCont::new_with_max_size(
                     // Pass in the desired percentile expr
                     input_phy_exprs,
                     name,
-                    rt_type,
+                    data_type,
                 )?)
             }
         }
@@ -337,7 +321,7 @@ pub fn create_aggregate_expr(
                 // Pass in the desired percentile expr
                 input_phy_exprs,
                 name,
-                rt_type,
+                data_type,
             )?)
         }
         (AggregateFunction::ApproxPercentileContWithWeight, true) => {
@@ -349,7 +333,7 @@ pub fn create_aggregate_expr(
             Arc::new(expressions::ApproxMedian::try_new(
                 input_phy_exprs[0].clone(),
                 name,
-                rt_type,
+                data_type,
             )?)
         }
         (AggregateFunction::ApproxMedian, true) => {
@@ -360,7 +344,7 @@ pub fn create_aggregate_expr(
         (AggregateFunction::Median, false) => 
Arc::new(expressions::Median::new(
             input_phy_exprs[0].clone(),
             name,
-            rt_type,
+            data_type,
         )),
         (AggregateFunction::Median, true) => {
             return not_impl_err!("MEDIAN(DISTINCT) aggregations are not 
available");
diff --git a/datafusion/physical-expr/src/aggregate/count.rs 
b/datafusion/physical-expr/src/aggregate/count.rs
index 178f08b481..738ca4e915 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -218,17 +218,13 @@ impl AggregateExpr for Count {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
+        Ok(Field::new(&self.name, DataType::Int64, self.nullable))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
         Ok(vec![Field::new(
             format_state_name(&self.name, "count"),
-            self.data_type.clone(),
+            DataType::Int64,
             true,
         )])
     }
diff --git a/datafusion/physical-expr/src/aggregate/grouping.rs 
b/datafusion/physical-expr/src/aggregate/grouping.rs
index 6a7e6f94d1..70afda265a 100644
--- a/datafusion/physical-expr/src/aggregate/grouping.rs
+++ b/datafusion/physical-expr/src/aggregate/grouping.rs
@@ -62,17 +62,13 @@ impl AggregateExpr for Grouping {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
+        Ok(Field::new(&self.name, DataType::Int32, self.nullable))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
         Ok(vec![Field::new(
             format_state_name(&self.name, "grouping"),
-            self.data_type.clone(),
+            DataType::Int32,
             true,
         )])
     }
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs 
b/datafusion/physical-expr/src/aggregate/sum.rs
index cca1721a82..baaebada37 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -30,13 +30,9 @@ use arrow::array::Array;
 use arrow::array::Decimal128Array;
 use arrow::array::Decimal256Array;
 use arrow::compute;
-use arrow::compute::kernels::cast;
 use arrow::datatypes::DataType;
 use arrow::{
-    array::{
-        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, 
Int64Array,
-        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-    },
+    array::{ArrayRef, Float64Array, Int64Array, UInt64Array},
     datatypes::Field,
 };
 use arrow_array::types::{
@@ -46,16 +42,16 @@ use arrow_array::types::{
 use datafusion_common::{
     downcast_value, internal_err, not_impl_err, DataFusionError, Result, 
ScalarValue,
 };
+use datafusion_expr::type_coercion::aggregates::sum_return_type;
 use datafusion_expr::Accumulator;
 
 /// SUM aggregate expression
 #[derive(Debug, Clone)]
 pub struct Sum {
     name: String,
-    pub data_type: DataType,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
     nullable: bool,
-    pub pre_cast_to_sum_type: bool,
 }
 
 impl Sum {
@@ -65,27 +61,12 @@ impl Sum {
         name: impl Into<String>,
         data_type: DataType,
     ) -> Self {
+        let data_type = sum_return_type(&data_type).unwrap();
         Self {
             name: name.into(),
             expr,
             data_type,
             nullable: true,
-            pre_cast_to_sum_type: false,
-        }
-    }
-
-    pub fn new_with_pre_cast(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-        pre_cast_to_sum_type: bool,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-            pre_cast_to_sum_type,
         }
     }
 }
@@ -269,14 +250,7 @@ fn sum_decimal256_batch(
 }
 
 // sums the array and returns a ScalarValue of its corresponding type.
-pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> 
Result<ScalarValue> {
-    // TODO refine the cast kernel in arrow-rs
-    let cast_values = if values.data_type() != sum_type {
-        Some(cast(values, sum_type)?)
-    } else {
-        None
-    };
-    let values = cast_values.as_ref().unwrap_or(values);
+pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
     Ok(match values.data_type() {
         DataType::Decimal128(precision, scale) => {
             sum_decimal_batch(values, *precision, *scale)?
@@ -285,15 +259,8 @@ pub(crate) fn sum_batch(values: &ArrayRef, sum_type: 
&DataType) -> Result<Scalar
             sum_decimal256_batch(values, *precision, *scale)?
         }
         DataType::Float64 => typed_sum_delta_batch!(values, Float64Array, 
Float64),
-        DataType::Float32 => typed_sum_delta_batch!(values, Float32Array, 
Float32),
         DataType::Int64 => typed_sum_delta_batch!(values, Int64Array, Int64),
-        DataType::Int32 => typed_sum_delta_batch!(values, Int32Array, Int32),
-        DataType::Int16 => typed_sum_delta_batch!(values, Int16Array, Int16),
-        DataType::Int8 => typed_sum_delta_batch!(values, Int8Array, Int8),
         DataType::UInt64 => typed_sum_delta_batch!(values, UInt64Array, 
UInt64),
-        DataType::UInt32 => typed_sum_delta_batch!(values, UInt32Array, 
UInt32),
-        DataType::UInt16 => typed_sum_delta_batch!(values, UInt16Array, 
UInt16),
-        DataType::UInt8 => typed_sum_delta_batch!(values, UInt8Array, UInt8),
         e => {
             return internal_err!("Sum is not expected to receive the type 
{e:?}");
         }
@@ -307,7 +274,7 @@ impl Accumulator for SumAccumulator {
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.add(&delta)?;
         Ok(())
     }
@@ -336,7 +303,7 @@ impl Accumulator for SlidingSumAccumulator {
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
         self.count += (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.add(&delta)?;
         Ok(())
     }
@@ -344,7 +311,7 @@ impl Accumulator for SlidingSumAccumulator {
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
         self.count -= (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values, &self.sum.get_datatype())?;
+        let delta = sum_batch(values)?;
         self.sum = self.sum.sub(&delta)?;
         Ok(())
     }
@@ -376,23 +343,21 @@ impl Accumulator for SlidingSumAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
-    use arrow::datatypes::*;
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::Result;
+    use crate::expressions::tests::assert_aggregate;
+    use arrow_array::{Float32Array, Int32Array, UInt32Array};
+    use datafusion_expr::AggregateFunction;
 
     #[test]
-    fn sum_decimal() -> Result<()> {
+    fn sum_decimal() {
         // test sum batch
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(Some)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        let result = sum_batch(&array, &DataType::Decimal128(10, 0))?;
+        let result = sum_batch(&array).unwrap();
         assert_eq!(ScalarValue::Decimal128(Some(15), 10, 0), result);
 
         // test agg
@@ -400,27 +365,28 @@ mod tests {
             (1..6)
                 .map(Some)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
 
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Sum,
-            ScalarValue::Decimal128(Some(15), 20, 0)
-        )
+            AggregateFunction::Sum,
+            ScalarValue::Decimal128(Some(15), 20, 0),
+        );
     }
 
     #[test]
-    fn sum_decimal_with_nulls() -> Result<()> {
+    fn sum_decimal_with_nulls() {
         // test with batch
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        let result = sum_batch(&array, &DataType::Decimal128(10, 0))?;
+        let result = sum_batch(&array).unwrap();
         assert_eq!(ScalarValue::Decimal128(Some(13), 10, 0), result);
 
         // test agg
@@ -428,45 +394,46 @@ mod tests {
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(35, 0)?,
+                .with_precision_and_scale(35, 0)
+                .unwrap(),
         );
-        generic_test_op!(
+
+        assert_aggregate(
             array,
-            DataType::Decimal128(35, 0),
-            Sum,
-            ScalarValue::Decimal128(Some(13), 38, 0)
-        )
+            AggregateFunction::Sum,
+            ScalarValue::Decimal128(Some(13), 38, 0),
+        );
     }
 
     #[test]
-    fn sum_decimal_all_nulls() -> Result<()> {
+    fn sum_decimal_all_nulls() {
         // test with batch
         let array: ArrayRef = Arc::new(
             std::iter::repeat::<Option<i128>>(None)
                 .take(6)
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)?,
+                .with_precision_and_scale(10, 0)
+                .unwrap(),
         );
-        let result = sum_batch(&array, &DataType::Decimal128(10, 0))?;
+        let result = sum_batch(&array).unwrap();
         assert_eq!(ScalarValue::Decimal128(None, 10, 0), result);
 
         // test agg
-        generic_test_op!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(10, 0),
-            Sum,
-            ScalarValue::Decimal128(None, 20, 0)
-        )
+            AggregateFunction::Sum,
+            ScalarValue::Decimal128(None, 20, 0),
+        );
     }
 
     #[test]
-    fn sum_i32() -> Result<()> {
+    fn sum_i32() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        generic_test_op!(a, DataType::Int32, Sum, ScalarValue::from(15i32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15i64));
     }
 
     #[test]
-    fn sum_i32_with_nulls() -> Result<()> {
+    fn sum_i32_with_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![
             Some(1),
             None,
@@ -474,33 +441,33 @@ mod tests {
             Some(4),
             Some(5),
         ]));
-        generic_test_op!(a, DataType::Int32, Sum, ScalarValue::from(13i32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(13i64));
     }
 
     #[test]
-    fn sum_i32_all_nulls() -> Result<()> {
+    fn sum_i32_all_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        generic_test_op!(a, DataType::Int32, Sum, ScalarValue::Int32(None))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::Int64(None));
     }
 
     #[test]
-    fn sum_u32() -> Result<()> {
+    fn sum_u32() {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 
5_u32]));
-        generic_test_op!(a, DataType::UInt32, Sum, ScalarValue::from(15u32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15u64));
     }
 
     #[test]
-    fn sum_f32() -> Result<()> {
+    fn sum_f32() {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 
5_f32]));
-        generic_test_op!(a, DataType::Float32, Sum, ScalarValue::from(15_f32))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15_f64));
     }
 
     #[test]
-    fn sum_f64() -> Result<()> {
+    fn sum_f64() {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 
5_f64]));
-        generic_test_op!(a, DataType::Float64, Sum, ScalarValue::from(15_f64))
+        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15_f64));
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 022e0ae02e..4a6d52834d 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -100,10 +100,15 @@ pub use crate::PhysicalSortExpr;
 
 #[cfg(test)]
 pub(crate) mod tests {
+    use crate::expressions::{col, create_aggregate_expr, try_cast};
     use crate::AggregateExpr;
     use arrow::record_batch::RecordBatch;
+    use arrow_array::ArrayRef;
+    use arrow_schema::{Field, Schema};
     use datafusion_common::Result;
     use datafusion_common::ScalarValue;
+    use datafusion_expr::type_coercion::aggregates::coerce_types;
+    use datafusion_expr::AggregateFunction;
     use std::sync::Arc;
 
     /// macro to perform an aggregation and verify the result.
@@ -131,6 +136,36 @@ pub(crate) mod tests {
         }};
     }
 
+    /// Assert `function(array) == expected` performing any necessary type 
coercion
+    pub fn assert_aggregate(
+        array: ArrayRef,
+        function: AggregateFunction,
+        expected: ScalarValue,
+    ) {
+        let data_type = array.data_type();
+        let sig = function.signature();
+        let coerced = coerce_types(&function, &[data_type.clone()], 
&sig).unwrap();
+
+        let input_schema = Schema::new(vec![Field::new("a", data_type.clone(), 
true)]);
+        let batch =
+            RecordBatch::try_new(Arc::new(input_schema.clone()), 
vec![array]).unwrap();
+
+        let input = try_cast(
+            col("a", &input_schema).unwrap(),
+            &input_schema,
+            coerced[0].clone(),
+        )
+        .unwrap();
+
+        let schema = Schema::new(vec![Field::new("a", coerced[0].clone(), 
true)]);
+        let agg =
+            create_aggregate_expr(&function, false, &[input], &[], &schema, 
"aggregate")
+                .unwrap();
+
+        let result = aggregate(&batch, agg).unwrap();
+        assert_eq!(expected, result);
+    }
+
     /// macro to perform an aggregation with two inputs and verify the result.
     #[macro_export]
     macro_rules! generic_test_op2 {
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 8f02e3f3a7..ff7d74de1c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1388,8 +1388,8 @@ mod roundtrip_tests {
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_expr::create_udf;
     use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
-    use datafusion::physical_expr::expressions::in_list;
     use datafusion::physical_expr::expressions::GetFieldAccessExpr;
+    use datafusion::physical_expr::expressions::{cast, in_list};
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
     use datafusion::physical_plan::expressions::{like, BinaryExpr, 
GetIndexedFieldExpr};
@@ -1585,14 +1585,11 @@ mod roundtrip_tests {
         let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
             vec![(col("a", &schema)?, "unused".to_string())];
 
-        let aggregates: Vec<Arc<dyn AggregateExpr>> =
-            vec![Arc::new(Avg::new_with_pre_cast(
-                col("b", &schema)?,
-                "AVG(b)".to_string(),
-                DataType::Float64,
-                DataType::Float64,
-                true,
-            ))];
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+            cast(col("b", &schema)?, &schema, DataType::Float64)?,
+            "AVG(b)".to_string(),
+            DataType::Float64,
+        ))];
 
         roundtrip_test(Arc::new(AggregateExec::try_new(
             AggregateMode::Final,
diff --git a/datafusion/sqllogictest/test_files/decimal.slt 
b/datafusion/sqllogictest/test_files/decimal.slt
index da448f0857..a2a1df55e5 100644
--- a/datafusion/sqllogictest/test_files/decimal.slt
+++ b/datafusion/sqllogictest/test_files/decimal.slt
@@ -618,7 +618,7 @@ select a / b from foo;
 statement ok
 create table t as values (arrow_cast(123, 'Decimal256(5,2)'));
 
-query error DataFusion error: Internal error: Operator \+ is not implemented 
for types Decimal256\(None,15,2\) and Decimal256\(Some\(12300\),15,2\)\. This 
was likely caused by a bug in DataFusion's code and we would welcome that you 
file an bug report in our issue tracker
+query error DataFusion error: Internal error: Operator \+ is not implemented 
for types Decimal256\(None,5,2\) and Decimal256\(Some\(12300\),5,2\)\. This was 
likely caused by a bug in DataFusion's code and we would welcome that you file 
an bug report in our issue tracker
 select AVG(column1) from t;
 
 statement ok
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index 14638a833f..9b594b046a 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2059,7 +2059,7 @@ EXPLAIN SELECT a, b,
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, 
SUM(annotated_data_infinite2.c) AS summation1
---Aggregate: groupBy=[[annotated_data_infinite2.b, 
annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+--Aggregate: groupBy=[[annotated_data_infinite2.b, 
annotated_data_infinite2.a]], aggr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64))]]
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as 
summation1]
@@ -2090,7 +2090,7 @@ EXPLAIN SELECT a, d,
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS 
FIRST] AS summation1
---Aggregate: groupBy=[[annotated_data_infinite2.d, 
annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c) ORDER BY 
[annotated_data_infinite2.a DESC NULLS FIRST]]]
+--Aggregate: groupBy=[[annotated_data_infinite2.d, 
annotated_data_infinite2.a]], aggr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]]]
 ----TableScan: annotated_data_infinite2 projection=[a, c, d]
 physical_plan
 ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) 
ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
@@ -2269,7 +2269,7 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.amount DESC) AS amounts,
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS 
FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------TableScan: sales_global projection=[country, amount]
 physical_plan
@@ -2312,7 +2312,7 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.amount DESC) AS amounts,
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS 
FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[country, amount]
@@ -2348,7 +2348,7 @@ EXPLAIN SELECT s.country, s.zip_code, ARRAY_AGG(s.amount 
ORDER BY s.amount DESC)
 ----
 logical_plan
 Projection: s.country, s.zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC 
NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAY_AGG(s.amount) 
ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAY_AGG(s.amount) 
ORDER BY [s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[zip_code, country, amount]
@@ -2384,7 +2384,7 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.country DESC) AS amounts
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS 
FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST], SUM(CAST(s.amount AS Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[country, amount]
@@ -2419,7 +2419,7 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.country DESC, s.amount D
 ----
 logical_plan
 Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS 
FIRST, s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)]]
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(CAST(s.amount AS 
Float64))]]
 ----SubqueryAlias: s
 ------Sort: sales_global.country ASC NULLS LAST
 --------TableScan: sales_global projection=[country, amount]
@@ -2546,7 +2546,7 @@ EXPLAIN SELECT country, SUM(amount ORDER BY ts DESC) AS 
sum1,
 ----
 logical_plan
 Projection: sales_global.country, SUM(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
---Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]]]
+--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[SUM(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC 
NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST]]]
 ----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
@@ -2580,7 +2580,7 @@ EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts 
DESC) as fv1,
 ----
 logical_plan
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
---Aggregate: groupBy=[[sales_global.country]], 
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], SUM(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts 
DESC NULLS FIRST]]]
 ----Sort: sales_global.ts ASC NULLS LAST
 ------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
@@ -2615,7 +2615,7 @@ EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts 
DESC) as fv1,
 ----
 logical_plan
 Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
---Aggregate: groupBy=[[sales_global.country]], 
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], SUM(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts 
DESC NULLS FIRST]]]
 ----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, 
LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 
as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 
as sum1]
@@ -3103,7 +3103,7 @@ EXPLAIN SELECT r.sn, SUM(l.amount), r.amount
 logical_plan
 Sort: r.sn ASC NULLS LAST
 --Projection: r.sn, SUM(l.amount), r.amount
-----Aggregate: groupBy=[[r.sn, r.amount]], aggr=[[SUM(l.amount)]]
+----Aggregate: groupBy=[[r.sn, r.amount]], aggr=[[SUM(CAST(l.amount AS 
Float64))]]
 ------Projection: l.amount, r.sn, r.amount
 --------Inner Join:  Filter: l.sn >= r.sn
 ----------SubqueryAlias: l
@@ -3248,7 +3248,7 @@ Sort: l.sn ASC NULLS LAST
 ----Aggregate: groupBy=[[l.sn, l.zip_code, l.country, l.ts, l.currency, 
l.amount, l.sum_amount]], aggr=[[]]
 ------SubqueryAlias: l
 --------Projection: l.zip_code, l.country, l.sn, l.ts, l.currency, l.amount, 
SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum_amount
-----------WindowAggr: windowExpr=[[SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 
1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(l.amount AS Float64)) ROWS BETWEEN 
1 PRECEDING AND 1 FOLLOWING]]
 ------------SubqueryAlias: l
 --------------TableScan: sales_global_with_pk projection=[zip_code, country, 
sn, ts, currency, amount]
 physical_plan
diff --git a/datafusion/sqllogictest/test_files/insert.slt 
b/datafusion/sqllogictest/test_files/insert.slt
index e42d2ef059..74968bb089 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -61,7 +61,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 
AS field2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
@@ -122,7 +122,7 @@ FROM aggregate_test_100
 logical_plan
 Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 
AS field2
-----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
@@ -171,7 +171,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --Projection: a1 AS a1, a2 AS a2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=8)
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt 
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 6c9c6fdd47..b2ee5468e7 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -106,7 +106,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 
AS field2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
@@ -169,7 +169,7 @@ FROM aggregate_test_100
 logical_plan
 Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 
AS field2
-----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index bdc8991b90..d88291b56f 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -173,7 +173,7 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
 ----TableScan: t1 projection=[t1_id]
 ----SubqueryAlias: __scalar_sq_1
 ------Projection: SUM(t2.t2_int), t2.t2_id
---------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+--------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Int64))]]
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
@@ -241,7 +241,7 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
 ----TableScan: t1 projection=[t1_id]
 ----SubqueryAlias: __scalar_sq_1
 ------Projection: SUM(t2.t2_int), t2.t2_id
---------Aggregate: groupBy=[[t2.t2_id, Utf8("a")]], aggr=[[SUM(t2.t2_int)]]
+--------Aggregate: groupBy=[[t2.t2_id, Utf8("a")]], aggr=[[SUM(CAST(t2.t2_int 
AS Int64))]]
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
@@ -278,7 +278,7 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
 ----SubqueryAlias: __scalar_sq_1
 ------Projection: SUM(t2.t2_int), t2.t2_id
 --------Filter: SUM(t2.t2_int) < Int64(3)
-----------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+----------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS 
Int64))]]
 ------------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
@@ -483,7 +483,7 @@ logical_plan
 Filter: EXISTS (<subquery>)
 --Subquery:
 ----Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id)
-------Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+------Aggregate: groupBy=[[]], aggr=[[SUM(CAST(outer_ref(t1.t1_int) + t2.t2_id 
AS Int64))]]
 --------Filter: outer_ref(t1.t1_name) = t2.t2_name
 ----------TableScan: t2
 --TableScan: t1 projection=[t1_id, t1_name]
@@ -497,7 +497,7 @@ Filter: EXISTS (<subquery>)
 --Subquery:
 ----Projection: COUNT(*)
 ------Filter: SUM(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
---------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*), 
SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+--------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*), 
SUM(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
 ----------Filter: outer_ref(t1.t1_name) = t2.t2_name
 ------------TableScan: t2
 --TableScan: t1 projection=[t1_id, t1_name]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 36bb0d4628..207bf21101 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1274,7 +1274,7 @@ logical_plan
 Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
 --WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----Projection: aggregate_test_100.c1, aggregate_test_100.c2, 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
-------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING]]
 --------TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
 ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRE [...]
@@ -2540,21 +2540,25 @@ Projection: sum1, sum2, sum3, min1, min2, min3, max1, 
max2, max3, cnt1, cnt2, su
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 ------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_f [...]
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING 
AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.desc_col AS 
Int64)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 
8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING]]
 ----------Projection: annotated_data_finite.inc_col, 
annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated [...]
-------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite 
[...]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data [...]
-----------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS Int64)) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite.inc_col AS 
Int64)annotated_data_finite.inc_col AS annotated_data_finite. [...]
+--------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FO 
[...]
+----------------WindowAggr: 
windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS Int64)) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS 
Int64)annotated_data_finite.desc_col AS annotated_d [...]
+------------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col
+--------------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 
as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 
as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, 
sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, 
maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, 
cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[inc_col@24 DESC]
 ------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)) }, COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "COUNT(*) ROWS BETWEEN  [...]
-----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotate [...]
+----------ProjectionExec: expr=[inc_col@2 as inc_col, desc_col@3 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@4 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@5 as SUM(annotate [...]
 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follow [...]
---------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fol [...]
-----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--------------ProjectionExec: expr=[CAST(inc_col@2 AS Int64) as 
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col, ts@1 
as ts, inc_col@2 as inc_col, desc_col@3 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@4 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.des [...]
+----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: F [...]
+------------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col, 
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
+--------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -2702,8 +2706,8 @@ Projection: sum1, sum2, min1, min2, max1, max2, count1, 
count2, avg1, avg2
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) OR [...]
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [a [...]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORD [...]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, COUNT(annotated_data_finite.inc_c [...]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS 
Int64)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 
PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_fini [...]
 ------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 
as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as 
count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2801,8 +2805,8 @@ Projection: sum1, sum2, count1, count2
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
---------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN 
UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 
PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
 ------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2847,8 +2851,8 @@ Projection: sum1, sum2, count1, count2
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 ------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
---------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN 
UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col AS 
Int64)) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 
PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
 ------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2942,15 +2946,16 @@ EXPLAIN SELECT a, b, c,
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS [...]
 --Limit: skip=0, fetch=5
-----WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER 
BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC 
NULLS LAST, annotated_data_infinite2.c ASC NULLS L [...]
-------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
---------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 
FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLL [...]
-------------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 
FOLLOWING]]
-----------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+----WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infin [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infin [...]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotate 
[...]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data [...]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS 
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [an [...]
+----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+------------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
-ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@8 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL 
[...]
+ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL 
[...]
 --GlobalLimitExec: skip=0, fetch=5
 ----BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b 
ASC NULLS LAST, annotated_data_inf [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FO 
[...]
@@ -2958,7 +2963,8 @@ ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, 
SUM(annotated_data_infinite2
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] [...]
 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AN [...]
 --------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict [...]
-----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
+----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
 
 
 query IIIIIIIIIIIIIII
@@ -3010,29 +3016,31 @@ logical_plan
 Limit: skip=0, fetch=5
 --Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
 ----Projection: annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_dat [...]
-------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC 
NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c 
ASC NULLS LAST] ROWS BETWEEN  [...]
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 
1 FOLLOWING]]
-----------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 
FOLLOWING]]
-------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION 
BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION 
BY [annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
-----------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 
FOLLOWING]]
-------------------TableScan: annotated_data_finite2 projection=[a, b, c, d]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [an 
[...]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data [...]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC [...]
+------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite [...]
+--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotate [...]
+----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS 
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c 
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2 [...]
+------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
+--------------------TableScan: annotated_data_finite2 projection=[a, b, c, d]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
-----ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BET [...]
+----ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BET [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS 
LAST, annotated_data_finite2.c ASC NULLS [...]
---------SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS 
LAST,c@2 ASC NULLS LAST]
+--------SortExec: expr=[d@4 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,c@3 ASC NULLS LAST]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION 
BY [annotated_data_finite2.b, annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_t [...]
-------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC 
NULLS LAST,c@2 ASC NULLS LAST]
+------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,d@4 ASC 
NULLS LAST,c@3 ASC NULLS LAST]
 --------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ [...]
-----------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC 
NULLS LAST]
+----------------SortExec: expr=[b@2 ASC NULLS LAST,a@1 ASC NULLS LAST,c@3 ASC 
NULLS LAST]
 ------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEE [...]
---------------------SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 
ASC NULLS LAST,c@2 ASC NULLS LAST]
+--------------------SortExec: expr=[a@1 ASC NULLS LAST,d@4 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST]
 ----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOW 
[...]
-------------------------SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS 
LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST]
 --------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: [...]
-----------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
+----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@0 as a, b@1 
as b, c@2 as c, d@3 as d]
+------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
 
 
 query IIIIIIIIIIIIIII

Reply via email to