This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f0e0164c7 Move Covariance (Population) covar_pop to be a User Defined 
Aggregate Function (#10418)
9f0e0164c7 is described below

commit 9f0e0164c73c834260f842f0ee942593707730bd
Author: Junhao Liu <[email protected]>
AuthorDate: Fri May 10 06:50:01 2024 -0600

    Move Covariance (Population) covar_pop to be a User Defined Aggregate 
Function (#10418)
    
    * move covariance
    
    * add sqllogictest
---
 datafusion/expr/src/aggregate_function.rs          |  10 +-
 datafusion/expr/src/type_coercion/aggregates.rs    |  10 -
 datafusion/functions-aggregate/src/covariance.rs   |  81 +++++
 datafusion/functions-aggregate/src/lib.rs          |   1 +
 datafusion/physical-expr/src/aggregate/build_in.rs |  11 -
 .../physical-expr/src/aggregate/covariance.rs      | 372 ---------------------
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/proto/proto/datafusion.proto            |   2 +-
 datafusion/proto/src/generated/pbjson.rs           |   3 -
 datafusion/proto/src/generated/prost.rs            |   4 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   1 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   4 -
 datafusion/proto/src/physical_plan/to_proto.rs     |  14 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   3 +-
 datafusion/sqllogictest/test_files/aggregate.slt   | 180 ++++++++++
 15 files changed, 273 insertions(+), 424 deletions(-)

diff --git a/datafusion/expr/src/aggregate_function.rs 
b/datafusion/expr/src/aggregate_function.rs
index af8a682eff..0a7607498c 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -63,8 +63,6 @@ pub enum AggregateFunction {
     Stddev,
     /// Standard Deviation (Population)
     StddevPop,
-    /// Covariance (Population)
-    CovariancePop,
     /// Correlation
     Correlation,
     /// Slope from linear regression
@@ -126,7 +124,6 @@ impl AggregateFunction {
             VariancePop => "VAR_POP",
             Stddev => "STDDEV",
             StddevPop => "STDDEV_POP",
-            CovariancePop => "COVAR_POP",
             Correlation => "CORR",
             RegrSlope => "REGR_SLOPE",
             RegrIntercept => "REGR_INTERCEPT",
@@ -181,7 +178,6 @@ impl FromStr for AggregateFunction {
             "string_agg" => AggregateFunction::StringAgg,
             // statistical
             "corr" => AggregateFunction::Correlation,
-            "covar_pop" => AggregateFunction::CovariancePop,
             "stddev" => AggregateFunction::Stddev,
             "stddev_pop" => AggregateFunction::StddevPop,
             "stddev_samp" => AggregateFunction::Stddev,
@@ -255,9 +251,6 @@ impl AggregateFunction {
             AggregateFunction::VariancePop => {
                 variance_return_type(&coerced_data_types[0])
             }
-            AggregateFunction::CovariancePop => {
-                covariance_return_type(&coerced_data_types[0])
-            }
             AggregateFunction::Correlation => {
                 correlation_return_type(&coerced_data_types[0])
             }
@@ -349,8 +342,7 @@ impl AggregateFunction {
                 Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
             }
             AggregateFunction::NthValue => Signature::any(2, 
Volatility::Immutable),
-            AggregateFunction::CovariancePop
-            | AggregateFunction::Correlation
+            AggregateFunction::Correlation
             | AggregateFunction::RegrSlope
             | AggregateFunction::RegrIntercept
             | AggregateFunction::RegrCount
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs 
b/datafusion/expr/src/type_coercion/aggregates.rs
index 39726d7d0e..57c0b6f4ed 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -183,16 +183,6 @@ pub fn coerce_types(
             }
             Ok(vec![Float64, Float64])
         }
-        AggregateFunction::CovariancePop => {
-            if !is_covariance_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(vec![Float64, Float64])
-        }
         AggregateFunction::Stddev | AggregateFunction::StddevPop => {
             if !is_stddev_support_arg_type(&input_types[0]) {
                 return plan_err!(
diff --git a/datafusion/functions-aggregate/src/covariance.rs 
b/datafusion/functions-aggregate/src/covariance.rs
index 130b193996..1210e1529d 100644
--- a/datafusion/functions-aggregate/src/covariance.rs
+++ b/datafusion/functions-aggregate/src/covariance.rs
@@ -43,6 +43,14 @@ make_udaf_expr_and_func!(
     covar_samp_udaf
 );
 
+make_udaf_expr_and_func!(
+    CovariancePopulation,
+    covar_pop,
+    y x,
+    "Computes the population covariance.",
+    covar_pop_udaf
+);
+
 pub struct CovarianceSample {
     signature: Signature,
     aliases: Vec<String>,
@@ -120,6 +128,79 @@ impl AggregateUDFImpl for CovarianceSample {
     }
 }
 
+pub struct CovariancePopulation {
+    signature: Signature,
+}
+
+impl Debug for CovariancePopulation {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("CovariancePopulation")
+            .field("name", &self.name())
+            .field("signature", &self.signature)
+            .finish()
+    }
+}
+
+impl Default for CovariancePopulation {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl CovariancePopulation {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::uniform(2, NUMERICS.to_vec(), 
Volatility::Immutable),
+        }
+    }
+}
+
+impl AggregateUDFImpl for CovariancePopulation {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "covar_pop"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if !arg_types[0].is_numeric() {
+            return plan_err!("Covariance requires numeric input types");
+        }
+
+        Ok(DataType::Float64)
+    }
+
+    fn state_fields(
+        &self,
+        name: &str,
+        _value_type: DataType,
+        _ordering_fields: Vec<Field>,
+    ) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(format_state_name(name, "count"), DataType::UInt64, 
true),
+            Field::new(format_state_name(name, "mean1"), DataType::Float64, 
true),
+            Field::new(format_state_name(name, "mean2"), DataType::Float64, 
true),
+            Field::new(
+                format_state_name(name, "algo_const"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        Ok(Box::new(CovarianceAccumulator::try_new(
+            StatsType::Population,
+        )?))
+    }
+}
+
 /// An accumulator to compute covariance
 /// The algorithm used is an online implementation and numerically stable. It 
is derived from the following paper
 /// for calculating variance:
diff --git a/datafusion/functions-aggregate/src/lib.rs 
b/datafusion/functions-aggregate/src/lib.rs
index d4e4d3a5f3..e76a43e398 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -75,6 +75,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> 
Result<()> {
     let functions: Vec<Arc<AggregateUDF>> = vec![
         first_last::first_value_udaf(),
         covariance::covar_samp_udaf(),
+        covariance::covar_pop_udaf(),
     ];
 
     functions.into_iter().try_for_each(|udf| {
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs 
b/datafusion/physical-expr/src/aggregate/build_in.rs
index 36af875473..145e7feadf 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -181,17 +181,6 @@ pub fn create_aggregate_expr(
         (AggregateFunction::VariancePop, true) => {
             return not_impl_err!("VAR_POP(DISTINCT) aggregations are not 
available");
         }
-        (AggregateFunction::CovariancePop, false) => {
-            Arc::new(expressions::CovariancePop::new(
-                input_phy_exprs[0].clone(),
-                input_phy_exprs[1].clone(),
-                name,
-                data_type,
-            ))
-        }
-        (AggregateFunction::CovariancePop, true) => {
-            return not_impl_err!("COVAR_POP(DISTINCT) aggregations are not 
available");
-        }
         (AggregateFunction::Stddev, false) => 
Arc::new(expressions::Stddev::new(
             input_phy_exprs[0].clone(),
             name,
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs 
b/datafusion/physical-expr/src/aggregate/covariance.rs
index 272f1d8be2..639d8a098c 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -17,111 +17,17 @@
 
 //! Defines physical expressions that can evaluated at runtime during query 
execution
 
-use std::any::Any;
-use std::sync::Arc;
-
-use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::Float64Array;
 use arrow::{
     array::{ArrayRef, UInt64Array},
     compute::cast,
     datatypes::DataType,
-    datatypes::Field,
 };
 use datafusion_common::{downcast_value, unwrap_or_internal_err, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
 use crate::aggregate::stats::StatsType;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-
-/// COVAR_POP aggregate expression
-#[derive(Debug)]
-pub struct CovariancePop {
-    name: String,
-    expr1: Arc<dyn PhysicalExpr>,
-    expr2: Arc<dyn PhysicalExpr>,
-}
-
-impl CovariancePop {
-    /// Create a new COVAR_POP aggregate function
-    pub fn new(
-        expr1: Arc<dyn PhysicalExpr>,
-        expr2: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        // the result of covariance just support FLOAT64 data type.
-        assert!(matches!(data_type, DataType::Float64));
-        Self {
-            name: name.into(),
-            expr1,
-            expr2,
-        }
-    }
-}
-
-impl AggregateExpr for CovariancePop {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, DataType::Float64, true))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(CovarianceAccumulator::try_new(
-            StatsType::Population,
-        )?))
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![
-            Field::new(
-                format_state_name(&self.name, "count"),
-                DataType::UInt64,
-                true,
-            ),
-            Field::new(
-                format_state_name(&self.name, "mean1"),
-                DataType::Float64,
-                true,
-            ),
-            Field::new(
-                format_state_name(&self.name, "mean2"),
-                DataType::Float64,
-                true,
-            ),
-            Field::new(
-                format_state_name(&self.name, "algo_const"),
-                DataType::Float64,
-                true,
-            ),
-        ])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr1.clone(), self.expr2.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-}
-
-impl PartialEq<dyn Any> for CovariancePop {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name && self.expr1.eq(&x.expr1) && 
self.expr2.eq(&x.expr2)
-            })
-            .unwrap_or(false)
-    }
-}
 
 /// An accumulator to compute covariance
 /// The algrithm used is an online implementation and numerically stable. It 
is derived from the following paper
@@ -319,281 +225,3 @@ impl Accumulator for CovarianceAccumulator {
         std::mem::size_of_val(self)
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::aggregate::utils::get_accum_scalar_values_as_arrays;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op2;
-    use arrow::{array::*, datatypes::*};
-
-    #[test]
-    fn covariance_f64_1() -> Result<()> {
-        let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 
3_f64]));
-        let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 
6_f64]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Float64,
-            DataType::Float64,
-            CovariancePop,
-            ScalarValue::from(0.6666666666666666_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_f64_5() -> Result<()> {
-        let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 
3_f64]));
-        let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 
6_f64]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Float64,
-            DataType::Float64,
-            CovariancePop,
-            ScalarValue::from(0.6022222222222223_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_f64_6() -> Result<()> {
-        let a = Arc::new(Float64Array::from(vec![
-            1_f64, 2_f64, 3_f64, 1.1_f64, 2.2_f64, 3.3_f64,
-        ]));
-        let b = Arc::new(Float64Array::from(vec![
-            4_f64, 5_f64, 6_f64, 4.4_f64, 5.5_f64, 6.6_f64,
-        ]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Float64,
-            DataType::Float64,
-            CovariancePop,
-            ScalarValue::from(0.7616666666666666_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_i32() -> Result<()> {
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
-        let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Int32,
-            DataType::Int32,
-            CovariancePop,
-            ScalarValue::from(0.6666666666666666_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_u32() -> Result<()> {
-        let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 
3_u32]));
-        let b: ArrayRef = Arc::new(UInt32Array::from(vec![4_u32, 5_u32, 
6_u32]));
-        generic_test_op2!(
-            a,
-            b,
-            DataType::UInt32,
-            DataType::UInt32,
-            CovariancePop,
-            ScalarValue::from(0.6666666666666666_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_f32() -> Result<()> {
-        let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 
3_f32]));
-        let b: ArrayRef = Arc::new(Float32Array::from(vec![4_f32, 5_f32, 
6_f32]));
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Float32,
-            DataType::Float32,
-            CovariancePop,
-            ScalarValue::from(0.6666666666666666_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_i32_with_nulls_1() -> Result<()> {
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, 
Some(3)]));
-        let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), None, 
Some(6)]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Int32,
-            DataType::Int32,
-            CovariancePop,
-            ScalarValue::from(1_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_i32_with_nulls_2() -> Result<()> {
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![
-            Some(1),
-            None,
-            Some(2),
-            None,
-            Some(3),
-            None,
-        ]));
-        let b: ArrayRef = Arc::new(Int32Array::from(vec![
-            Some(4),
-            Some(9),
-            Some(5),
-            Some(8),
-            Some(6),
-            None,
-        ]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Int32,
-            DataType::Int32,
-            CovariancePop,
-            ScalarValue::from(0.6666666666666666_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_pop_i32_all_nulls() -> Result<()> {
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Int32,
-            DataType::Int32,
-            CovariancePop,
-            ScalarValue::Float64(None)
-        )
-    }
-
-    #[test]
-    fn covariance_pop_1_input() -> Result<()> {
-        let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64]));
-        let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64]));
-
-        generic_test_op2!(
-            a,
-            b,
-            DataType::Float64,
-            DataType::Float64,
-            CovariancePop,
-            ScalarValue::from(0_f64)
-        )
-    }
-
-    #[test]
-    fn covariance_f64_merge_1() -> Result<()> {
-        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
-        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
-        let c = Arc::new(Float64Array::from(vec![1.1_f64, 2.2_f64, 3.3_f64]));
-        let d = Arc::new(Float64Array::from(vec![4.4_f64, 5.5_f64, 6.6_f64]));
-
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Float64, true),
-            Field::new("b", DataType::Float64, true),
-        ]);
-
-        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, 
b])?;
-        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, 
d])?;
-
-        let agg1 = Arc::new(CovariancePop::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let agg2 = Arc::new(CovariancePop::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let actual = merge(&batch1, &batch2, agg1, agg2)?;
-        assert!(actual == ScalarValue::from(0.7616666666666666));
-
-        Ok(())
-    }
-
-    #[test]
-    fn covariance_f64_merge_2() -> Result<()> {
-        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
-        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
-        let c = Arc::new(Float64Array::from(vec![None]));
-        let d = Arc::new(Float64Array::from(vec![None]));
-
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Float64, true),
-            Field::new("b", DataType::Float64, true),
-        ]);
-
-        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, 
b])?;
-        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, 
d])?;
-
-        let agg1 = Arc::new(CovariancePop::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let agg2 = Arc::new(CovariancePop::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let actual = merge(&batch1, &batch2, agg1, agg2)?;
-        assert!(actual == ScalarValue::from(0.6666666666666666));
-
-        Ok(())
-    }
-
-    fn merge(
-        batch1: &RecordBatch,
-        batch2: &RecordBatch,
-        agg1: Arc<dyn AggregateExpr>,
-        agg2: Arc<dyn AggregateExpr>,
-    ) -> Result<ScalarValue> {
-        let mut accum1 = agg1.create_accumulator()?;
-        let mut accum2 = agg2.create_accumulator()?;
-        let expr1 = agg1.expressions();
-        let expr2 = agg2.expressions();
-
-        let values1 = expr1
-            .iter()
-            .map(|e| {
-                e.evaluate(batch1)
-                    .and_then(|v| v.into_array(batch1.num_rows()))
-            })
-            .collect::<Result<Vec<_>>>()?;
-        let values2 = expr2
-            .iter()
-            .map(|e| {
-                e.evaluate(batch2)
-                    .and_then(|v| v.into_array(batch2.num_rows()))
-            })
-            .collect::<Result<Vec<_>>>()?;
-        accum1.update_batch(&values1)?;
-        accum2.update_batch(&values2)?;
-        let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
-        accum1.merge_batch(&state2)?;
-        accum1.evaluate()
-    }
-}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 3efa965d14..c16b609e23 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -52,7 +52,6 @@ pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::correlation::Correlation;
 pub use crate::aggregate::count::Count;
 pub use crate::aggregate::count_distinct::DistinctCount;
