alamb commented on code in PR #12095:
URL: https://github.com/apache/datafusion/pull/12095#discussion_r1729934151


##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -500,6 +500,85 @@ select stddev(sq.column1) from (values (1.1), (2.0), 
(3.0)) as sq
 ----
 0.950438495292
 
+# csv_query_stddev_7

Review Comment:
   💯  for test coverage



##########
datafusion/functions-aggregate/src/variance.rs:
##########
@@ -344,3 +391,181 @@ impl Accumulator for VarianceAccumulator {
         true
     }
 }
+
+#[derive(Debug)]
+pub struct VarianceGroupsAccumulator {
+    m2s: Vec<f64>,
+    means: Vec<f64>,
+    counts: Vec<u64>,
+    stats_type: StatsType,
+}
+
+impl VarianceGroupsAccumulator {
+    pub fn new(s_type: StatsType) -> Self {
+        Self {
+            m2s: Vec::new(),
+            means: Vec::new(),
+            counts: Vec::new(),
+            stats_type: s_type,
+        }
+    }
+
+    fn resize(&mut self, total_num_groups: usize) {
+        self.m2s.resize(total_num_groups, 0.0);
+        self.means.resize(total_num_groups, 0.0);
+        self.counts.resize(total_num_groups, 0);
+    }
+
+    fn merge<F>(
+        group_indices: &[usize],
+        counts: &UInt64Array,
+        means: &Float64Array,
+        m2s: &Float64Array,
+        opt_filter: Option<&BooleanArray>,
+        mut value_fn: F,
+    ) where
+        F: FnMut(usize, u64, f64, f64) + Send,
+    {
+        assert_eq!(counts.null_count(), 0);
+        assert_eq!(means.null_count(), 0);
+        assert_eq!(m2s.null_count(), 0);
+
+        match opt_filter {
+            None => {
+                group_indices
+                    .iter()
+                    .zip(counts.values().iter())
+                    .zip(means.values().iter())
+                    .zip(m2s.values().iter())
+                    .for_each(|(((&group_index, &count), &mean), &m2)| {
+                        value_fn(group_index, count, mean, m2);
+                    });
+            }
+            Some(filter) => {
+                group_indices
+                    .iter()
+                    .zip(counts.values().iter())
+                    .zip(means.values().iter())
+                    .zip(m2s.values().iter())
+                    .zip(filter.iter())
+                    .for_each(
+                        |((((&group_index, &count), &mean), &m2), 
filter_value)| {
+                            if let Some(true) = filter_value {
+                                value_fn(group_index, count, mean, m2);
+                            }
+                        },
+                    );
+            }
+        }
+    }
+
+    pub fn variance(
+        &mut self,
+        emit_to: datafusion_expr::EmitTo,
+    ) -> (Vec<f64>, NullBuffer) {
+        let mut counts = emit_to.take_needed(&mut self.counts);
+        let _ = emit_to.take_needed(&mut self.means);
+        let m2s = emit_to.take_needed(&mut self.m2s);
+
+        if let StatsType::Sample = self.stats_type {
+            counts.iter_mut().for_each(|count| {
+                *count -= 1;
+            });
+        }
+        let nulls = NullBuffer::from_iter(counts.iter().map(|&count| count != 
0));
+        let variance = m2s
+            .iter()
+            .zip(counts)
+            .map(|(m2, count)| m2 / count as f64)
+            .collect();
+        (variance, nulls)
+    }
+}
+
+impl GroupsAccumulator for VarianceGroupsAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow::array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = &cast(&values[0], &DataType::Float64)?;
+        let values = downcast_value!(values, Float64Array);
+
+        self.resize(total_num_groups);
+        accumulate(group_indices, values, opt_filter, |group_index, value| {
+            let (new_count, new_mean, new_m2) = update(
+                self.counts[group_index],
+                self.means[group_index],
+                self.m2s[group_index],
+                value,
+            );
+            self.counts[group_index] = new_count;
+            self.means[group_index] = new_mean;
+            self.m2s[group_index] = new_m2;
+        });
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow::array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 3, "two arguments to merge_batch");
+        // first batch is counts, second is partial sums

Review Comment:
   ```suggestion
           // first batch is counts, second is partial sums, third is square means
   ```



##########
datafusion/functions-aggregate/src/variance.rs:
##########
@@ -344,3 +391,181 @@ impl Accumulator for VarianceAccumulator {
         true
     }
 }
