alamb commented on a change in pull request #1547: URL: https://github.com/apache/arrow-datafusion/pull/1547#discussion_r782496558
########## File path: datafusion/src/physical_plan/expressions/variance.rs ########## @@ -230,93 +235,186 @@ impl VarianceAccumulator { self.count } - pub fn get_mean(&self) -> ScalarValue { - self.mean.clone() + pub fn get_mean(&self) -> f64 { + self.mean } - pub fn get_m2(&self) -> ScalarValue { - self.m2.clone() + pub fn get_m2(&self) -> f64 { + self.m2 } } impl Accumulator for VarianceAccumulator { fn state(&self) -> Result<Vec<ScalarValue>> { Ok(vec![ ScalarValue::from(self.count), - self.mean.clone(), - self.m2.clone(), + ScalarValue::from(self.mean), + ScalarValue::from(self.m2), ]) } + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &cast(&values[0], &DataType::Float64)?; + let arr = values.as_any().downcast_ref::<Float64Array>().unwrap(); + + for i in 0..arr.len() { + let value = arr.value(i); + + if value == 0_f64 && values.is_null(i) { + continue; + } + let new_count = self.count + 1; Review comment: Here is a more idiomatic way to iterate over the array and skip nulls (and also faster as it doesn't check the bounds on each access to `arr.value(i)`: ```suggestion // NB: filter map skips `None` (null) values for value in arr.iter().filter_map(|v| v) { let new_count = self.count + 1; ``` ########## File path: datafusion/src/physical_plan/expressions/variance.rs ########## @@ -230,93 +235,186 @@ impl VarianceAccumulator { self.count } - pub fn get_mean(&self) -> ScalarValue { - self.mean.clone() + pub fn get_mean(&self) -> f64 { + self.mean } - pub fn get_m2(&self) -> ScalarValue { - self.m2.clone() + pub fn get_m2(&self) -> f64 { + self.m2 } } impl Accumulator for VarianceAccumulator { fn state(&self) -> Result<Vec<ScalarValue>> { Ok(vec![ ScalarValue::from(self.count), - self.mean.clone(), - self.m2.clone(), + ScalarValue::from(self.mean), + ScalarValue::from(self.m2), ]) } + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &cast(&values[0], &DataType::Float64)?; + let arr = values.as_any().downcast_ref::<Float64Array>().unwrap(); + + for i in 0..arr.len() { + let value = arr.value(i); + + if value == 0_f64 && values.is_null(i) { + continue; + } + let new_count = self.count + 1; + let delta1 = value - self.mean; + let new_mean = delta1 / new_count as f64 + self.mean; + let delta2 = value - new_mean; + let new_m2 = self.m2 + delta1 * delta2; + + self.count += 1; + self.mean = new_mean; + self.m2 = new_m2; + } + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap(); + let means = states[1].as_any().downcast_ref::<Float64Array>().unwrap(); + let m2s = states[2].as_any().downcast_ref::<Float64Array>().unwrap(); + + for i in 0..counts.len() { + let c = counts.value(i); + if c == 0_u64 { + continue; + } + let new_count = self.count + c; Review comment: ```suggestion let non_null_counts = counts .iter() .enumerate() .filter_map(|(i, c)| c.map(|c| (i, c))); for (i,c) in non_null_counts { let new_count = self.count + c; ``` By the same logic as above this also skips checking bounds on each row. Though for sure I would say this is less readable :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org