This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb46df49 Covariance single row input & null skipping (#4852)
1eb46df49 is described below

commit 1eb46df49fcd2479235500810c0562b10da77c90
Author: Eduard Karacharov <13005055+kor...@users.noreply.github.com>
AuthorDate: Tue Jan 10 09:44:17 2023 +0300

    Covariance single row input & null skipping (#4852)
    
    * covariance & correlation single row & null skipping
    
    * Apply suggestions from code review
    
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
    
    * unwrap_or_internal_err macro instead of unwrap
    
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
---
 .../tests/sqllogictests/test_files/aggregate.slt   |  82 ++++++++
 .../physical-expr/src/aggregate/correlation.rs     |  99 +++++----
 .../physical-expr/src/aggregate/covariance.rs      | 221 ++++++++++++++-------
 3 files changed, 292 insertions(+), 110 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
index dbb3b69ca..5ddec784e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
@@ -36,12 +36,94 @@ SELECT covar(c2, c12) FROM aggregate_test_100
 ----
 -0.07996901247859442
 
+# single_row_query_covar_1
+query R
+select covar_samp(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+NULL
+
+# single_row_query_covar_2
+query R
+select covar_pop(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+0
+
+# all_nulls_query_covar
+query R
+with data as (
+  select null::int as f, null::int as b
+  union all
+  select null::int as f, null::int as b
+)
+select covar_samp(f, b), covar_pop(f, b)
+from data
+----
+NULL NULL
+
+# covar_query_with_nulls
+query R
+with data as (
+  select 1 as f,       4 as b
+  union all
+  select null as f,   99 as b
+  union all
+  select 2 as f,       5 as b
+  union all
+  select 98 as f,   null as b
+  union all
+  select 3 as f,       6 as b
+  union all
+  select null as f, null as b
+)
+select covar_samp(f, b), covar_pop(f, b)
+from data
+----
+1 0.6666666666666666
+
 # csv_query_correlation
 query R
 SELECT corr(c2, c12) FROM aggregate_test_100
 ----
 -0.19064544190576607
 
+# single_row_query_correlation
+query R
+select corr(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+0
+
+# all_nulls_query_correlation
+query R
+with data as (
+  select null::int as f, null::int as b
+  union all
+  select null::int as f, null::int as b
+)
+select corr(f, b)
+from data
+----
+NULL
+
+# correlation_query_with_nulls
+query R
+with data as (
+  select 1 as f,       4 as b
+  union all
+  select null as f,   99 as b
+  union all
+  select 2 as f,       5 as b
+  union all
+  select 98 as f,   null as b
+  union all
+  select 3 as f,       6 as b
+  union all
+  select null as f, null as b
+)
+select corr(f, b)
+from data
+----
+1
+
 # csv_query_variance_1
 query R
 SELECT var_pop(c2) FROM aggregate_test_100
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 6211c578f..1bed3fe03 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -22,7 +22,11 @@ use crate::aggregate::stats::StatsType;
 use crate::aggregate::stddev::StddevAccumulator;
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
-use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
+use arrow::{
+    array::ArrayRef,
+    compute::{and, filter, is_not_null},
+    datatypes::{DataType, Field},
+};
 use datafusion_common::Result;
 use datafusion_common::ScalarValue;
 use datafusion_expr::Accumulator;
@@ -145,14 +149,39 @@ impl Accumulator for CorrelationAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        self.covar.update_batch(values)?;
+        // TODO: null input skipping logic duplicated across Correlation
+        // and its children accumulators.
+        // This could be simplified by splitting up input filtering and
+        // calculation logic in children accumulators, and calling only
+        // calculation part from Correlation
+        let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
+            let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
+            let values1 = filter(&values[0], &mask)?;
+            let values2 = filter(&values[1], &mask)?;
+
+            vec![values1, values2]
+        } else {
+            values.to_vec()
+        };
+
+        self.covar.update_batch(&values)?;
         self.stddev1.update_batch(&values[0..1])?;
         self.stddev2.update_batch(&values[1..2])?;
         Ok(())
     }
 
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        self.covar.retract_batch(values)?;
+        let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
+            let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
+            let values1 = filter(&values[0], &mask)?;
+            let values2 = filter(&values[1], &mask)?;
+
+            vec![values1, values2]
+        } else {
+            values.to_vec()
+        };
+
+        self.covar.retract_batch(&values)?;
         self.stddev1.retract_batch(&values[0..1])?;
         self.stddev2.retract_batch(&values[1..2])?;
         Ok(())
@@ -341,25 +370,29 @@ mod tests {
 
     #[test]
     fn correlation_i32_with_nulls_2() -> Result<()> {
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
-        let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)]));
-
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("b", DataType::Int32, true),
-        ]);
-        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
-
-        let agg = Arc::new(Correlation::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-        let actual = aggregate(&batch, agg);
-        assert!(actual.is_err());
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            Some(9),
+            Some(3),
+        ]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(4),
+            Some(5),
+            Some(5),
+            None,
+            Some(6),
+        ]));
 
