rluvaton commented on code in PR #15022: URL: https://github.com/apache/datafusion/pull/15022#discussion_r2049483295
########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs: ########## @@ -163,6 +177,50 @@ impl GroupsAccumulatorAdapter { /// invokes f(accumulator, values) for each group that has values /// in group_indices. + fn invoke_per_accumulator<F>( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + f: &F, + ) -> Result<()> + where + F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, + { + self.make_accumulators_if_needed(total_num_groups)?; + + assert_eq!(values[0].len(), group_indices.len()); + + if group_indices.is_empty() { + return Ok(()); + } + + let (sizes_pre, sizes_post) = if self.contiguous_group_indices { + self.invoke_per_accumulator_on_contiguous_group_indices( + values, + group_indices, + opt_filter, + f, + ) + } else { + self.invoke_per_accumulator_on_non_ordered_group_indices( + values, + group_indices, + opt_filter, + f, + ) + }?; + + self.adjust_allocation(sizes_pre, sizes_post); + + Ok(()) + } + + /// invokes f(accumulator, values) for each group that has values + /// in group_indices. + /// + /// if the group indices are contiguous we avoiding Review Comment: updated ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs: ########## @@ -249,24 +303,92 @@ impl GroupsAccumulatorAdapter { let mut sizes_pre = 0; let mut sizes_post = 0; for (&group_idx, offsets) in iter { - let state = &mut self.states[group_idx]; - sizes_pre += state.size(); - - let values_to_accumulate = slice_and_maybe_filter( - &values, - opt_filter.as_ref().map(|f| f.as_boolean()), - offsets, - )?; - f(state.accumulator.as_mut(), &values_to_accumulate)?; + sizes_pre += self.states[group_idx].size(); + self.invoke_accumulator(group_idx, &values, offsets, opt_boolean_filter, f)?; // clear out the state so they are empty for next // iteration - state.indices.clear(); - sizes_post += state.size(); + self.states[group_idx].indices.clear(); + + sizes_post += self.states[group_idx].size(); } - self.adjust_allocation(sizes_pre, sizes_post); - Ok(()) + Ok((sizes_pre, sizes_post)) + } + + /// invokes f(accumulator, values) for each group that has values + /// in group_indices. + /// + /// This function is the same as [`Self::invoke_per_accumulator_on_non_ordered_group_indices`] but avoid reordering of the + /// input as we know that each group_index is contiguous + /// Review Comment: updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org