This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new 1eb46df49 Covariance single row input & null skipping (#4852) 1eb46df49 is described below commit 1eb46df49fcd2479235500810c0562b10da77c90 Author: Eduard Karacharov <13005055+kor...@users.noreply.github.com> AuthorDate: Tue Jan 10 09:44:17 2023 +0300 Covariance single row input & null skipping (#4852) * covariance & correlation single row & null skipping * Apply suggestions from code review Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com> * unwrap_or_internal_err macro instead of unwrap Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com> --- .../tests/sqllogictests/test_files/aggregate.slt | 82 ++++++++ .../physical-expr/src/aggregate/correlation.rs | 99 +++++---- .../physical-expr/src/aggregate/covariance.rs | 221 ++++++++++++++------- 3 files changed, 292 insertions(+), 110 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt index dbb3b69ca..5ddec784e 100644 --- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt +++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt @@ -36,12 +36,94 @@ SELECT covar(c2, c12) FROM aggregate_test_100 ---- -0.07996901247859442 +# single_row_query_covar_1 +query R +select covar_samp(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq +---- +NULL + +# single_row_query_covar_2 +query R +select covar_pop(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq +---- +0 + +# all_nulls_query_covar +query R +with data as ( + select null::int as f, null::int as b + union all + select null::int as f, null::int as b +) +select covar_samp(f, b), covar_pop(f, b) +from data +---- +NULL NULL + +# covar_query_with_nulls +query R +with data as ( + select 1 as f, 4 as b + union all + select null as f, 99 as b + union all + select 2 as f, 5 as b + union all + select 98 as f, null as b + union all + select 3 as f, 6 as b + union all + select null as f, null as b +) +select covar_samp(f, b), covar_pop(f, b) +from data +---- +1 0.6666666666666666 + # csv_query_correlation query R SELECT corr(c2, c12) FROM aggregate_test_100 ---- -0.19064544190576607 +# single_row_query_correlation +query R +select corr(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq +---- +0 + +# all_nulls_query_correlation +query R +with data as ( + select null::int as f, null::int as b + union all + select null::int as f, null::int as b +) +select corr(f, b) +from data +---- +NULL + +# correlation_query_with_nulls +query R +with data as ( + select 1 as f, 4 as b + union all + select null as f, 99 as b + union all + select 2 as f, 5 as b + union all + select 98 as f, null as b + union all + select 3 as f, 6 as b + union all + select null as f, null as b +) +select corr(f, b) +from data +---- +1 + # csv_query_variance_1 query R SELECT var_pop(c2) FROM aggregate_test_100 diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs index 6211c578f..1bed3fe03 100644 --- a/datafusion/physical-expr/src/aggregate/correlation.rs +++ b/datafusion/physical-expr/src/aggregate/correlation.rs @@ -22,7 +22,11 @@ use crate::aggregate::stats::StatsType; use crate::aggregate::stddev::StddevAccumulator; use crate::expressions::format_state_name; use crate::{AggregateExpr, PhysicalExpr}; -use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; +use arrow::{ + array::ArrayRef, + compute::{and, filter, is_not_null}, + datatypes::{DataType, Field}, +}; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::Accumulator; @@ -145,14 +149,39 @@ impl Accumulator for CorrelationAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - self.covar.update_batch(values)?; + // TODO: null input skipping logic duplicated across Correlation + // and its children accumulators. + // This could be simplified by splitting up input filtering and + // calculation logic in children accumulators, and calling only + // calculation part from Correlation + let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { + let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; + let values1 = filter(&values[0], &mask)?; + let values2 = filter(&values[1], &mask)?; + + vec![values1, values2] + } else { + values.to_vec() + }; + + self.covar.update_batch(&values)?; self.stddev1.update_batch(&values[0..1])?; self.stddev2.update_batch(&values[1..2])?; Ok(()) } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - self.covar.retract_batch(values)?; + let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { + let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; + let values1 = filter(&values[0], &mask)?; + let values2 = filter(&values[1], &mask)?; + + vec![values1, values2] + } else { + values.to_vec() + }; + + self.covar.retract_batch(&values)?; self.stddev1.retract_batch(&values[0..1])?; self.stddev2.retract_batch(&values[1..2])?; Ok(()) @@ -341,25 +370,29 @@ mod tests { #[test] fn correlation_i32_with_nulls_2() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); - let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)])); - - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; - - let agg = Arc::new(Correlation::new( - col("a", &schema)?, - col("b", &schema)?, - "bla".to_string(), - DataType::Float64, - )); - let actual = aggregate(&batch, agg); - assert!(actual.is_err()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(2), + Some(9), + Some(3), + ])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(4), + Some(5), + Some(5), + None, + Some(6), + ])); - Ok(()) + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + Correlation, + ScalarValue::from(1_f64) + ) } #[test] @@ -367,22 +400,14 @@ mod tests { let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; - - let agg = Arc::new(Correlation::new( - col("a", &schema)?, - col("b", &schema)?, - "bla".to_string(), - DataType::Float64, - )); - let actual = aggregate(&batch, agg); - assert!(actual.is_err()); - - Ok(()) + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + Correlation, + ScalarValue::Float64(None) + ) } #[test] diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs index 3963ba643..f0625aef7 100644 --- a/datafusion/physical-expr/src/aggregate/covariance.rs +++ b/datafusion/physical-expr/src/aggregate/covariance.rs @@ -253,33 +253,35 @@ impl Accumulator for CovarianceAccumulator { let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); - for _i in 0..values1.len() { - let value1 = arr1.next(); - let value2 = arr2.next(); + for i in 0..values1.len() { + let value1 = if values1.is_valid(i) { + arr1.next() + } else { + None + }; + let value2 = if values2.is_valid(i) { + arr2.next() + } else { + None + }; if value1.is_none() || value2.is_none() { - if value1.is_none() && value2.is_none() { - continue; - } else { - return Err(DataFusionError::Internal( - "The two columns are not aligned".to_string(), - )); - } - } else { - let value1 = unwrap_or_internal_err!(value1); - let value2 = unwrap_or_internal_err!(value2); - let new_count = self.count + 1; - let delta1 = value1 - self.mean1; - let new_mean1 = delta1 / new_count as f64 + self.mean1; - let delta2 = value2 - self.mean2; - let new_mean2 = delta2 / new_count as f64 + self.mean2; - let new_c = delta1 * (value2 - new_mean2) + self.algo_const; - - self.count += 1; - self.mean1 = new_mean1; - self.mean2 = new_mean2; - self.algo_const = new_c; + continue; } + + let value1 = unwrap_or_internal_err!(value1); + let value2 = unwrap_or_internal_err!(value2); + let new_count = self.count + 1; + let delta1 = value1 - self.mean1; + let new_mean1 = delta1 / new_count as f64 + self.mean1; + let delta2 = value2 - self.mean2; + let new_mean2 = delta2 / new_count as f64 + self.mean2; + let new_c = delta1 * (value2 - new_mean2) + self.algo_const; + + self.count += 1; + self.mean1 = new_mean1; + self.mean2 = new_mean2; + self.algo_const = new_c; } Ok(()) @@ -291,31 +293,38 @@ impl Accumulator for CovarianceAccumulator { let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); - for _i in 0..values1.len() { - let value1 = arr1.next(); - let value2 = arr2.next(); + for i in 0..values1.len() { + let value1 = if values1.is_valid(i) { + arr1.next() + } else { + None + }; + let value2 = if values2.is_valid(i) { + arr2.next() + } else { + None + }; if value1.is_none() || value2.is_none() { - if value1.is_none() && value2.is_none() { - continue; - } else { - return Err(DataFusionError::Internal( - "The two columns are not aligned".to_string(), - )); - } + continue; } + + let value1 = unwrap_or_internal_err!(value1); + let value2 = unwrap_or_internal_err!(value2); + let new_count = self.count - 1; - let delta1 = self.mean1 - value1.unwrap(); + let delta1 = self.mean1 - value1; let new_mean1 = delta1 / new_count as f64 + self.mean1; - let delta2 = self.mean2 - value2.unwrap(); + let delta2 = self.mean2 - value2; let new_mean2 = delta2 / new_count as f64 + self.mean2; - let new_c = self.algo_const - delta1 * (new_mean2 - value2.unwrap()); + let new_c = self.algo_const - delta1 * (new_mean2 - value2); self.count -= 1; self.mean1 = new_mean1; self.mean2 = new_mean2; self.algo_const = new_c; } + Ok(()) } @@ -361,13 +370,7 @@ impl Accumulator for CovarianceAccumulator { } }; - if count <= 1 { - return Err(DataFusionError::Internal( - "At least two values are needed to calculate covariance".to_string(), - )); - } - - if self.count == 0 { + if count == 0 { Ok(ScalarValue::Float64(None)) } else { Ok(ScalarValue::Float64(Some(self.algo_const / count as f64))) @@ -529,25 +532,60 @@ mod tests { #[test] fn covariance_i32_with_nulls_2() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); - let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)])); + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + None, + ])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(4), + Some(9), + Some(5), + Some(8), + Some(6), + None, + ])); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + CovariancePop, + ScalarValue::from(0.6666666666666666_f64) + ) + } - let agg = Arc::new(Covariance::new( - col("a", &schema)?, - col("b", &schema)?, - "bla".to_string(), - DataType::Float64, - )); - let actual = aggregate(&batch, agg); - assert!(actual.is_err()); + #[test] + fn covariance_i32_with_nulls_3() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + None, + ])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(4), + Some(9), + Some(5), + Some(8), + Some(6), + None, + ])); - Ok(()) + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + Covariance, + ScalarValue::from(1_f64) + ) } #[test] @@ -555,22 +593,59 @@ mod tests { let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + Covariance, + ScalarValue::Float64(None) + ) + } - let agg = Arc::new(Covariance::new( - col("a", &schema)?, - col("b", &schema)?, - "bla".to_string(), + #[test] + fn covariance_pop_i32_all_nulls() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); + + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + CovariancePop, + ScalarValue::Float64(None) + ) + } + + #[test] + fn covariance_1_input() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64])); + + generic_test_op2!( + a, + b, DataType::Float64, - )); - let actual = aggregate(&batch, agg); - assert!(actual.is_err()); + DataType::Float64, + Covariance, + ScalarValue::Float64(None) + ) + } - Ok(()) + #[test] + fn covariance_pop_1_input() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64])); + + generic_test_op2!( + a, + b, + DataType::Float64, + DataType::Float64, + CovariancePop, + ScalarValue::from(0_f64) + ) } #[test]