-        Ok(())
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            Correlation,
+            ScalarValue::from(1_f64)
+        )
     }
 
     #[test]
@@ -367,22 +400,14 @@ mod tests {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
         let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
 
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("b", DataType::Int32, true),
-        ]);
-        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
-
-        let agg = Arc::new(Correlation::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-        let actual = aggregate(&batch, agg);
-        assert!(actual.is_err());
-
-        Ok(())
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            Correlation,
+            ScalarValue::Float64(None)
+        )
     }
 
     #[test]
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 3963ba643..f0625aef7 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -253,33 +253,35 @@ impl Accumulator for CovarianceAccumulator {
         let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
         let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();
 
-        for _i in 0..values1.len() {
-            let value1 = arr1.next();
-            let value2 = arr2.next();
+        for i in 0..values1.len() {
+            let value1 = if values1.is_valid(i) {
+                arr1.next()
+            } else {
+                None
+            };
+            let value2 = if values2.is_valid(i) {
+                arr2.next()
+            } else {
+                None
+            };
 
             if value1.is_none() || value2.is_none() {
-                if value1.is_none() && value2.is_none() {
-                    continue;
-                } else {
-                    return Err(DataFusionError::Internal(
-                        "The two columns are not aligned".to_string(),
-                    ));
-                }
-            } else {
-                let value1 = unwrap_or_internal_err!(value1);
-                let value2 = unwrap_or_internal_err!(value2);
-                let new_count = self.count + 1;
-                let delta1 = value1 - self.mean1;
-                let new_mean1 = delta1 / new_count as f64 + self.mean1;
-                let delta2 = value2 - self.mean2;
-                let new_mean2 = delta2 / new_count as f64 + self.mean2;
-                let new_c = delta1 * (value2 - new_mean2) + self.algo_const;
-
-                self.count += 1;
-                self.mean1 = new_mean1;
-                self.mean2 = new_mean2;
-                self.algo_const = new_c;
+                continue;
             }
+
+            let value1 = unwrap_or_internal_err!(value1);
+            let value2 = unwrap_or_internal_err!(value2);
+            let new_count = self.count + 1;
+            let delta1 = value1 - self.mean1;
+            let new_mean1 = delta1 / new_count as f64 + self.mean1;
+            let delta2 = value2 - self.mean2;
+            let new_mean2 = delta2 / new_count as f64 + self.mean2;
+            let new_c = delta1 * (value2 - new_mean2) + self.algo_const;
+
+            self.count += 1;
+            self.mean1 = new_mean1;
+            self.mean2 = new_mean2;
+            self.algo_const = new_c;
         }
 
         Ok(())
@@ -291,31 +293,38 @@ impl Accumulator for CovarianceAccumulator {
         let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
         let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();
 
-        for _i in 0..values1.len() {
-            let value1 = arr1.next();
-            let value2 = arr2.next();
+        for i in 0..values1.len() {
+            let value1 = if values1.is_valid(i) {
+                arr1.next()
+            } else {
+                None
+            };
+            let value2 = if values2.is_valid(i) {
+                arr2.next()
+            } else {
+                None
+            };
 
             if value1.is_none() || value2.is_none() {
-                if value1.is_none() && value2.is_none() {
-                    continue;
-                } else {
-                    return Err(DataFusionError::Internal(
-                        "The two columns are not aligned".to_string(),
-                    ));
-                }
+                continue;
             }
+
+            let value1 = unwrap_or_internal_err!(value1);
+            let value2 = unwrap_or_internal_err!(value2);
+
             let new_count = self.count - 1;
-            let delta1 = self.mean1 - value1.unwrap();
+            let delta1 = self.mean1 - value1;
             let new_mean1 = delta1 / new_count as f64 + self.mean1;
-            let delta2 = self.mean2 - value2.unwrap();
+            let delta2 = self.mean2 - value2;
             let new_mean2 = delta2 / new_count as f64 + self.mean2;
-            let new_c = self.algo_const - delta1 * (new_mean2 - value2.unwrap());
+            let new_c = self.algo_const - delta1 * (new_mean2 - value2);
 
             self.count -= 1;
             self.mean1 = new_mean1;
             self.mean2 = new_mean2;
             self.algo_const = new_c;
         }
+
         Ok(())
     }
 
@@ -361,13 +370,7 @@ impl Accumulator for CovarianceAccumulator {
             }
         };
 
-        if count <= 1 {
-            return Err(DataFusionError::Internal(
-                "At least two values are needed to calculate covariance".to_string(),
-            ));
-        }
-
-        if self.count == 0 {
+        if count == 0 {
             Ok(ScalarValue::Float64(None))
         } else {
             Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
@@ -529,25 +532,60 @@ mod tests {
 
     #[test]
     fn covariance_i32_with_nulls_2() -> Result<()> {
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
-        let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)]));
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            None,
+            Some(3),
+            None,
+        ]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(4),
+            Some(9),
+            Some(5),
+            Some(8),
+            Some(6),
+            None,
+        ]));
 
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("b", DataType::Int32, true),
-        ]);
-        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            CovariancePop,
+            ScalarValue::from(0.6666666666666666_f64)
+        )
+    }
 
