This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b4c77e5 Add covar operators (#1551)
b4c77e5 is described below
commit b4c77e5313c53b67eeb3c42e390e22b5ef059055
Author: Lin Ma <[email protected]>
AuthorDate: Thu Jan 13 11:31:15 2022 -0800
Add covar operators (#1551)
* initial change
* fix some logic
* adding basic tests
* mark update and merge unimplemented
* all tests pass
* add doc and tests
* lint
* clippy
* add more merge tests
* add merge tests for stddev
* lint
* clippy
---
README.md | 2 +-
ballista/rust/core/proto/ballista.proto | 6 +-
.../rust/core/src/serde/logical_plan/to_proto.rs | 8 +
ballista/rust/core/src/serde/mod.rs | 4 +
datafusion/src/physical_plan/aggregates.rs | 41 +-
.../physical_plan/coercion_rule/aggregate_rule.rs | 22 +-
.../src/physical_plan/expressions/covariance.rs | 723 +++++++++++++++++++++
datafusion/src/physical_plan/expressions/mod.rs | 30 +
datafusion/src/physical_plan/expressions/stddev.rs | 89 +++
.../src/physical_plan/expressions/variance.rs | 103 ++-
datafusion/tests/sql/aggregates.rs | 24 +
11 files changed, 1039 insertions(+), 13 deletions(-)
diff --git a/README.md b/README.md
index 82089f1..19ccdef 100644
--- a/README.md
+++ b/README.md
@@ -268,7 +268,7 @@ This library currently supports many SQL constructs, including
- `CAST` to change types, including e.g. `Timestamp(Nanosecond, None)`
- Many mathematical unary and binary expressions such as `+`, `/`, `sqrt`, `tan`, `>=`.
- `WHERE` to filter
-- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`, `VAR`, `STDDEV` (sample and population)
+- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`, `VAR`, `COVAR`, `STDDEV` (sample and population)
- `ORDER BY` together with an expression and optional `ASC` or `DESC` and also optional `NULLS FIRST` or `NULLS LAST`
## Supported Functions
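For illustration, the new aggregates can be exercised with queries like the ones added to datafusion/tests/sql/aggregates.rs at the end of this patch; `COVAR` is the sample form (an alias of `COVAR_SAMP`), `COVAR_POP` the population form:

    -- sample covariance over the aggregate_test_100 test fixture
    SELECT covar(c2, c12) FROM aggregate_test_100;
    -- population covariance
    SELECT covar_pop(c2, c12) FROM aggregate_test_100;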
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 3fa14f1..cc6e00a 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -171,8 +171,10 @@ enum AggregateFunction {
ARRAY_AGG = 6;
VARIANCE=7;
VARIANCE_POP=8;
- STDDEV=9;
- STDDEV_POP=10;
+ COVARIANCE=9;
+ COVARIANCE_POP=10;
+ STDDEV=11;
+ STDDEV_POP=12;
}
message AggregateExprNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 46543ea..36e9ba6 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1039,6 +1039,12 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
AggregateFunction::VariancePop => {
protobuf::AggregateFunction::VariancePop
}
+ AggregateFunction::Covariance => {
+ protobuf::AggregateFunction::Covariance
+ }
+ AggregateFunction::CovariancePop => {
+ protobuf::AggregateFunction::CovariancePop
+ }
AggregateFunction::Stddev => protobuf::AggregateFunction::Stddev,
AggregateFunction::StddevPop => {
protobuf::AggregateFunction::StddevPop
@@ -1275,6 +1281,8 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::ArrayAgg => Self::ArrayAgg,
AggregateFunction::Variance => Self::Variance,
AggregateFunction::VariancePop => Self::VariancePop,
+ AggregateFunction::Covariance => Self::Covariance,
+ AggregateFunction::CovariancePop => Self::CovariancePop,
AggregateFunction::Stddev => Self::Stddev,
AggregateFunction::StddevPop => Self::StddevPop,
}
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index e68983a..62246a0 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -122,6 +122,10 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
protobuf::AggregateFunction::ArrayAgg => AggregateFunction::ArrayAgg,
protobuf::AggregateFunction::Variance => AggregateFunction::Variance,
protobuf::AggregateFunction::VariancePop => AggregateFunction::VariancePop,
+ protobuf::AggregateFunction::Covariance => AggregateFunction::Covariance,
+ protobuf::AggregateFunction::CovariancePop => {
+ AggregateFunction::CovariancePop
+ }
protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev,
protobuf::AggregateFunction::StddevPop => AggregateFunction::StddevPop,
}
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 036c69e..949dbb1 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -36,7 +36,8 @@ use crate::physical_plan::distinct_expressions;
use crate::physical_plan::expressions;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use expressions::{
- avg_return_type, stddev_return_type, sum_return_type, variance_return_type,
+ avg_return_type, covariance_return_type, stddev_return_type, sum_return_type,
+ variance_return_type,
};
use std::{fmt, str::FromStr, sync::Arc};
@@ -74,6 +75,10 @@ pub enum AggregateFunction {
Stddev,
/// Standard Deviation (Population)
StddevPop,
+ /// Covariance (Sample)
+ Covariance,
+ /// Covariance (Population)
+ CovariancePop,
}
impl fmt::Display for AggregateFunction {
@@ -100,6 +105,9 @@ impl FromStr for AggregateFunction {
"stddev" => AggregateFunction::Stddev,
"stddev_samp" => AggregateFunction::Stddev,
"stddev_pop" => AggregateFunction::StddevPop,
+ "covar" => AggregateFunction::Covariance,
+ "covar_samp" => AggregateFunction::Covariance,
+ "covar_pop" => AggregateFunction::CovariancePop,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in function named {}",
@@ -134,6 +142,10 @@ pub fn return_type(
AggregateFunction::Sum => sum_return_type(&coerced_data_types[0]),
AggregateFunction::Variance => variance_return_type(&coerced_data_types[0]),
AggregateFunction::VariancePop => variance_return_type(&coerced_data_types[0]),
+ AggregateFunction::Covariance => covariance_return_type(&coerced_data_types[0]),
+ AggregateFunction::CovariancePop => {
+ covariance_return_type(&coerced_data_types[0])
+ }
AggregateFunction::Stddev => stddev_return_type(&coerced_data_types[0]),
AggregateFunction::StddevPop => stddev_return_type(&coerced_data_types[0]),
AggregateFunction::Avg => avg_return_type(&coerced_data_types[0]),
@@ -259,6 +271,30 @@ pub fn create_aggregate_expr(
"VAR_POP(DISTINCT) aggregations are not available".to_string(),
));
}
+ (AggregateFunction::Covariance, false) => Arc::new(expressions::Covariance::new(
+ coerced_phy_exprs[0].clone(),
+ coerced_phy_exprs[1].clone(),
+ name,
+ return_type,
+ )),
+ (AggregateFunction::Covariance, true) => {
+ return Err(DataFusionError::NotImplemented(
+ "COVAR(DISTINCT) aggregations are not available".to_string(),
+ ));
+ }
+ (AggregateFunction::CovariancePop, false) => {
+ Arc::new(expressions::CovariancePop::new(
+ coerced_phy_exprs[0].clone(),
+ coerced_phy_exprs[1].clone(),
+ name,
+ return_type,
+ ))
+ }
+ (AggregateFunction::CovariancePop, true) => {
+ return Err(DataFusionError::NotImplemented(
+ "COVAR_POP(DISTINCT) aggregations are not
available".to_string(),
+ ));
+ }
(AggregateFunction::Stddev, false) => Arc::new(expressions::Stddev::new(
coerced_phy_exprs[0].clone(),
name,
@@ -331,6 +367,9 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
| AggregateFunction::StddevPop => {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
+ AggregateFunction::Covariance | AggregateFunction::CovariancePop => {
+ Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
+ }
}
}
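As the match arms above show, COVAR and COVAR_POP are registered with a two-argument numeric signature, and their DISTINCT variants are explicitly rejected. A sketch of the resulting behavior, reusing the aggregate_test_100 fixture from the tests below:

    SELECT covar(c2, c12) FROM aggregate_test_100;           -- plans and runs
    SELECT covar(DISTINCT c2, c12) FROM aggregate_test_100;  -- NotImplemented: COVAR(DISTINCT) aggregations are not available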
diff --git a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
index d74b4e4..2b02ba0 100644
--- a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
+++ b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
@@ -21,8 +21,8 @@ use crate::arrow::datatypes::Schema;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::aggregates::AggregateFunction;
use crate::physical_plan::expressions::{
- is_avg_support_arg_type, is_stddev_support_arg_type, is_sum_support_arg_type,
- is_variance_support_arg_type, try_cast,
+ is_avg_support_arg_type, is_covariance_support_arg_type, is_stddev_support_arg_type,
+ is_sum_support_arg_type, is_variance_support_arg_type, try_cast,
};
use crate::physical_plan::functions::{Signature, TypeSignature};
use crate::physical_plan::PhysicalExpr;
@@ -105,6 +105,24 @@ pub(crate) fn coerce_types(
}
Ok(input_types.to_vec())
}
+ AggregateFunction::Covariance => {
+ if !is_covariance_support_arg_type(&input_types[0]) {
+ return Err(DataFusionError::Plan(format!(
+ "The function {:?} does not support inputs of type {:?}.",
+ agg_fun, input_types[0]
+ )));
+ }
+ Ok(input_types.to_vec())
+ }
+ AggregateFunction::CovariancePop => {
+ if !is_covariance_support_arg_type(&input_types[0]) {
+ return Err(DataFusionError::Plan(format!(
+ "The function {:?} does not support inputs of type {:?}.",
+ agg_fun, input_types[0]
+ )));
+ }
+ Ok(input_types.to_vec())
+ }
AggregateFunction::Stddev => {
if !is_stddev_support_arg_type(&input_types[0]) {
return Err(DataFusionError::Plan(format!(
diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs
new file mode 100644
index 0000000..454dc43
--- /dev/null
+++ b/datafusion/src/physical_plan/expressions/covariance.rs
@@ -0,0 +1,723 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::array::Float64Array;
+use arrow::{
+ array::{ArrayRef, UInt64Array},
+ compute::cast,
+ datatypes::DataType,
+ datatypes::Field,
+};
+
+use super::{format_state_name, StatsType};
+
+/// COVAR and COVAR_SAMP aggregate expression
+#[derive(Debug)]
+pub struct Covariance {
+ name: String,
+ expr1: Arc<dyn PhysicalExpr>,
+ expr2: Arc<dyn PhysicalExpr>,
+}
+
+/// COVAR_POP aggregate expression
+#[derive(Debug)]
+pub struct CovariancePop {
+ name: String,
+ expr1: Arc<dyn PhysicalExpr>,
+ expr2: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of covariance
+pub(crate) fn covariance_return_type(arg_type: &DataType) -> Result<DataType> {
+ match arg_type {
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64
+ | DataType::Float32
+ | DataType::Float64 => Ok(DataType::Float64),
+ other => Err(DataFusionError::Plan(format!(
+ "VARIANCE does not support {:?}",
+ other
+ ))),
+ }
+}
+
+pub(crate) fn is_covariance_support_arg_type(arg_type: &DataType) -> bool {
+ matches!(
+ arg_type,
+ DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64
+ | DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::Float32
+ | DataType::Float64
+ )
+}
+
+impl Covariance {
+ /// Create a new COVAR aggregate function
+ pub fn new(
+ expr1: Arc<dyn PhysicalExpr>,
+ expr2: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
+ // the result of covariance is always the FLOAT64 data type.
+ assert!(matches!(data_type, DataType::Float64));
+ Self {
+ name: name.into(),
+ expr1,
+ expr2,
+ }
+ }
+}
+
+impl AggregateExpr for Covariance {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn field(&self) -> Result<Field> {
+ Ok(Field::new(&self.name, DataType::Float64, true))
+ }
+
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::new(CovarianceAccumulator::try_new(StatsType::Sample)?))
+ }
+
+ fn state_fields(&self) -> Result<Vec<Field>> {
+ Ok(vec![
+ Field::new(
+ &format_state_name(&self.name, "count"),
+ DataType::UInt64,
+ true,
+ ),
+ Field::new(
+ &format_state_name(&self.name, "mean1"),
+ DataType::Float64,
+ true,
+ ),
+ Field::new(
+ &format_state_name(&self.name, "mean2"),
+ DataType::Float64,
+ true,
+ ),
+ Field::new(
+ &format_state_name(&self.name, "algo_const"),
+ DataType::Float64,
+ true,
+ ),
+ ])
+ }
+
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![self.expr1.clone(), self.expr2.clone()]
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+impl CovariancePop {
+ /// Create a new COVAR_POP aggregate function
+ pub fn new(
+ expr1: Arc<dyn PhysicalExpr>,
+ expr2: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ data_type: DataType,
+ ) -> Self {
+ // the result of covariance is always the FLOAT64 data type.
+ assert!(matches!(data_type, DataType::Float64));
+ Self {
+ name: name.into(),
+ expr1,
+ expr2,
+ }
+ }
+}
+
+impl AggregateExpr for CovariancePop {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn field(&self) -> Result<Field> {
+ Ok(Field::new(&self.name, DataType::Float64, true))
+ }
+
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::new(CovarianceAccumulator::try_new(
+ StatsType::Population,
+ )?))
+ }
+
+ fn state_fields(&self) -> Result<Vec<Field>> {
+ Ok(vec![
+ Field::new(
+ &format_state_name(&self.name, "count"),
+ DataType::UInt64,
+ true,
+ ),
+ Field::new(
+ &format_state_name(&self.name, "mean1"),
+ DataType::Float64,
+ true,
+ ),
+ Field::new(
+ &format_state_name(&self.name, "mean2"),
+ DataType::Float64,
+ true,
+ ),
+ Field::new(
+ &format_state_name(&self.name, "algo_const"),
+ DataType::Float64,
+ true,
+ ),
+ ])
+ }
+
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![self.expr1.clone(), self.expr2.clone()]
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+/// An accumulator to compute covariance.
+/// The algorithm used is an online implementation that is numerically stable. It is derived from the following paper
+/// for calculating variance:
+/// Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products".
+/// Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577.
+///
+/// The algorithm has been analyzed here:
+/// Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing Sample Means and Variances".
+/// Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154.
+///
+/// Covariance itself is not covered in the original paper, but the extension is based on the same idea; as a result
+/// the algorithm is online, parallelizable and numerically stable.
+
+#[derive(Debug)]
+pub struct CovarianceAccumulator {
+ algo_const: f64,
+ mean1: f64,
+ mean2: f64,
+ count: u64,
+ stats_type: StatsType,
+}
+
+impl CovarianceAccumulator {
+ /// Creates a new `CovarianceAccumulator`
+ pub fn try_new(s_type: StatsType) -> Result<Self> {
+ Ok(Self {
+ algo_const: 0_f64,
+ mean1: 0_f64,
+ mean2: 0_f64,
+ count: 0_u64,
+ stats_type: s_type,
+ })
+ }
+
+ // These functions are commented out for now; they will be used in the next PR
+ // pub fn get_count(&self) -> u64 {
+ // self.count
+ // }
+
+ // pub fn get_mean1(&self) -> f64 {
+ // self.mean1
+ // }
+
+ // pub fn get_mean2(&self) -> f64 {
+ // self.mean2
+ // }
+
+ // pub fn get_algo_const(&self) -> f64 {
+ // self.algo_const
+ // }
+}
+
+impl Accumulator for CovarianceAccumulator {
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ Ok(vec![
+ ScalarValue::from(self.count),
+ ScalarValue::from(self.mean1),
+ ScalarValue::from(self.mean2),
+ ScalarValue::from(self.algo_const),
+ ])
+ }
+
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let values1 = &cast(&values[0], &DataType::Float64)?;
+ let values2 = &cast(&values[1], &DataType::Float64)?;
+
+ let mut arr1 = values1
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap()
+ .iter()
+ .flatten();
+ let mut arr2 = values2
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap()
+ .iter()
+ .flatten();
+
+ for _i in 0..values1.len() {
+ let value1 = arr1.next();
+ let value2 = arr2.next();
+
+ if value1.is_none() || value2.is_none() {
+ if value1.is_none() && value2.is_none() {
+ continue;
+ } else {
+ return Err(DataFusionError::Internal(
+ "The two columns are not aligned".to_string(),
+ ));
+ }
+ }
+
+ let new_count = self.count + 1;
+ let delta1 = value1.unwrap() - self.mean1;
+ let new_mean1 = delta1 / new_count as f64 + self.mean1;
+ let delta2 = value2.unwrap() - self.mean2;
+ let new_mean2 = delta2 / new_count as f64 + self.mean2;
+ let new_c = delta1 * (value2.unwrap() - new_mean2) + self.algo_const;
+
+ self.count += 1;
+ self.mean1 = new_mean1;
+ self.mean2 = new_mean2;
+ self.algo_const = new_c;
+ }
+
+ Ok(())
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+ let means1 = states[1].as_any().downcast_ref::<Float64Array>().unwrap();
+ let means2 = states[2].as_any().downcast_ref::<Float64Array>().unwrap();
+ let cs = states[3].as_any().downcast_ref::<Float64Array>().unwrap();
+
+ for i in 0..counts.len() {
+ let c = counts.value(i);
+ if c == 0_u64 {
+ continue;
+ }
+ let new_count = self.count + c;
+ let new_mean1 = self.mean1 * self.count as f64 / new_count as f64
+ + means1.value(i) * c as f64 / new_count as f64;
+ let new_mean2 = self.mean2 * self.count as f64 / new_count as f64
+ + means2.value(i) * c as f64 / new_count as f64;
+ let delta1 = self.mean1 - means1.value(i);
+ let delta2 = self.mean2 - means2.value(i);
+ let new_c = self.algo_const
+ + cs.value(i)
+ + delta1 * delta2 * self.count as f64 * c as f64 / new_count as f64;
+
+ self.count = new_count;
+ self.mean1 = new_mean1;
+ self.mean2 = new_mean2;
+ self.algo_const = new_c;
+ }
+ Ok(())
+ }
+
+ fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
+ unimplemented!("update_batch is implemented instead");
+ }
+
+ fn merge(&mut self, _states: &[ScalarValue]) -> Result<()> {
+ unimplemented!("merge_batch is implemented instead");
+ }
+
+ fn evaluate(&self) -> Result<ScalarValue> {
+ let count = match self.stats_type {
+ StatsType::Population => self.count,
+ StatsType::Sample => {
+ if self.count > 0 {
+ self.count - 1
+ } else {
+ self.count
+ }
+ }
+ };
+
+ if count <= 1 {
+ return Err(DataFusionError::Internal(
+ "At least two values are needed to calculate
covariance".to_string(),
+ ));
+ }
+
+ if self.count == 0 {
+ Ok(ScalarValue::Float64(None))
+ } else {
+ Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::physical_plan::expressions::col;
+ use crate::{error::Result, generic_test_op2};
+ use arrow::record_batch::RecordBatch;
+ use arrow::{array::*, datatypes::*};
+
+ #[test]
+ fn covariance_f64_1() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+ let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float64,
+ DataType::Float64,
+ CovariancePop,
+ ScalarValue::from(0.6666666666666666),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_f64_2() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+ let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float64,
+ DataType::Float64,
+ Covariance,
+ ScalarValue::from(1_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_f64_4() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64]));
+ let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 6_f64]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float64,
+ DataType::Float64,
+ Covariance,
+ ScalarValue::from(0.9033333333333335_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_f64_5() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64]));
+ let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 6_f64]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float64,
+ DataType::Float64,
+ CovariancePop,
+ ScalarValue::from(0.6022222222222223_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_f64_6() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![
+ 1_f64, 2_f64, 3_f64, 1.1_f64, 2.2_f64, 3.3_f64,
+ ]));
+ let b = Arc::new(Float64Array::from(vec![
+ 4_f64, 5_f64, 6_f64, 4.4_f64, 5.5_f64, 6.6_f64,
+ ]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float64,
+ DataType::Float64,
+ CovariancePop,
+ ScalarValue::from(0.7616666666666666),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_i32() -> Result<()> {
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ CovariancePop,
+ ScalarValue::from(0.6666666666666666_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_u32() -> Result<()> {
+ let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32]));
+ let b: ArrayRef = Arc::new(UInt32Array::from(vec![4_u32, 5_u32, 6_u32]));
+ generic_test_op2!(
+ a,
+ b,
+ DataType::UInt32,
+ DataType::UInt32,
+ CovariancePop,
+ ScalarValue::from(0.6666666666666666_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_f32() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32]));
+ let b: ArrayRef = Arc::new(Float32Array::from(vec![4_f32, 5_f32, 6_f32]));
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float32,
+ DataType::Float32,
+ CovariancePop,
+ ScalarValue::from(0.6666666666666666_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn test_covariance_return_data_type() -> Result<()> {
+ let data_type = DataType::Float64;
+ let result_type = covariance_return_type(&data_type)?;
+ assert_eq!(DataType::Float64, result_type);
+
+ let data_type = DataType::Decimal(36, 10);
+ assert!(covariance_return_type(&data_type).is_err());
+ Ok(())
+ }
+
+ #[test]
+ fn covariance_i32_with_nulls_1() -> Result<()> {
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), None, Some(6)]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ CovariancePop,
+ ScalarValue::from(1_f64),
+ DataType::Float64
+ )
+ }
+
+ #[test]
+ fn covariance_i32_with_nulls_2() -> Result<()> {
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)]));
+
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+
+ let agg = Arc::new(Covariance::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+ let actual = aggregate(&batch, agg);
+ assert!(actual.is_err());
+
+ Ok(())
+ }
+
+ #[test]
+ fn covariance_i32_all_nulls() -> Result<()> {
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+
+ let agg = Arc::new(Covariance::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+ let actual = aggregate(&batch, agg);
+ assert!(actual.is_err());
+
+ Ok(())
+ }
+
+ #[test]
+ fn covariance_f64_merge_1() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+ let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
+ let c = Arc::new(Float64Array::from(vec![1.1_f64, 2.2_f64, 3.3_f64]));
+ let d = Arc::new(Float64Array::from(vec![4.4_f64, 5.5_f64, 6.6_f64]));
+
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Float64, false),
+ Field::new("b", DataType::Float64, false),
+ ]);
+
+ let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+ let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, d])?;
+
+ let agg1 = Arc::new(CovariancePop::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let agg2 = Arc::new(CovariancePop::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let actual = merge(&batch1, &batch2, agg1, agg2)?;
+ assert!(actual == ScalarValue::from(0.7616666666666666));
+
+ Ok(())
+ }
+
+ #[test]
+ fn covariance_f64_merge_2() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+ let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64]));
+ let c = Arc::new(Float64Array::from(vec![None]));
+ let d = Arc::new(Float64Array::from(vec![None]));
+
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Float64, false),
+ Field::new("b", DataType::Float64, false),
+ ]);
+
+ let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+ let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, d])?;
+
+ let agg1 = Arc::new(CovariancePop::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let agg2 = Arc::new(CovariancePop::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let actual = merge(&batch1, &batch2, agg1, agg2)?;
+ assert!(actual == ScalarValue::from(0.6666666666666666));
+
+ Ok(())
+ }
+
+ fn aggregate(
+ batch: &RecordBatch,
+ agg: Arc<dyn AggregateExpr>,
+ ) -> Result<ScalarValue> {
+ let mut accum = agg.create_accumulator()?;
+ let expr = agg.expressions();
+ let values = expr
+ .iter()
+ .map(|e| e.evaluate(batch))
+ .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ accum.update_batch(&values)?;
+ accum.evaluate()
+ }
+
+ fn merge(
+ batch1: &RecordBatch,
+ batch2: &RecordBatch,
+ agg1: Arc<dyn AggregateExpr>,
+ agg2: Arc<dyn AggregateExpr>,
+ ) -> Result<ScalarValue> {
+ let mut accum1 = agg1.create_accumulator()?;
+ let mut accum2 = agg2.create_accumulator()?;
+ let expr1 = agg1.expressions();
+ let expr2 = agg2.expressions();
+
+ let values1 = expr1
+ .iter()
+ .map(|e| e.evaluate(batch1))
+ .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ let values2 = expr2
+ .iter()
+ .map(|e| e.evaluate(batch2))
+ .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ accum1.update_batch(&values1)?;
+ accum2.update_batch(&values2)?;
+ let state2 = accum2
+ .state()?
+ .iter()
+ .map(|v| vec![v.clone()])
+ .map(|x| ScalarValue::iter_to_array(x).unwrap())
+ .collect::<Vec<_>>();
+ accum1.merge_batch(&state2)?;
+ accum1.evaluate()
+ }
+}
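For reference, the recurrences implemented by update_batch and merge_batch above are the Welford-style online update and its pairwise merge. Writing C for the running co-moment kept in `algo_const`, a sketch of the math:

    \bar{x}_n = \bar{x}_{n-1} + (x_n - \bar{x}_{n-1}) / n
    \bar{y}_n = \bar{y}_{n-1} + (y_n - \bar{y}_{n-1}) / n
    C_n = C_{n-1} + (x_n - \bar{x}_{n-1})(y_n - \bar{y}_n)

and, when merging two partial states A and B with counts n_A and n_B:

    C_{AB} = C_A + C_B + (\bar{x}_A - \bar{x}_B)(\bar{y}_A - \bar{y}_B) \cdot n_A n_B / (n_A + n_B)

evaluate() then returns C / n for the population case and C / (n - 1) for the sample case.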
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index a85d867..6943265 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -44,6 +44,7 @@ mod lead_lag;
mod literal;
#[macro_use]
mod min_max;
+mod covariance;
mod negative;
mod not;
mod nth_value;
@@ -72,6 +73,9 @@ pub use cast::{
};
pub use column::{col, Column};
pub use count::Count;
+pub(crate) use covariance::{
+ covariance_return_type, is_covariance_support_arg_type, Covariance, CovariancePop,
+};
pub use cume_dist::cume_dist;
pub use get_indexed_field::GetIndexedFieldExpr;
pub use in_list::{in_list, InListExpr};
@@ -172,6 +176,32 @@ mod tests {
}};
}
+ /// macro to perform an aggregation with two inputs and verify the result.
+ #[macro_export]
+ macro_rules! generic_test_op2 {
+ ($ARRAY1:expr, $ARRAY2:expr, $DATATYPE1:expr, $DATATYPE2:expr, $OP:ident, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{
+ let schema = Schema::new(vec![
+ Field::new("a", $DATATYPE1, false),
+ Field::new("b", $DATATYPE2, false),
+ ]);
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY1, $ARRAY2])?;
+
+ let agg = Arc::new(<$OP>::new(
+ col("a", &schema)?,
+ col("b", &schema)?,
+ "bla".to_string(),
+ $EXPECTED_DATATYPE,
+ ));
+ let actual = aggregate(&batch, agg)?;
+ let expected = ScalarValue::from($EXPECTED);
+
+ assert_eq!(expected, actual);
+
+ Ok(())
+ }};
+ }
+
pub fn aggregate(
batch: &RecordBatch,
agg: Arc<dyn AggregateExpr>,
diff --git a/datafusion/src/physical_plan/expressions/stddev.rs b/datafusion/src/physical_plan/expressions/stddev.rs
index 2c85b90..191ac07 100644
--- a/datafusion/src/physical_plan/expressions/stddev.rs
+++ b/datafusion/src/physical_plan/expressions/stddev.rs
@@ -411,6 +411,62 @@ mod tests {
Ok(())
}
+ #[test]
+ fn stddev_f64_merge_1() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+ let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64]));
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
+
+ let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+ let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+ let agg1 = Arc::new(StddevPop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let agg2 = Arc::new(StddevPop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let actual = merge(&batch1, &batch2, agg1, agg2)?;
+ assert!(actual == ScalarValue::from(std::f64::consts::SQRT_2));
+
+ Ok(())
+ }
+
+ #[test]
+ fn stddev_f64_merge_2() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
+ let b = Arc::new(Float64Array::from(vec![None]));
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
+
+ let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+ let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+ let agg1 = Arc::new(StddevPop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let agg2 = Arc::new(StddevPop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let actual = merge(&batch1, &batch2, agg1, agg2)?;
+ assert!(actual == ScalarValue::from(std::f64::consts::SQRT_2));
+
+ Ok(())
+ }
+
fn aggregate(
batch: &RecordBatch,
agg: Arc<dyn AggregateExpr>,
@@ -425,4 +481,37 @@ mod tests {
accum.update_batch(&values)?;
accum.evaluate()
}
+
+ fn merge(
+ batch1: &RecordBatch,
+ batch2: &RecordBatch,
+ agg1: Arc<dyn AggregateExpr>,
+ agg2: Arc<dyn AggregateExpr>,
+ ) -> Result<ScalarValue> {
+ let mut accum1 = agg1.create_accumulator()?;
+ let mut accum2 = agg2.create_accumulator()?;
+ let expr1 = agg1.expressions();
+ let expr2 = agg2.expressions();
+
+ let values1 = expr1
+ .iter()
+ .map(|e| e.evaluate(batch1))
+ .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ let values2 = expr2
+ .iter()
+ .map(|e| e.evaluate(batch2))
+ .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ accum1.update_batch(&values1)?;
+ accum2.update_batch(&values2)?;
+ let state2 = accum2
+ .state()?
+ .iter()
+ .map(|v| vec![v.clone()])
+ .map(|x| ScalarValue::iter_to_array(x).unwrap())
+ .collect::<Vec<_>>();
+ accum1.merge_batch(&state2)?;
+ accum1.evaluate()
+ }
}
diff --git a/datafusion/src/physical_plan/expressions/variance.rs b/datafusion/src/physical_plan/expressions/variance.rs
index 05d8e41..117350c 100644
--- a/datafusion/src/physical_plan/expressions/variance.rs
+++ b/datafusion/src/physical_plan/expressions/variance.rs
@@ -255,14 +255,14 @@ impl Accumulator for VarianceAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &cast(&values[0], &DataType::Float64)?;
- let arr = values.as_any().downcast_ref::<Float64Array>().unwrap();
-
- for i in 0..arr.len() {
- let value = arr.value(i);
+ let arr = values
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap()
+ .iter()
+ .flatten();
- if value == 0_f64 && values.is_null(i) {
- continue;
- }
+ for value in arr {
let new_count = self.count + 1;
let delta1 = value - self.mean;
let new_mean = delta1 / new_count as f64 + self.mean;
@@ -495,6 +495,62 @@ mod tests {
Ok(())
}
+ #[test]
+ fn variance_f64_merge_1() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
+ let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64]));
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
+
+ let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+ let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+ let agg1 = Arc::new(VariancePop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let agg2 = Arc::new(VariancePop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let actual = merge(&batch1, &batch2, agg1, agg2)?;
+ assert!(actual == ScalarValue::from(2_f64));
+
+ Ok(())
+ }
+
+ #[test]
+ fn variance_f64_merge_2() -> Result<()> {
+ let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
+ let b = Arc::new(Float64Array::from(vec![None]));
+
+ let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
+
+ let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+ let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
+
+ let agg1 = Arc::new(VariancePop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let agg2 = Arc::new(VariancePop::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ DataType::Float64,
+ ));
+
+ let actual = merge(&batch1, &batch2, agg1, agg2)?;
+ assert!(actual == ScalarValue::from(2_f64));
+
+ Ok(())
+ }
+
fn aggregate(
batch: &RecordBatch,
agg: Arc<dyn AggregateExpr>,
@@ -509,4 +565,37 @@ mod tests {
accum.update_batch(&values)?;
accum.evaluate()
}
+
+ fn merge(
+ batch1: &RecordBatch,
+ batch2: &RecordBatch,
+ agg1: Arc<dyn AggregateExpr>,
+ agg2: Arc<dyn AggregateExpr>,
+ ) -> Result<ScalarValue> {
+ let mut accum1 = agg1.create_accumulator()?;
+ let mut accum2 = agg2.create_accumulator()?;
+ let expr1 = agg1.expressions();
+ let expr2 = agg2.expressions();
+
+ let values1 = expr1
+ .iter()
+ .map(|e| e.evaluate(batch1))
+ .map(|r| r.map(|v| v.into_array(batch1.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ let values2 = expr2
+ .iter()
+ .map(|e| e.evaluate(batch2))
+ .map(|r| r.map(|v| v.into_array(batch2.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ accum1.update_batch(&values1)?;
+ accum2.update_batch(&values2)?;
+ let state2 = accum2
+ .state()?
+ .iter()
+ .map(|v| vec![v.clone()])
+ .map(|x| ScalarValue::iter_to_array(x).unwrap())
+ .collect::<Vec<_>>();
+ accum1.merge_batch(&state2)?;
+ accum1.evaluate()
+ }
}
diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs
index edf530b..eb65601 100644
--- a/datafusion/tests/sql/aggregates.rs
+++ b/datafusion/tests/sql/aggregates.rs
@@ -50,6 +50,30 @@ async fn csv_query_avg() -> Result<()> {
}
#[tokio::test]
+async fn csv_query_covariance_1() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_csv(&mut ctx).await?;
+ let sql = "SELECT covar_pop(c2, c12) FROM aggregate_test_100";
+ let mut actual = execute(&mut ctx, sql).await;
+ actual.sort();
+ let expected = vec![vec!["-0.07916932235380847"]];
+ assert_float_eq(&expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_covariance_2() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_csv(&mut ctx).await?;
+ let sql = "SELECT covar(c2, c12) FROM aggregate_test_100";
+ let mut actual = execute(&mut ctx, sql).await;
+ actual.sort();
+ let expected = vec![vec!["-0.07996901247859442"]];
+ assert_float_eq(&expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
async fn csv_query_variance_1() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx).await?;