This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b4c77e5  Add covar operators (#1551)
b4c77e5 is described below

commit b4c77e5313c53b67eeb3c42e390e22b5ef059055
Author: Lin Ma <[email protected]>
AuthorDate: Thu Jan 13 11:31:15 2022 -0800

    Add covar operators (#1551)
    
    * initial change
    
    * fix some logic
    
    * adding basic tests
    
    * mark update and merge unimplemented
    
    * all tests pass
    
    * add doc and tests
    
    * lint
    
    * clipy
    
    * add more merge tests
    
    * add merge tests for stddev
    
    * lint
    
    * clippy
---
 README.md                                          |   2 +-
 ballista/rust/core/proto/ballista.proto            |   6 +-
 .../rust/core/src/serde/logical_plan/to_proto.rs   |   8 +
 ballista/rust/core/src/serde/mod.rs                |   4 +
 datafusion/src/physical_plan/aggregates.rs         |  41 +-
 .../physical_plan/coercion_rule/aggregate_rule.rs  |  22 +-
 .../src/physical_plan/expressions/covariance.rs    | 723 +++++++++++++++++++++
 datafusion/src/physical_plan/expressions/mod.rs    |  30 +
 datafusion/src/physical_plan/expressions/stddev.rs |  89 +++
 .../src/physical_plan/expressions/variance.rs      | 103 ++-
 datafusion/tests/sql/aggregates.rs                 |  24 +
 11 files changed, 1039 insertions(+), 13 deletions(-)

diff --git a/README.md b/README.md
index 82089f1..19ccdef 100644
--- a/README.md
+++ b/README.md
@@ -268,7 +268,7 @@ This library currently supports many SQL constructs, 
including
 - `CAST` to change types, including e.g. `Timestamp(Nanosecond, None)`
 - Many mathematical unary and binary expressions such as `+`, `/`, `sqrt`, 
`tan`, `>=`.
 - `WHERE` to filter
-- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, 
`COUNT`, `SUM`, `AVG`, `VAR`, `STDDEV` (sample and population)
+- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, 
`COUNT`, `SUM`, `AVG`, `VAR`, `COVAR`, `STDDEV` (sample and population)
 - `ORDER BY` together with an expression and optional `ASC` or `DESC` and also 
optional `NULLS FIRST` or `NULLS LAST`
 
 ## Supported Functions
diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 3fa14f1..cc6e00a 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -171,8 +171,10 @@ enum AggregateFunction {
   ARRAY_AGG = 6;
   VARIANCE=7;
   VARIANCE_POP=8;
-  STDDEV=9;
-  STDDEV_POP=10;
+  COVARIANCE=9;
+  COVARIANCE_POP=10;
+  STDDEV=11;
+  STDDEV_POP=12;
 }
 
 message AggregateExprNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 46543ea..36e9ba6 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1039,6 +1039,12 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
                     AggregateFunction::VariancePop => {
                         protobuf::AggregateFunction::VariancePop
                     }
+                    AggregateFunction::Covariance => {
+                        protobuf::AggregateFunction::Covariance
+                    }
+                    AggregateFunction::CovariancePop => {
+                        protobuf::AggregateFunction::CovariancePop
+                    }
                     AggregateFunction::Stddev => 
protobuf::AggregateFunction::Stddev,
                     AggregateFunction::StddevPop => {
                         protobuf::AggregateFunction::StddevPop
@@ -1275,6 +1281,8 @@ impl From<&AggregateFunction> for 
protobuf::AggregateFunction {
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
             AggregateFunction::Variance => Self::Variance,
             AggregateFunction::VariancePop => Self::VariancePop,
+            AggregateFunction::Covariance => Self::Covariance,
+            AggregateFunction::CovariancePop => Self::CovariancePop,
             AggregateFunction::Stddev => Self::Stddev,
             AggregateFunction::StddevPop => Self::StddevPop,
         }
diff --git a/ballista/rust/core/src/serde/mod.rs 
b/ballista/rust/core/src/serde/mod.rs
index e68983a..62246a0 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -122,6 +122,10 @@ impl From<protobuf::AggregateFunction> for 
AggregateFunction {
             protobuf::AggregateFunction::ArrayAgg => 
AggregateFunction::ArrayAgg,
             protobuf::AggregateFunction::Variance => 
AggregateFunction::Variance,
             protobuf::AggregateFunction::VariancePop => 
AggregateFunction::VariancePop,
+            protobuf::AggregateFunction::Covariance => 
AggregateFunction::Covariance,
+            protobuf::AggregateFunction::CovariancePop => {
+                AggregateFunction::CovariancePop
+            }
             protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev,
             protobuf::AggregateFunction::StddevPop => 
AggregateFunction::StddevPop,
         }
diff --git a/datafusion/src/physical_plan/aggregates.rs 
b/datafusion/src/physical_plan/aggregates.rs
index 036c69e..949dbb1 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -36,7 +36,8 @@ use crate::physical_plan::distinct_expressions;
 use crate::physical_plan::expressions;
 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use expressions::{
-    avg_return_type, stddev_return_type, sum_return_type, variance_return_type,
+    avg_return_type, covariance_return_type, stddev_return_type, 
sum_return_type,
+    variance_return_type,
 };
 use std::{fmt, str::FromStr, sync::Arc};
 
@@ -74,6 +75,10 @@ pub enum AggregateFunction {
     Stddev,
     /// Standard Deviation (Population)
     StddevPop,
+    /// Covariance (Sample)
+    Covariance,
+    /// Covariance (Population)
+    CovariancePop,
 }
 
 impl fmt::Display for AggregateFunction {
@@ -100,6 +105,9 @@ impl FromStr for AggregateFunction {
             "stddev" => AggregateFunction::Stddev,
             "stddev_samp" => AggregateFunction::Stddev,
             "stddev_pop" => AggregateFunction::StddevPop,
+            "covar" => AggregateFunction::Covariance,
+            "covar_samp" => AggregateFunction::Covariance,
+            "covar_pop" => AggregateFunction::CovariancePop,
             _ => {
                 return Err(DataFusionError::Plan(format!(
                     "There is no built-in function named {}",
@@ -134,6 +142,10 @@ pub fn return_type(
         AggregateFunction::Sum => sum_return_type(&coerced_data_types[0]),
         AggregateFunction::Variance => 
variance_return_type(&coerced_data_types[0]),
         AggregateFunction::VariancePop => 
variance_return_type(&coerced_data_types[0]),
+        AggregateFunction::Covariance => 
covariance_return_type(&coerced_data_types[0]),
+        AggregateFunction::CovariancePop => {
+            covariance_return_type(&coerced_data_types[0])
+        }
         AggregateFunction::Stddev => 
stddev_return_type(&coerced_data_types[0]),
         AggregateFunction::StddevPop => 
stddev_return_type(&coerced_data_types[0]),
         AggregateFunction::Avg => avg_return_type(&coerced_data_types[0]),
@@ -259,6 +271,30 @@ pub fn create_aggregate_expr(
                 "VAR_POP(DISTINCT) aggregations are not available".to_string(),
             ));
         }
+        (AggregateFunction::Covariance, false) => 
Arc::new(expressions::Covariance::new(
+            coerced_phy_exprs[0].clone(),
+            coerced_phy_exprs[1].clone(),
+            name,
+            return_type,
+        )),
+        (AggregateFunction::Covariance, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "COVAR(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::CovariancePop, false) => {
+            Arc::new(expressions::CovariancePop::new(
+                coerced_phy_exprs[0].clone(),
+                coerced_phy_exprs[1].clone(),
+                name,
+                return_type,
+            ))
+        }
+        (AggregateFunction::CovariancePop, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "COVAR_POP(DISTINCT) aggregations are not 
available".to_string(),
+            ));
+        }
         (AggregateFunction::Stddev, false) => 
Arc::new(expressions::Stddev::new(
             coerced_phy_exprs[0].clone(),
             name,
@@ -331,6 +367,9 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
         | AggregateFunction::StddevPop => {
             Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
         }
+        AggregateFunction::Covariance | AggregateFunction::CovariancePop => {
+            Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
+        }
     }
 }
 
diff --git a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs 
b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
index d74b4e4..2b02ba0 100644
--- a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
+++ b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
@@ -21,8 +21,8 @@ use crate::arrow::datatypes::Schema;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::aggregates::AggregateFunction;
 use crate::physical_plan::expressions::{
-    is_avg_support_arg_type, is_stddev_support_arg_type, 
is_sum_support_arg_type,
-    is_variance_support_arg_type, try_cast,
+    is_avg_support_arg_type, is_covariance_support_arg_type, 
is_stddev_support_arg_type,
+    is_sum_support_arg_type, is_variance_support_arg_type, try_cast,
 };
 use crate::physical_plan::functions::{Signature, TypeSignature};
 use crate::physical_plan::PhysicalExpr;
@@ -105,6 +105,24 @@ pub(crate) fn coerce_types(
             }
             Ok(input_types.to_vec())
         }
+        AggregateFunction::Covariance => {
+            if !is_covariance_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} does not support inputs of type {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::CovariancePop => {
+            if !is_covariance_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} does not support inputs of type {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
         AggregateFunction::Stddev => {
             if !is_stddev_support_arg_type(&input_types[0]) {
                 return Err(DataFusionError::Plan(format!(
diff --git a/datafusion/src/physical_plan/expressions/covariance.rs 
b/datafusion/src/physical_plan/expressions/covariance.rs
new file mode 100644
index 0000000..454dc43
--- /dev/null
+++ b/datafusion/src/physical_plan/expressions/covariance.rs
@@ -0,0 +1,723 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::array::Float64Array;
+use arrow::{
+    array::{ArrayRef, UInt64Array},
+    compute::cast,
+    datatypes::DataType,
+    datatypes::Field,
+};
+
+use super::{format_state_name, StatsType};
+
+/// COVAR and COVAR_SAMP aggregate expression
+#[derive(Debug)]
+pub struct Covariance {
+    name: String,
+    expr1: Arc<dyn PhysicalExpr>,
+    expr2: Arc<dyn PhysicalExpr>,
+}
+
+/// COVAR_POP aggregate expression
+#[derive(Debug)]
+pub struct CovariancePop {
+    name: String,
+    expr1: Arc<dyn PhysicalExpr>,
+    expr2: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of covariance
+pub(crate) fn covariance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_covariance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Covariance {
+    /// Create a new COVAR aggregate function
+    pub fn new(
+        expr1: Arc<dyn PhysicalExpr>,
+        expr2: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of covariance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr1,
+            expr2,
+        }
+    }
+}
+
+impl AggregateExpr for Covariance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(CovarianceAccumulator::try_new(StatsType::Sample)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean1"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean2"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "algo_const"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr1.clone(), self.expr2.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl CovariancePop {
+    /// Create a new COVAR_POP aggregate function
+    pub fn new(
+        expr1: Arc<dyn PhysicalExpr>,
+        expr2: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of covariance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr1,
+            expr2,
+        }
+    }
+}
+
+impl AggregateExpr for CovariancePop {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(CovarianceAccumulator::try_new(
+            StatsType::Population,
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean1"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean2"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "algo_const"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr1.clone(), self.expr2.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute covariance
+/// The algrithm used is an online implementation and numerically stable. It 
is derived from the following paper
+/// for calculating variance:
+/// Welford, B. P. (1962). "Note on a method for calculating corrected sums of 
squares and products".
+/// Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577.
+///
+/// The algorithm has been analyzed here:
+/// Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing 
Sample Means and Variances".
+/// Journal of the American Statistical Association. 69 (348): 859–866. 
doi:10.2307/2286154. JSTOR 2286154.
+///
+/// Though it is not covered in the original paper but is based on the same 
idea, as a result the algorithm is online,
+/// parallelizable and numerically stable.
+
+#[derive(Debug)]
+pub struct CovarianceAccumulator {
+    algo_const: f64,
+    mean1: f64,
+    mean2: f64,
+    count: u64,
+    stats_type: StatsType,
+}
+
+impl CovarianceAccumulator {
+    /// Creates a new `CovarianceAccumulator`
+    pub fn try_new(s_type: StatsType) -> Result<Self> {
+        Ok(Self {
+            algo_const: 0_f64,
+            mean1: 0_f64,
+            mean2: 0_f64,
+            count: 0_u64,
+            stats_type: s_type,
+        })
+    }
+
+    // These functions are commented out for now, they will be used in the 
next PR
+    // pub fn get_count(&self) -> u64 {
+    //     self.count
+    // }
+
+    // pub fn get_mean1(&self) -> f64 {
+    //     self.mean1
+    // }
+
+    // pub fn get_mean2(&self) -> f64 {
+    //     self.mean2
+    // }
+
+    // pub fn get_algo_const(&self) -> f64 {
+    //     self.algo_const
+    // }
+}
+
+impl Accumulator for CovarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            ScalarValue::from(self.mean1),
+            ScalarValue::from(self.mean2),
+            ScalarValue::from(self.algo_const),
+        ])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values1 = &cast(&values[0], &DataType::Float64)?;
+        let values2 = &cast(&values[1], &DataType::Float64)?;
+
+        let mut arr1 = values1
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap()
+            .iter()
+            .flatten();
+        let mut arr2 = values2
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap()
+            .iter()
+            .flatten();
+
+        for _i in 0..values1.len() {
+            let value1 = arr1.next();
+            let value2 = arr2.next();
+
+            if value1 == None || value2 == None {
+                if value1 == None && value2 == None {
+                    continue;
+                } else {
+                    return Err(DataFusionError::Internal(
+                        "The two columns are not aligned".to_string(),
+                    ));
+                }
+            }
+
+            let new_count = self.count + 1;
+            let delta1 = value1.unwrap() - self.mean1;
+            let new_mean1 = delta1 / new_count as f64 + self.mean1;
+            let delta2 = value2.unwrap() - self.mean2;
+            let new_mean2 = delta2 / new_count as f64 + self.mean2;
+            let new_c = delta1 * (value2.unwrap() - new_mean2) + 
self.algo_const;
+
+            self.count += 1;
+            self.mean1 = new_mean1;
+            self.mean2 = new_mean2;
+            self.algo_const = new_c;
+        }
+
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let means1 = 
states[1].as_any().downcast_ref::<Float64Array>().unwrap();
+        let means2 = 
states[2].as_any().downcast_ref::<Float64Array>().unwrap();
+        let cs = states[3].as_any().downcast_ref::<Float64Array>().unwrap();
+
+        for i in 0..counts.len() {
+            let c = counts.value(i);
+            if c == 0_u64 {
+                continue;
+            }
+            let new_count = self.count + c;
+            let new_mean1 = self.mean1 * self.count as f64 / new_count as f64
+                + means1.value(i) * c as f64 / new_count as f64;
+            let new_mean2 = self.mean2 * self.count as f64 / new_count as f64
+                + means2.value(i) * c as f64 / new_count as f64;
+            let delta1 = self.mean1 - means1.value(i);
+            let delta2 = self.mean2 - means2.value(i);
+            let new_c = self.algo_const
+                + cs.value(i)
+                + delta1 * delta2 * self.count as f64 * c as f64 / new_count 
as f64;
+
+            self.count = new_count;
+            self.mean1 = new_mean1;
+            self.mean2 = new_mean2;
+            self.algo_const = new_c;
+        }
+        Ok(())
+    }
+
+    fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
+        unimplemented!("update_batch is implemented instead");
+    }
+
+    fn merge(&mut self, _states: &[ScalarValue]) -> Result<()> {
+        unimplemented!("merge_batch is implemented instead");
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        let count = match self.stats_type {
+            StatsType::Population => self.count,
+            StatsType::Sample => {
+                if self.count > 0 {
+                    self.count - 1
+                } else {
+                    self.count
+                }
+            }
+        };
+
+        if count <= 1 {
+            return Err(DataFusionError::Internal(
+                "At least two values are needed to calculate 
covariance".to_string(),
+            ));
+        }
+
+        if self.count == 0 {
+            Ok(ScalarValue::Float64(None))
+        } else {
+            Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::physical_plan::expressions::col;
+    use crate::{error::Result, generic_test_op2};
+    use arrow::record_batch::RecordBatch;
+    use arrow::{array::*, datatypes::*};
+
+    #[test]
+    fn covariance_f64_1() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 
3_f64]));
+        let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 
6_f64]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float64,
+            DataType::Float64,
+            CovariancePop,
+            ScalarValue::from(0.6666666666666666),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_f64_2() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 
3_f64]));
+        let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 
6_f64]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float64,
+            DataType::Float64,
+            Covariance,
+            ScalarValue::from(1_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_f64_4() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 
3_f64]));
+        let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 
6_f64]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float64,
+            DataType::Float64,
+            Covariance,
+            ScalarValue::from(0.9033333333333335_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_f64_5() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 
3_f64]));
+        let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 
6_f64]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float64,
+            DataType::Float64,
+            CovariancePop,
+            ScalarValue::from(0.6022222222222223_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_f64_6() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![
+            1_f64, 2_f64, 3_f64, 1.1_f64, 2.2_f64, 3.3_f64,
+        ]));
+        let b = Arc::new(Float64Array::from(vec![
+            4_f64, 5_f64, 6_f64, 4.4_f64, 5.5_f64, 6.6_f64,
+        ]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float64,
+            DataType::Float64,
+            CovariancePop,
+            ScalarValue::from(0.7616666666666666),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_i32() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            CovariancePop,
+            ScalarValue::from(0.6666666666666666_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_u32() -> Result<()> {
+        let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 
3_u32]));
+        let b: ArrayRef = Arc::new(UInt32Array::from(vec![4_u32, 5_u32, 
6_u32]));
+        generic_test_op2!(
+            a,
+            b,
+            DataType::UInt32,
+            DataType::UInt32,
+            CovariancePop,
+            ScalarValue::from(0.6666666666666666_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_f32() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 
3_f32]));
+        let b: ArrayRef = Arc::new(Float32Array::from(vec![4_f32, 5_f32, 
6_f32]));
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float32,
+            DataType::Float32,
+            CovariancePop,
+            ScalarValue::from(0.6666666666666666_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn test_covariance_return_data_type() -> Result<()> {
+        let data_type = DataType::Float64;
+        let result_type = covariance_return_type(&data_type)?;
+        assert_eq!(DataType::Float64, result_type);
+
+        let data_type = DataType::Decimal(36, 10);
+        assert!(covariance_return_type(&data_type).is_err());
+        Ok(())
+    }
+
+    #[test]
+    fn covariance_i32_with_nulls_1() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, 
Some(3)]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), None, 
Some(6)]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            CovariancePop,
+            ScalarValue::from(1_f64),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn covariance_i32_with_nulls_2() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, 
Some(3)]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), 
Some(6)]));
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, 
b])?;
+
+        let agg = Arc::new(Covariance::new(
+            col("a", &schema)?,
+            col("b", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+        let actual = aggregate(&batch, agg);
+        assert!(actual.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn covariance_i32_all_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, 
b])?;
+
+        let agg = Arc::new(Covariance::new(
+            col("a", &schema)?,
+            col("b", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+        let actual = aggregate(&batch, agg);
+        assert!(actual.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn covariance_f64_merge_1() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
+        let c = Arc::new(Float64Array::from(vec![1.1_f64, 2.2_f64, 3.3_f64]));
+        let d = Arc::new(Float64Array::from(vec![4.4_f64, 5.5_f64, 6.6_f64]));
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Float64, false),
+            Field::new("b", DataType::Float64, false),
+        ]);
+
+        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, 
b])?;
+        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, 
d])?;
+
+        let agg1 = Arc::new(CovariancePop::new(
+            col("a", &schema)?,
+            col("b", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let agg2 = Arc::new(CovariancePop::new(
+            col("a", &schema)?,
+            col("b", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let actual = merge(&batch1, &batch2, agg1, agg2)?;
+        assert!(actual == ScalarValue::from(0.7616666666666666));
+
+        Ok(())
+    }
+
+    #[test]
+    fn covariance_f64_merge_2() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
+        let c = Arc::new(Float64Array::from(vec![None]));
+        let d = Arc::new(Float64Array::from(vec![None]));
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Float64, false),
+            Field::new("b", DataType::Float64, false),
+        ]);
+
+        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, 
b])?;
+        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, 
d])?;
+
+        let agg1 = Arc::new(CovariancePop::new(
+            col("a", &schema)?,
+            col("b", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let agg2 = Arc::new(CovariancePop::new(
+            col("a", &schema)?,
+            col("b", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let actual = merge(&batch1, &batch2, agg1, agg2)?;
+        assert!(actual == ScalarValue::from(0.6666666666666666));
+
+        Ok(())
+    }
+
+    fn aggregate(
+        batch: &RecordBatch,
+        agg: Arc<dyn AggregateExpr>,
+    ) -> Result<ScalarValue> {
+        let mut accum = agg.create_accumulator()?;
+        let expr = agg.expressions();
+        let values = expr
+            .iter()
+            .map(|e| e.evaluate(batch))
+            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        accum.update_batch(&values)?;
+        accum.evaluate()
+    }
+
+    fn merge(
+        batch1: &RecordBatch,
+        batch2: &RecordBatch,
+        agg1: Arc<dyn AggregateExpr>,
+        agg2: Arc<dyn AggregateExpr>,
+    ) -> Result<ScalarValue> {
+        let mut accum1 = agg1.create_accumulator()?;
+        let mut accum2 = agg2.create_accumulator()?;
+        let expr1 = agg1.expressions();
+        let expr2 = agg2.expressions();
+
+        let values1 = expr1
+            .iter()
+            .map(|e| e.evaluate(batch1))
+            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let values2 = expr2
+            .iter()
+            .map(|e| e.evaluate(batch2))
+            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        accum1.update_batch(&values1)?;
+        accum2.update_batch(&values2)?;
+        let state2 = accum2
+            .state()?
+            .iter()
+            .map(|v| vec![v.clone()])
+            .map(|x| ScalarValue::iter_to_array(x).unwrap())
+            .collect::<Vec<_>>();
+        accum1.merge_batch(&state2)?;
+        accum1.evaluate()
+    }
+}
diff --git a/datafusion/src/physical_plan/expressions/mod.rs 
b/datafusion/src/physical_plan/expressions/mod.rs
index a85d867..6943265 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -44,6 +44,7 @@ mod lead_lag;
 mod literal;
 #[macro_use]
 mod min_max;
+mod covariance;
 mod negative;
 mod not;
 mod nth_value;
@@ -72,6 +73,9 @@ pub use cast::{
 };
 pub use column::{col, Column};
 pub use count::Count;
+pub(crate) use covariance::{
+    covariance_return_type, is_covariance_support_arg_type, Covariance, 
CovariancePop,
+};
 pub use cume_dist::cume_dist;
 pub use get_indexed_field::GetIndexedFieldExpr;
 pub use in_list::{in_list, InListExpr};
@@ -172,6 +176,32 @@ mod tests {
         }};
     }
 
+    /// macro to perform an aggregation with two inputs and verify the result.
+    #[macro_export]
+    macro_rules! generic_test_op2 {
+        ($ARRAY1:expr, $ARRAY2:expr, $DATATYPE1:expr, $DATATYPE2:expr, 
$OP:ident, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{
+            let schema = Schema::new(vec![
+                Field::new("a", $DATATYPE1, false),
+                Field::new("b", $DATATYPE2, false),
+            ]);
+            let batch =
+                RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY1, 
$ARRAY2])?;
+
+            let agg = Arc::new(<$OP>::new(
+                col("a", &schema)?,
+                col("b", &schema)?,
+                "bla".to_string(),
+                $EXPECTED_DATATYPE,
+            ));
+            let actual = aggregate(&batch, agg)?;
+            let expected = ScalarValue::from($EXPECTED);
+
+            assert_eq!(expected, actual);
+
+            Ok(())
+        }};
+    }
+
     pub fn aggregate(
         batch: &RecordBatch,
         agg: Arc<dyn AggregateExpr>,
diff --git a/datafusion/src/physical_plan/expressions/stddev.rs 
b/datafusion/src/physical_plan/expressions/stddev.rs
index 2c85b90..191ac07 100644
--- a/datafusion/src/physical_plan/expressions/stddev.rs
+++ b/datafusion/src/physical_plan/expressions/stddev.rs
@@ -411,6 +411,62 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn stddev_f64_merge_1() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64]));
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Float64, 
false)]);
+
+        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+        let agg1 = Arc::new(StddevPop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let agg2 = Arc::new(StddevPop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let actual = merge(&batch1, &batch2, agg1, agg2)?;
+        assert!(actual == ScalarValue::from(std::f64::consts::SQRT_2));
+
+        Ok(())
+    }
+
+    #[test]
+    fn stddev_f64_merge_2() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 
5_f64]));
+        let b = Arc::new(Float64Array::from(vec![None]));
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Float64, 
false)]);
+
+        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+        let agg1 = Arc::new(StddevPop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let agg2 = Arc::new(StddevPop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let actual = merge(&batch1, &batch2, agg1, agg2)?;
+        assert!(actual == ScalarValue::from(std::f64::consts::SQRT_2));
+
+        Ok(())
+    }
+
     fn aggregate(
         batch: &RecordBatch,
         agg: Arc<dyn AggregateExpr>,
@@ -425,4 +481,37 @@ mod tests {
         accum.update_batch(&values)?;
         accum.evaluate()
     }
+
+    fn merge(
+        batch1: &RecordBatch,
+        batch2: &RecordBatch,
+        agg1: Arc<dyn AggregateExpr>,
+        agg2: Arc<dyn AggregateExpr>,
+    ) -> Result<ScalarValue> {
+        let mut accum1 = agg1.create_accumulator()?;
+        let mut accum2 = agg2.create_accumulator()?;
+        let expr1 = agg1.expressions();
+        let expr2 = agg2.expressions();
+
+        let values1 = expr1
+            .iter()
+            .map(|e| e.evaluate(batch1))
+            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let values2 = expr2
+            .iter()
+            .map(|e| e.evaluate(batch2))
+            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        accum1.update_batch(&values1)?;
+        accum2.update_batch(&values2)?;
+        let state2 = accum2
+            .state()?
+            .iter()
+            .map(|v| vec![v.clone()])
+            .map(|x| ScalarValue::iter_to_array(x).unwrap())
+            .collect::<Vec<_>>();
+        accum1.merge_batch(&state2)?;
+        accum1.evaluate()
+    }
 }
diff --git a/datafusion/src/physical_plan/expressions/variance.rs 
b/datafusion/src/physical_plan/expressions/variance.rs
index 05d8e41..117350c 100644
--- a/datafusion/src/physical_plan/expressions/variance.rs
+++ b/datafusion/src/physical_plan/expressions/variance.rs
@@ -255,14 +255,14 @@ impl Accumulator for VarianceAccumulator {
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &cast(&values[0], &DataType::Float64)?;
-        let arr = values.as_any().downcast_ref::<Float64Array>().unwrap();
-
-        for i in 0..arr.len() {
-            let value = arr.value(i);
+        let arr = values
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap()
+            .iter()
+            .flatten();
 
-            if value == 0_f64 && values.is_null(i) {
-                continue;
-            }
+        for value in arr {
             let new_count = self.count + 1;
             let delta1 = value - self.mean;
             let new_mean = delta1 / new_count as f64 + self.mean;
@@ -495,6 +495,62 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn variance_f64_merge_1() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64]));
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Float64, 
false)]);
+
+        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+        let agg1 = Arc::new(VariancePop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let agg2 = Arc::new(VariancePop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let actual = merge(&batch1, &batch2, agg1, agg2)?;
+        assert!(actual == ScalarValue::from(2_f64));
+
+        Ok(())
+    }
+
+    #[test]
+    fn variance_f64_merge_2() -> Result<()> {
+        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 
5_f64]));
+        let b = Arc::new(Float64Array::from(vec![None]));
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Float64, 
false)]);
+
+        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+        let agg1 = Arc::new(VariancePop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let agg2 = Arc::new(VariancePop::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            DataType::Float64,
+        ));
+
+        let actual = merge(&batch1, &batch2, agg1, agg2)?;
+        assert!(actual == ScalarValue::from(2_f64));
+
+        Ok(())
+    }
+
     fn aggregate(
         batch: &RecordBatch,
         agg: Arc<dyn AggregateExpr>,
@@ -509,4 +565,37 @@ mod tests {
         accum.update_batch(&values)?;
         accum.evaluate()
     }
+
+    fn merge(
+        batch1: &RecordBatch,
+        batch2: &RecordBatch,
+        agg1: Arc<dyn AggregateExpr>,
+        agg2: Arc<dyn AggregateExpr>,
+    ) -> Result<ScalarValue> {
+        let mut accum1 = agg1.create_accumulator()?;
+        let mut accum2 = agg2.create_accumulator()?;
+        let expr1 = agg1.expressions();
+        let expr2 = agg2.expressions();
+
+        let values1 = expr1
+            .iter()
+            .map(|e| e.evaluate(batch1))
+            .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let values2 = expr2
+            .iter()
+            .map(|e| e.evaluate(batch2))
+            .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        accum1.update_batch(&values1)?;
+        accum2.update_batch(&values2)?;
+        let state2 = accum2
+            .state()?
+            .iter()
+            .map(|v| vec![v.clone()])
+            .map(|x| ScalarValue::iter_to_array(x).unwrap())
+            .collect::<Vec<_>>();
+        accum1.merge_batch(&state2)?;
+        accum1.evaluate()
+    }
 }
diff --git a/datafusion/tests/sql/aggregates.rs 
b/datafusion/tests/sql/aggregates.rs
index edf530b..eb65601 100644
--- a/datafusion/tests/sql/aggregates.rs
+++ b/datafusion/tests/sql/aggregates.rs
@@ -50,6 +50,30 @@ async fn csv_query_avg() -> Result<()> {
 }
 
 #[tokio::test]
+async fn csv_query_covariance_1() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv(&mut ctx).await?;
+    let sql = "SELECT covar_pop(c2, c12) FROM aggregate_test_100";
+    let mut actual = execute(&mut ctx, sql).await;
+    actual.sort();
+    let expected = vec![vec!["-0.07916932235380847"]];
+    assert_float_eq(&expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_covariance_2() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv(&mut ctx).await?;
+    let sql = "SELECT covar(c2, c12) FROM aggregate_test_100";
+    let mut actual = execute(&mut ctx, sql).await;
+    actual.sort();
+    let expected = vec![vec!["-0.07996901247859442"]];
+    assert_float_eq(&expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
 async fn csv_query_variance_1() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx).await?;

Reply via email to