-        let agg = Arc::new(Covariance::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-        let actual = aggregate(&batch, agg);
-        assert!(actual.is_err());
+    #[test]
+    fn covariance_i32_with_nulls_3() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            None,
+            Some(3),
+            None,
+        ]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(4),
+            Some(9),
+            Some(5),
+            Some(8),
+            Some(6),
+            None,
+        ]));
 
-        Ok(())
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            Covariance,
+            ScalarValue::from(1_f64)
+        )
     }
 
     #[test]
@@ -555,22 +593,59 @@ mod tests {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
         let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
 
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, true),
-            Field::new("b", DataType::Int32, true),
-        ]);
-        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?;
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            Covariance,
+            ScalarValue::Float64(None)
+        )
+    }
 
-        let agg = Arc::new(Covariance::new(
-            col("a", &schema)?,
-            col("b", &schema)?,
-            "bla".to_string(),
+    #[test]
+    fn covariance_pop_i32_all_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Int32,
+            DataType::Int32,
+            CovariancePop,
+            ScalarValue::Float64(None)
+        )
+    }
+
+    #[test]
+    fn covariance_1_input() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64]));
+        let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64]));
+
+        generic_test_op2!(
+            a,
+            b,
             DataType::Float64,
-        ));
-        let actual = aggregate(&batch, agg);
-        assert!(actual.is_err());
+            DataType::Float64,
+            Covariance,
+            ScalarValue::Float64(None)
+        )
+    }
 
-        Ok(())
+    #[test]
+    fn covariance_pop_1_input() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64]));
+        let b: ArrayRef = Arc::new(Float64Array::from(vec![2_f64]));
+
+        generic_test_op2!(
+            a,
+            b,
+            DataType::Float64,
+            DataType::Float64,
+            CovariancePop,
+            ScalarValue::from(0_f64)
+        )
     }
 
     #[test]

Reply via email to