This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1eb46df49 Covariance single row input & null skipping (#4852)
1eb46df49 is described below
commit 1eb46df49fcd2479235500810c0562b10da77c90
Author: Eduard Karacharov <[email protected]>
AuthorDate: Tue Jan 10 09:44:17 2023 +0300
Covariance single row input & null skipping (#4852)
* covariance & correlation single row & null skipping
* Apply suggestions from code review
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
* unwrap_or_internal_err macro instead of unwrap
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../tests/sqllogictests/test_files/aggregate.slt | 82 ++++++++
.../physical-expr/src/aggregate/correlation.rs | 99 +++++----
.../physical-expr/src/aggregate/covariance.rs | 221 ++++++++++++++-------
3 files changed, 292 insertions(+), 110 deletions(-)
diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
index dbb3b69ca..5ddec784e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
@@ -36,12 +36,94 @@ SELECT covar(c2, c12) FROM aggregate_test_100
----
-0.07996901247859442
+# single_row_query_covar_1
+query R
+select covar_samp(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+NULL
+
+# single_row_query_covar_2
+query R
+select covar_pop(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+0
+
+# all_nulls_query_covar
+query R
+with data as (
+ select null::int as f, null::int as b
+ union all
+ select null::int as f, null::int as b
+)
+select covar_samp(f, b), covar_pop(f, b)
+from data
+----
+NULL NULL
+
+# covar_query_with_nulls
+query R
+with data as (
+ select 1 as f, 4 as b
+ union all
+ select null as f, 99 as b
+ union all
+ select 2 as f, 5 as b
+ union all
+ select 98 as f, null as b
+ union all
+ select 3 as f, 6 as b
+ union all
+ select null as f, null as b
+)
+select covar_samp(f, b), covar_pop(f, b)
+from data
+----
+1 0.6666666666666666
+
# csv_query_correlation
query R
SELECT corr(c2, c12) FROM aggregate_test_100
----
-0.19064544190576607
+# single_row_query_correlation
+query R
+select corr(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+0
+
+# all_nulls_query_correlation
+query R
+with data as (
+ select null::int as f, null::int as b
+ union all
+ select null::int as f, null::int as b
+)
+select corr(f, b)
+from data
+----
+NULL
+
+# correlation_query_with_nulls
+query R
+with data as (
+ select 1 as f, 4 as b
+ union all
+ select null as f, 99 as b
+ union all
+ select 2 as f, 5 as b
+ union all
+ select 98 as f, null as b
+ union all
+ select 3 as f, 6 as b
+ union all
+ select null as f, null as b
+)
+select corr(f, b)
+from data
+----
+1
+
# csv_query_variance_1
query R
SELECT var_pop(c2) FROM aggregate_test_100
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 6211c578f..1bed3fe03 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -22,7 +22,11 @@ use crate::aggregate::stats::StatsType;
use crate::aggregate::stddev::StddevAccumulator;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
-use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
+use arrow::{
+ array::ArrayRef,
+ compute::{and, filter, is_not_null},
+ datatypes::{DataType, Field},
+};
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_expr::Accumulator;
@@ -145,14 +149,39 @@ impl Accumulator for CorrelationAccumulator {
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- self.covar.update_batch(values)?;
+ // TODO: null input skipping logic duplicated across Correlation
+ // and its children accumulators.
+ // This could be simplified by splitting up input filtering and
+ // calculation logic in children accumulators, and calling only
+ // calculation part from Correlation
+ let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
+ let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
+ let values1 = filter(&values[0], &mask)?;
+ let values2 = filter(&values[1], &mask)?;
+
+ vec![values1, values2]
+ } else {
+ values.to_vec()
+ };
+
+ self.covar.update_batch(&values)?;
self.stddev1.update_batch(&values[0..1])?;
self.stddev2.update_batch(&values[1..2])?;
Ok(())
}
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- self.covar.retract_batch(values)?;
+ let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
+ let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
+ let values1 = filter(&values[0], &mask)?;
+ let values2 = filter(&values[1], &mask)?;
+
+ vec![values1, values2]
+ } else {
+ values.to_vec()
+ };
+
+ self.covar.retract_batch(&values)?;
self.stddev1.retract_batch(&values[0..1])?;
self.stddev2.retract_batch(&values[1..2])?;
Ok(())
@@ -341,25 +370,29 @@ mod tests {
#[test]
fn correlation_i32_with_nulls_2() -> Result<()> {
- let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
- let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)]));
-
- let schema = Schema::new(vec![
- Field::new("a", DataType::Int32, true),
- Field::new("b", DataType::Int32, true),
- ]);
- let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
-
- let agg = Arc::new(Correlation::new(
- col("a", &schema)?,
- col("b", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
- let actual = aggregate(&batch, agg);
- assert!(actual.is_err());
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ Some(9),
+ Some(3),
+ ]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(4),
+ Some(5),
+ Some(5),
+ None,
+ Some(6),
+ ]));
- Ok(())
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ Correlation,
+ ScalarValue::from(1_f64)
+ )
}
#[test]
@@ -367,22 +400,14 @@ mod tests {
let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
- let schema = Schema::new(vec![
- Field::new("a", DataType::Int32, true),
- Field::new("b", DataType::Int32, true),
- ]);
- let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
-
- let agg = Arc::new(Correlation::new(
- col("a", &schema)?,
- col("b", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
- let actual = aggregate(&batch, agg);
- assert!(actual.is_err());
-
- Ok(())
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ Correlation,
+ ScalarValue::Float64(None)
+ )
}
#[test]
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 3963ba643..f0625aef7 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -253,33 +253,35 @@ impl Accumulator for CovarianceAccumulator {
let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();
- for _i in 0..values1.len() {
- let value1 = arr1.next();
- let value2 = arr2.next();
+ for i in 0..values1.len() {
+ let value1 = if values1.is_valid(i) {
+ arr1.next()
+ } else {
+ None
+ };
+ let value2 = if values2.is_valid(i) {
+ arr2.next()
+ } else {
+ None
+ };
if value1.is_none() || value2.is_none() {
- if value1.is_none() && value2.is_none() {
- continue;
- } else {
- return Err(DataFusionError::Internal(
- "The two columns are not aligned".to_string(),
- ));
- }
- } else {
- let value1 = unwrap_or_internal_err!(value1);
- let value2 = unwrap_or_internal_err!(value2);
- let new_count = self.count + 1;
- let delta1 = value1 - self.mean1;
- let new_mean1 = delta1 / new_count as f64 + self.mean1;
- let delta2 = value2 - self.mean2;
- let new_mean2 = delta2 / new_count as f64 + self.mean2;
- let new_c = delta1 * (value2 - new_mean2) + self.algo_const;
-
- self.count += 1;
- self.mean1 = new_mean1;
- self.mean2 = new_mean2;
- self.algo_const = new_c;
+ continue;
}
+
+ let value1 = unwrap_or_internal_err!(value1);
+ let value2 = unwrap_or_internal_err!(value2);
+ let new_count = self.count + 1;
+ let delta1 = value1 - self.mean1;
+ let new_mean1 = delta1 / new_count as f64 + self.mean1;
+ let delta2 = value2 - self.mean2;
+ let new_mean2 = delta2 / new_count as f64 + self.mean2;
+ let new_c = delta1 * (value2 - new_mean2) + self.algo_const;
+
+ self.count += 1;
+ self.mean1 = new_mean1;
+ self.mean2 = new_mean2;
+ self.algo_const = new_c;
}
Ok(())
@@ -291,31 +293,38 @@ impl Accumulator for CovarianceAccumulator {
let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();
- for _i in 0..values1.len() {
- let value1 = arr1.next();
- let value2 = arr2.next();
+ for i in 0..values1.len() {
+ let value1 = if values1.is_valid(i) {
+ arr1.next()
+ } else {
+ None
+ };
+ let value2 = if values2.is_valid(i) {
+ arr2.next()
+ } else {
+ None
+ };
if value1.is_none() || value2.is_none() {
- if value1.is_none() && value2.is_none() {
- continue;
- } else {
- return Err(DataFusionError::Internal(
- "The two columns are not aligned".to_string(),
- ));
- }
+ continue;
}
+
+ let value1 = unwrap_or_internal_err!(value1);
+ let value2 = unwrap_or_internal_err!(value2);
+
let new_count = self.count - 1;
- let delta1 = self.mean1 - value1.unwrap();
+ let delta1 = self.mean1 - value1;
let new_mean1 = delta1 / new_count as f64 + self.mean1;
- let delta2 = self.mean2 - value2.unwrap();
+ let delta2 = self.mean2 - value2;
let new_mean2 = delta2 / new_count as f64 + self.mean2;
- let new_c = self.algo_const - delta1 * (new_mean2 - value2.unwrap());
+ let new_c = self.algo_const - delta1 * (new_mean2 - value2);
self.count -= 1;
self.mean1 = new_mean1;
self.mean2 = new_mean2;
self.algo_const = new_c;
}
+
Ok(())
}
@@ -361,13 +370,7 @@ impl Accumulator for CovarianceAccumulator {
}
};
- if count <= 1 {
- return Err(DataFusionError::Internal(
- "At least two values are needed to calculate covariance".to_string(),
- ));
- }
-
- if self.count == 0 {
+ if count == 0 {
Ok(ScalarValue::Float64(None))
} else {
Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
@@ -529,25 +532,60 @@ mod tests {
#[test]
fn covariance_i32_with_nulls_2() -> Result<()> {
- let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
- let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)]));
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ None,
+ ]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(4),
+ Some(9),
+ Some(5),
+ Some(8),
+ Some(6),
+ None,
+ ]));
- let schema = Schema::new(vec![
- Field::new("a", DataType::Int32, true),
- Field::new("b", DataType::Int32, true),
- ]);
- let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ CovariancePop,
+ ScalarValue::from(0.6666666666666666_f64)
+ )
+ }
- let agg = Arc::new(Covariance::new(
- col("a", &schema)?,
- col("b", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
- let actual = aggregate(&batch, agg);
- assert!(actual.is_err());
+ #[test]
+ fn covariance_i32_with_nulls_3() -> Result<()> {
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ None,
+ ]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(4),
+ Some(9),
+ Some(5),
+ Some(8),
+ Some(6),
+ None,
+ ]));
- Ok(())
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ Covariance,
+ ScalarValue::from(1_f64)
+ )
}
#[test]
@@ -555,22 +593,59 @@ mod tests {
let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
- let schema = Schema::new(vec![
- Field::new("a", DataType::Int32, true),
- Field::new("b", DataType::Int32, true),
- ]);
- let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ Covariance,
+ ScalarValue::Float64(None)
+ )
+ }
- let agg = Arc::new(Covariance::new(
- col("a", &schema)?,
- col("b", &schema)?,
- "bla".to_string(),
+ #[test]
+ fn covariance_pop_i32_all_nulls() -> Result<()> {
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+ let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Int32,
+ DataType::Int32,
+ CovariancePop,
+ ScalarValue::Float64(None)
+ )
+ }
+
+ #[test]
+ fn covariance_1_input() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64]));
+ let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64]));
+
+ generic_test_op2!(
+ a,
+ b,
DataType::Float64,
- ));
- let actual = aggregate(&batch, agg);
- assert!(actual.is_err());
+ DataType::Float64,
+ Covariance,
+ ScalarValue::Float64(None)
+ )
+ }
- Ok(())
+ #[test]
+ fn covariance_pop_1_input() -> Result<()> {
+ let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64]));
+ let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64]));
+
+ generic_test_op2!(
+ a,
+ b,
+ DataType::Float64,
+ DataType::Float64,
+ CovariancePop,
+ ScalarValue::from(0_f64)
+ )
}
#[test]