alamb commented on code in PR #6904:
URL: https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1259602569


##########
datafusion/physical-expr/src/aggregate/count.rs:
##########
@@ -76,6 +85,109 @@ impl Count {
     }
 }
 
+/// An accumulator to compute the counts of [`PrimitiveArray<T>`].
+/// Stores values as native types, and does overflow checking
+///
+/// Unlike most other accumulators, COUNT never produces NULLs. If no
+/// non-null values are seen in any group the output is 0. Thus, this
+/// accumulator has no additional null or seen filter tracking.
+#[derive(Debug)]
+struct CountGroupsAccumulator {
+    /// Count per group (use i64 to make Int64Array)
+    counts: Vec<i64>,
+}
+
+impl CountGroupsAccumulator {
+    pub fn new() -> Self {
+        Self { counts: vec![] }
+    }
+}
+
+impl GroupsAccumulator for CountGroupsAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values.get(0).unwrap();
+
+        // Add one to each group's counter for each non null, non
+        // filtered value
+        self.counts.resize(total_num_groups, 0);
+        accumulate_indices(
+            group_indices,
+            values.nulls(), // ignore values
+            opt_filter,
+            |group_index| {

Review Comment:
   When I tried to use `wrapping_add`, `rustc` warned me that I needed to use
   the return value (see below). Given that I never expect an overflow and this
   is on the hot path, I think we should not use `wrapping_add`.
   
   ```
   warning: unused return value of `core::num::<impl i64>::wrapping_add` that must be used
      --> datafusion/physical-expr/src/aggregate/count.rs:130:17
       |
   130 |                 self.counts[group_index].wrapping_add(1);
       |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |
       = note: this returns the result of the operation, without modifying the original
       = note: `#[warn(unused_must_use)]` on by default
   help: use `let _ = ...` to ignore the resulting value
       |
   130 |                 let _ = self.counts[group_index].wrapping_add(1);
       |                 +++++++
   ```
   
   > Maybe we should remove the wrapping_add in the other places and "just" use + etc.
   
   What other places are you referring to? I didn't see any in this PR, but
   maybe I am misunderstanding.



##########
datafusion/physical-expr/src/aggregate/count.rs:
##########
@@ -76,6 +85,109 @@ impl Count {
     }
 }
 
+/// An accumulator to compute the counts of [`PrimitiveArray<T>`].
+/// Stores values as native types, and does overflow checking
+///
+/// Unlike most other accumulators, COUNT never produces NULLs. If no
+/// non-null values are seen in any group the output is 0. Thus, this
+/// accumulator has no additional null or seen filter tracking.
+#[derive(Debug)]
+struct CountGroupsAccumulator {
+    /// Count per group (use i64 to make Int64Array)
+    counts: Vec<i64>,
+}
+
+impl CountGroupsAccumulator {
+    pub fn new() -> Self {
+        Self { counts: vec![] }
+    }
+}
+
+impl GroupsAccumulator for CountGroupsAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values.get(0).unwrap();
+
+        // Add one to each group's counter for each non null, non
+        // filtered value
+        self.counts.resize(total_num_groups, 0);
+        accumulate_indices(
+            group_indices,
+            values.nulls(), // ignore values
+            opt_filter,
+            |group_index| {
+                self.counts[group_index] += 1;
+            },
+        );
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+        // first batch is counts, second is partial sums
+        let partial_counts = values.get(0).unwrap().as_primitive::<Int64Type>();
+
+        // intermediate counts are always created as non null
+        assert_eq!(partial_counts.null_count(), 0);
+        let partial_counts = partial_counts.values();
+
+        // Adds the counts with the partial counts
+        self.counts.resize(total_num_groups, 0);
+        match opt_filter {
+            Some(filter) => filter
+                .iter()
+                .zip(group_indices.iter())
+                .zip(partial_counts.iter())
+                .for_each(|((filter_value, &group_index), partial_count)| {
+                    if let Some(true) = filter_value {
+                        self.counts[group_index] += partial_count;

Review Comment:
   see above: 
https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1259602569



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to