alamb commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r760561390



##########
File path: datafusion/src/execution/context.rs
##########
@@ -2058,7 +2058,7 @@ mod tests {
         .await
         .unwrap_err();
 
-        assert_eq!(results.to_string(), "Error during planning: Coercion from 
[Timestamp(Nanosecond, None)] to the signature Uniform(1, [Int8, Int16, Int32, 
Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed.");
+        assert_eq!(results.to_string(), "Error during planning: The function 
Sum do not support the Timestamp(Nanosecond, None).");

Review comment:
       Would it be possible to add the valid signatures into this error 
message? The new wording is more readable, but we did lose some information 
about which type signatures are valid.

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",

Review comment:
       ```suggestion
                   return Err(DataFusionError::Plan(format!("The function {:?} 
expects {:?} arguments, but {:?} were provided",
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {

Review comment:
       I think min and max should only have a single input argument 
(`input_types.len()` is 1), so this is probably ok. I suggest also adding an 
explicit check like `assert_eq!(input_types.len(), 1)` as a defensive measure.

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(

Review comment:
       I think some docstrings would help here (perhaps a pointer to the 
module-level documentation, as is done in `type_coercion.rs`).

##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -262,6 +266,131 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::physical_plan::expressions::{ApproxDistinct, ArrayAgg, Count, 
Max, Min};
+
+    #[test]
+    fn test_count_arragg_approx_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+        ];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), 
true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = 
vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", 
&input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Count => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Count>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ApproxDistinct => {
+                        
assert!(result_agg_phy_exprs.as_any().is::<ApproxDistinct>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, false),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ArrayAgg => {
+                        
assert!(result_agg_phy_exprs.as_any().is::<ArrayAgg>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new(
+                                "c1",
+                                DataType::List(Box::new(Field::new(
+                                    "item",
+                                    data_type.clone(),
+                                    true
+                                ))),
+                                false
+                            ),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_min_max_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Min, AggregateFunction::Max];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), 
true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = 
vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", 
&input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Min => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Min>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::Max => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Max>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_sum_avg_expr() -> Result<()> {
+        // TODO

Review comment:
       Do you intend to complete this `TODO` in this PR?

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the 
MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, 
coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::aggregates;
+    use crate::physical_plan::aggregates::{signature, AggregateFunction};
+    use crate::physical_plan::coercion_rule::aggregate_rule::coerce_types;
+    use arrow::datatypes::DataType;
+
+    #[test]
+    fn test_aggregate_coerce_types() {
+        // test input args with error number input types
+        let fun = AggregateFunction::Min;
+        let input_types = vec![DataType::Int64, DataType::Int32];
+        let signature = signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!("Error during planning: The function Min expect argument 
number is 1, but the input argument number is 2", 
result.unwrap_err().to_string());
+
+        // test input args is invalid data type for sum or avg
+        let fun = AggregateFunction::Sum;
+        let input_types = vec![DataType::Utf8];
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Sum do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+        let fun = AggregateFunction::Avg;
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Avg do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+
+        // test count, array_agg, approx_distinct, min, max.
+        // the coerced types is same with input types
+        let funs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+            AggregateFunction::Min,
+            AggregateFunction::Max,
+        ];
+        let input_types = vec![
+            vec![DataType::Int32],
+            // vec![DataType::Decimal(10, 2)],

Review comment:
       Did you mean to leave this commented out?

##########
File path: datafusion/tests/sql.rs
##########
@@ -5612,10 +5612,9 @@ async fn test_aggregation_with_bad_arguments() -> 
Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx).await?;
     let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
-    let logical_plan = ctx.create_logical_plan(sql)?;
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await;
-    let err = physical_plan.unwrap_err();
-    assert_eq!(err.to_string(), "Error during planning: Invalid or wrong 
number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
+    let logical_plan = ctx.create_logical_plan(sql);
+    let err = logical_plan.unwrap_err();
+    assert_eq!(err.to_string(), DataFusionError::Plan("The function Count 
expect argument number is 1, but the input argument number is 
0".to_string()).to_string());

Review comment:
       That is a nicer error message for sure 👍 

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the 
MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, 
coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()

Review comment:
       You might be able to avoid some clones here by using `into_iter()` and 
`zip` (see https://doc.rust-lang.org/std/iter/struct.Zip.html).
   
   Something like (untested):
   ```suggestion
       input_exprs
           .iter().zip(coerced_types.into_iter())
           .map(|(expr, coerced_type)| try_cast(expr.clone(), schema, 
coerced_type))
           .collect::<Result<Vec<_>>>()
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",

Review comment:
       ```suggestion
                       "The function {:?} does not support inputs of type 
{:?}.",
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));

Review comment:
       ```suggestion
           _ => {
               return Err(DataFusionError::Internal(format!(
                   "Aggregate functions only support uniform signatures. Got 
{:?}",
                   signature
               )));
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported

Review comment:
       that is a good TODO. 

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",

Review comment:
       ```suggestion
                       "The function {:?} does not support inputs of type 
{:?}.",
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) 
=> {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} 
expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, 
input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or 
interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the 
MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, 
coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::aggregates;
+    use crate::physical_plan::aggregates::{signature, AggregateFunction};
+    use crate::physical_plan::coercion_rule::aggregate_rule::coerce_types;
+    use arrow::datatypes::DataType;
+
+    #[test]
+    fn test_aggregate_coerce_types() {
+        // test input args with error number input types
+        let fun = AggregateFunction::Min;
+        let input_types = vec![DataType::Int64, DataType::Int32];
+        let signature = signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!("Error during planning: The function Min expect argument 
number is 1, but the input argument number is 2", 
result.unwrap_err().to_string());
+
+        // test input args is invalid data type for sum or avg
+        let fun = AggregateFunction::Sum;
+        let input_types = vec![DataType::Utf8];
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Sum do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+        let fun = AggregateFunction::Avg;
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Avg do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+
+        // test count, array_agg, approx_distinct, min, max.
+        // the coerced types is same with input types
+        let funs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+            AggregateFunction::Min,
+            AggregateFunction::Max,
+        ];
+        let input_types = vec![
+            vec![DataType::Int32],
+            // vec![DataType::Decimal(10, 2)],
+            vec![DataType::Utf8],
+        ];
+        for fun in funs {
+            for input_type in &input_types {
+                let signature = aggregates::signature(&fun);
+                let result = coerce_types(&fun, input_type, &signature);
+                assert_eq!(*input_type, result.unwrap());
+            }
+        }
+        // test sum, avg
+        let funs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
+        let input_types = vec![
+            vec![DataType::Int32],
+            vec![DataType::Float32],
+            // vec![DataType::Decimal(20, 3)],

Review comment:
       Likewise, I think this commented-out Decimal case should work now.

##########
File path: datafusion/src/physical_plan/expressions/average.rs
##########
@@ -62,6 +62,24 @@ pub fn avg_return_type(arg_type: &DataType) -> 
Result<DataType> {
     }
 }
 
+pub(crate) fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
+    // TODO support the interval
+    // TODO: do we need to support the unsigned data type?

Review comment:
       What do you mean by `the unsigned data type`? If you mean "do we support 
AVG(UInt8)" I think the answer is yes.

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -60,6 +60,7 @@ pub mod helpers {
 
 pub use approx_distinct::ApproxDistinct;
 pub use array_agg::ArrayAgg;
+pub(crate) use average::is_avg_support_arg_type;

Review comment:
       I don't think we need to export this function (i.e. no need to declare it 
`pub(crate)` in this module). Likewise below.

##########
File path: datafusion/src/physical_plan/coercion_rule/mod.rs
##########
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! define the coercion rule for different Expr type

Review comment:
       ```suggestion
   //! define coercion rules for aggregate functions
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to