+
+#[derive(Debug)]
+pub struct VarianceGroupsAccumulator {
+    m2s: Vec<f64>,
+    means: Vec<f64>,
+    counts: Vec<u64>,
+    stats_type: StatsType,
+}
+
+impl VarianceGroupsAccumulator {
+    pub fn new(s_type: StatsType) -> Self {
+        Self {
+            m2s: Vec::new(),
+            means: Vec::new(),
+            counts: Vec::new(),
+            stats_type: s_type,
+        }
+    }
+
+    fn resize(&mut self, total_num_groups: usize) {
+        self.m2s.resize(total_num_groups, 0.0);
+        self.means.resize(total_num_groups, 0.0);
+        self.counts.resize(total_num_groups, 0);
+    }
+
+    fn merge<F>(
+        group_indices: &[usize],
+        counts: &UInt64Array,
+        means: &Float64Array,
+        m2s: &Float64Array,
+        opt_filter: Option<&BooleanArray>,
+        mut value_fn: F,
+    ) where
+        F: FnMut(usize, u64, f64, f64) + Send,
+    {
+        assert_eq!(counts.null_count(), 0);
+        assert_eq!(means.null_count(), 0);
+        assert_eq!(m2s.null_count(), 0);
+
+        match opt_filter {
+            None => {
+                group_indices
+                    .iter()
+                    .zip(counts.values().iter())
+                    .zip(means.values().iter())
+                    .zip(m2s.values().iter())
+                    .for_each(|(((&group_index, &count), &mean), &m2)| {
+                        value_fn(group_index, count, mean, m2);
+                    });
+            }
+            Some(filter) => {
+                group_indices
+                    .iter()
+                    .zip(counts.values().iter())
+                    .zip(means.values().iter())
+                    .zip(m2s.values().iter())
+                    .zip(filter.iter())
+                    .for_each(
+                        |((((&group_index, &count), &mean), &m2), 
filter_value)| {
+                            if let Some(true) = filter_value {
+                                value_fn(group_index, count, mean, m2);
+                            }
+                        },
+                    );
+            }
+        }
+    }
+
+    pub fn variance(
+        &mut self,
+        emit_to: datafusion_expr::EmitTo,
+    ) -> (Vec<f64>, NullBuffer) {
+        let mut counts = emit_to.take_needed(&mut self.counts);
+        let _ = emit_to.take_needed(&mut self.means);
+        let m2s = emit_to.take_needed(&mut self.m2s);
+
+        if let StatsType::Sample = self.stats_type {
+            counts.iter_mut().for_each(|count| {
+                *count -= 1;
+            });
+        }
+        let nulls = NullBuffer::from_iter(counts.iter().map(|&count| count != 
0));

Review Comment:
   👍  it took me a while to figure out how nulls were handled -- LGTM



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##########
@@ -351,6 +260,106 @@ impl NullState {
     }
 }
 
+pub fn accumulate<T, F>(

Review Comment:
   Could you please add some documentation to this function that explains what 
it does and its arguments?



##########
datafusion/functions-aggregate/src/variance.rs:
##########
@@ -344,3 +391,181 @@ impl Accumulator for VarianceAccumulator {
         true
     }
 }
+
+#[derive(Debug)]
+pub struct VarianceGroupsAccumulator {
+    m2s: Vec<f64>,
+    means: Vec<f64>,
+    counts: Vec<u64>,
+    stats_type: StatsType,
+}
+
+impl VarianceGroupsAccumulator {
+    pub fn new(s_type: StatsType) -> Self {
+        Self {
+            m2s: Vec::new(),
+            means: Vec::new(),
+            counts: Vec::new(),
+            stats_type: s_type,
+        }
+    }
+
+    fn resize(&mut self, total_num_groups: usize) {
+        self.m2s.resize(total_num_groups, 0.0);
+        self.means.resize(total_num_groups, 0.0);
+        self.counts.resize(total_num_groups, 0);
+    }
+
+    fn merge<F>(
+        group_indices: &[usize],
+        counts: &UInt64Array,
+        means: &Float64Array,
+        m2s: &Float64Array,
+        opt_filter: Option<&BooleanArray>,
+        mut value_fn: F,
+    ) where
+        F: FnMut(usize, u64, f64, f64) + Send,
+    {
+        assert_eq!(counts.null_count(), 0);
+        assert_eq!(means.null_count(), 0);
+        assert_eq!(m2s.null_count(), 0);
+
+        match opt_filter {
+            None => {
+                group_indices
+                    .iter()
+                    .zip(counts.values().iter())
+                    .zip(means.values().iter())
+                    .zip(m2s.values().iter())
+                    .for_each(|(((&group_index, &count), &mean), &m2)| {
+                        value_fn(group_index, count, mean, m2);
+                    });
+            }
+            Some(filter) => {
+                group_indices
+                    .iter()
+                    .zip(counts.values().iter())
+                    .zip(means.values().iter())
+                    .zip(m2s.values().iter())
+                    .zip(filter.iter())
+                    .for_each(
+                        |((((&group_index, &count), &mean), &m2), 
filter_value)| {
+                            if let Some(true) = filter_value {
+                                value_fn(group_index, count, mean, m2);
+                            }
+                        },
+                    );
+            }
+        }
+    }
+
+    pub fn variance(
+        &mut self,
+        emit_to: datafusion_expr::EmitTo,
+    ) -> (Vec<f64>, NullBuffer) {
+        let mut counts = emit_to.take_needed(&mut self.counts);
+        let _ = emit_to.take_needed(&mut self.means);

Review Comment:
   It took me a while to understand why `self.means` was ignored in the final 
calculation (and thus whether there is any need to carry it through).
   
   However, with some study I see that the means are needed to calculate `m2` 
during accumulation but are not used in the final output
   
   Maybe we could point that out in a comment (I can do it as a follow-on PR 
too).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to