-pub use crate::aggregate::covariance::CovariancePop;
 pub use crate::aggregate::grouping::Grouping;
 pub use crate::aggregate::median::Median;
 pub use crate::aggregate::min_max::{Max, Min};
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index c057ab8acd..311a0bf863 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -549,7 +549,7 @@ enum AggregateFunction {
   VARIANCE = 7;
   VARIANCE_POP = 8;
   // COVARIANCE = 9;
-  COVARIANCE_POP = 10;
+  // COVARIANCE_POP = 10;
   STDDEV = 11;
   STDDEV_POP = 12;
   CORRELATION = 13;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 994703c5fc..a1a1417358 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -430,7 +430,6 @@ impl serde::Serialize for AggregateFunction {
             Self::ArrayAgg => "ARRAY_AGG",
             Self::Variance => "VARIANCE",
             Self::VariancePop => "VARIANCE_POP",
-            Self::CovariancePop => "COVARIANCE_POP",
             Self::Stddev => "STDDEV",
             Self::StddevPop => "STDDEV_POP",
             Self::Correlation => "CORRELATION",
@@ -477,7 +476,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
             "ARRAY_AGG",
             "VARIANCE",
             "VARIANCE_POP",
-            "COVARIANCE_POP",
             "STDDEV",
             "STDDEV_POP",
             "CORRELATION",
@@ -553,7 +551,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                     "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
                     "VARIANCE" => Ok(AggregateFunction::Variance),
                     "VARIANCE_POP" => Ok(AggregateFunction::VariancePop),
-                    "COVARIANCE_POP" => Ok(AggregateFunction::CovariancePop),
                     "STDDEV" => Ok(AggregateFunction::Stddev),
                     "STDDEV_POP" => Ok(AggregateFunction::StddevPop),
                     "CORRELATION" => Ok(AggregateFunction::Correlation),
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index fc23a9ea05..706794e380 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2835,7 +2835,7 @@ pub enum AggregateFunction {
     Variance = 7,
     VariancePop = 8,
     /// COVARIANCE = 9;
-    CovariancePop = 10,
+    /// COVARIANCE_POP = 10;
     Stddev = 11,
     StddevPop = 12,
     Correlation = 13,
@@ -2881,7 +2881,6 @@ impl AggregateFunction {
             AggregateFunction::ArrayAgg => "ARRAY_AGG",
             AggregateFunction::Variance => "VARIANCE",
             AggregateFunction::VariancePop => "VARIANCE_POP",
-            AggregateFunction::CovariancePop => "COVARIANCE_POP",
             AggregateFunction::Stddev => "STDDEV",
             AggregateFunction::StddevPop => "STDDEV_POP",
             AggregateFunction::Correlation => "CORRELATION",
@@ -2924,7 +2923,6 @@ impl AggregateFunction {
             "ARRAY_AGG" => Some(Self::ArrayAgg),
             "VARIANCE" => Some(Self::Variance),
             "VARIANCE_POP" => Some(Self::VariancePop),
-            "COVARIANCE_POP" => Some(Self::CovariancePop),
             "STDDEV" => Some(Self::Stddev),
             "STDDEV_POP" => Some(Self::StddevPop),
             "CORRELATION" => Some(Self::Correlation),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 35d4c6409b..585bcad7f3 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -428,7 +428,6 @@ impl From<protobuf::AggregateFunction> for 
AggregateFunction {
             protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
             protobuf::AggregateFunction::Variance => Self::Variance,
             protobuf::AggregateFunction::VariancePop => Self::VariancePop,
-            protobuf::AggregateFunction::CovariancePop => Self::CovariancePop,
             protobuf::AggregateFunction::Stddev => Self::Stddev,
             protobuf::AggregateFunction::StddevPop => Self::StddevPop,
             protobuf::AggregateFunction::Correlation => Self::Correlation,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 80acd12e4e..4c29d7551b 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -369,7 +369,6 @@ impl From<&AggregateFunction> for 
protobuf::AggregateFunction {
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
             AggregateFunction::Variance => Self::Variance,
             AggregateFunction::VariancePop => Self::VariancePop,
-            AggregateFunction::CovariancePop => Self::CovariancePop,
             AggregateFunction::Stddev => Self::Stddev,
             AggregateFunction::StddevPop => Self::StddevPop,
             AggregateFunction::Correlation => Self::Correlation,
@@ -673,9 +672,6 @@ pub fn serialize_expr(
                     AggregateFunction::VariancePop => {
                         protobuf::AggregateFunction::VariancePop
                     }
-                    AggregateFunction::CovariancePop => {
-                        protobuf::AggregateFunction::CovariancePop
-                    }
                     AggregateFunction::Stddev => 
protobuf::AggregateFunction::Stddev,
                     AggregateFunction::StddevPop => {
                         protobuf::AggregateFunction::StddevPop
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 3bc71f5f4c..162a2f28e1 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -25,12 +25,12 @@ use datafusion::physical_expr::{PhysicalSortExpr, 
ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
     ApproxDistinct, ApproxMedian, ApproxPercentileCont, 
ApproxPercentileContWithWeight,
     ArrayAgg, Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, 
CaseExpr,
-    CastExpr, Column, Correlation, Count, CovariancePop, CumeDist, 
DistinctArrayAgg,
-    DistinctBitXor, DistinctCount, DistinctSum, FirstValue, Grouping, 
InListExpr,
-    IsNotNullExpr, IsNullExpr, LastValue, Literal, Max, Median, Min, 
NegativeExpr,
-    NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, 
RankType, Regr,
-    RegrType, RowNumber, Stddev, StddevPop, StringAgg, Sum, TryCastExpr, 
Variance,
-    VariancePop, WindowShift,
+    CastExpr, Column, Correlation, Count, CumeDist, DistinctArrayAgg, 
DistinctBitXor,
+    DistinctCount, DistinctSum, FirstValue, Grouping, InListExpr, 
IsNotNullExpr,
+    IsNullExpr, LastValue, Literal, Max, Median, Min, NegativeExpr, NotExpr, 
NthValue,
+    NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, Regr, RegrType,
+    RowNumber, Stddev, StddevPop, StringAgg, Sum, TryCastExpr, Variance, 
VariancePop,
+    WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, 
PlainAggregateWindowExpr};
@@ -291,8 +291,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> 
Result<AggrFn> {
         protobuf::AggregateFunction::Variance
     } else if aggr_expr.downcast_ref::<VariancePop>().is_some() {
         protobuf::AggregateFunction::VariancePop
-    } else if aggr_expr.downcast_ref::<CovariancePop>().is_some() {
-        protobuf::AggregateFunction::CovariancePop
     } else if aggr_expr.downcast_ref::<Stddev>().is_some() {
         protobuf::AggregateFunction::Stddev
     } else if aggr_expr.downcast_ref::<StddevPop>().is_some() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 3800b672b5..819e206156 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -30,7 +30,7 @@ use datafusion::datasource::provider::TableProviderFactory;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::functions_aggregate::covariance::covar_samp;
+use datafusion::functions_aggregate::covariance::{covar_pop, covar_samp};
 use datafusion::functions_aggregate::expr_fn::first_value;
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -616,6 +616,7 @@ async fn roundtrip_expr_api() -> Result<()> {
         array_replace_all(make_array(vec![lit(1), lit(2), lit(3)]), lit(2), 
lit(4)),
         first_value(vec![lit(1)], false, None, None, None),
         covar_samp(lit(1.5), lit(2.2), false, None, None, None),
+        covar_pop(lit(1.5), lit(2.2), true, None, None, None),
     ];
 
     // ensure expressions created with the expr api can be round tripped
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index bc677b73fb..40f78e7f4d 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -1812,6 +1812,186 @@ select avg(c1), arrow_typeof(avg(c1)) from t;
 statement ok
 drop table t;
 
+# covariance_f64_1
+statement ok
+create table t (c1 double, c2 double) as values (1, 4), (2, 5), (3, 6);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.666666666667 Float64
+
+statement ok
+drop table t;
+
+# covariance_f64_2
+statement ok
+create table t (c1 double, c2 double) as values (1, 4), (2, 5), (3, 6);
+
+query RT
+select covar_samp(c1, c2), arrow_typeof(covar_samp(c1, c2)) from t;
+----
+1 Float64
+
+statement ok
+drop table t;
+
+# covariance_f64_4
+statement ok
+create table t (c1 double, c2 double) as values (1.1, 4.1), (2.0, 5.0), (3.0, 
6.0);
+
+query RT
+select covar_samp(c1, c2), arrow_typeof(covar_samp(c1, c2)) from t;
+----
+0.903333333333 Float64
+
+statement ok
+drop table t;
+
+# covariance_f64_5
+statement ok
+create table t (c1 double, c2 double) as values (1.1, 4.1), (2.0, 5.0), (3.0, 
6.0);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.602222222222 Float64
+
+statement ok
+drop table t;
+
+# covariance_f64_6
+statement ok
+create table t (c1 double, c2 double) as values (1.0, 4.0), (2.0, 5.0), (3.0, 
6.0), (1.1, 4.4), (2.2, 5.5), (3.3, 6.6);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.761666666667 Float64
+
+statement ok
+drop table t;
+
+# covariance_i32
+statement ok
+create table t (c1 int, c2 int) as values (1, 4), (2, 5), (3, 6);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.666666666667 Float64
+
+statement ok
+drop table t;
+
+# covariance_u32
+statement ok
+create table t (c1 int unsigned, c2 int unsigned) as values (1, 4), (2, 5), 
(3, 6);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.666666666667 Float64
+
+statement ok
+drop table t;
+
+# covariance_f32
+statement ok
+create table t (c1 float, c2 float) as values (1, 4), (2, 5), (3, 6);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.666666666667 Float64
+
+statement ok
+drop table t;
+
+# covariance_i32_with_nulls_1
+statement ok
+create table t (c1 int, c2 int) as values (1, 4), (null, null), (3, 6);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+1 Float64
+
+statement ok
+drop table t;
+
+# covariance_i32_with_nulls_2
+statement ok
+create table t (c1 int, c2 int) as values (1, 4), (null, 9), (2, 5), (null, 
8), (3, 6), (null, null);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0.666666666667 Float64
+
+statement ok
+drop table t;
+
+# covariance_i32_with_nulls_3
+statement ok
+create table t (c1 int, c2 int) as values (1, 4), (null, 9), (2, 5), (null, 
8), (3, 6), (null, null);
+
+query RT
+select covar_samp(c1, c2), arrow_typeof(covar_samp(c1, c2)) from t;
+----
+1 Float64
+
+statement ok
+drop table t;
+
+# covariance_i32_all_nulls
+statement ok
+create table t (c1 int, c2 int) as values (null, null), (null, null);
+
+query RT
+select covar_samp(c1, c2), arrow_typeof(covar_samp(c1, c2)) from t;
+----
+NULL Float64
+
+statement ok
+drop table t;
+
+# covariance_pop_i32_all_nulls
+statement ok
+create table t (c1 int, c2 int) as values (null, null), (null, null);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+NULL Float64
+
+statement ok
+drop table t;
+
+# covariance_1_input
+statement ok
+create table t (c1 double, c2 double) as values (1, 2);
+
+query RT
+select covar_samp(c1, c2), arrow_typeof(covar_samp(c1, c2)) from t;
+----
+NULL Float64
+
+statement ok
+drop table t;
+
+# covariance_pop_1_input
+statement ok
+create table t (c1 double, c2 double) as values (1, 2);
+
+query RT
+select covar_pop(c1, c2), arrow_typeof(covar_pop(c1, c2)) from t;
+----
+0 Float64
+
+statement ok
+drop table t;
+
 # simple_mean
 query R
 select mean(c1) from test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to