alamb commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1163316574
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -419,92 +423,154 @@ impl GroupedHashAggregateStream {
let row_values = get_at_indices(&row_aggr_input_values,
&batch_indices);
let normal_values = get_at_indices(&normal_aggr_input_values,
&batch_indices);
+ let accumulator_set_pre =
+ get_accumulator_set_size(&groups_with_rows, row_group_states);
// 2.1 for each key in this batch
// 2.2 for each aggregation
// 2.3 `slice` from each of its arrays the keys' values
// 2.4 update / merge the accumulator with the values
// 2.5 clear indices
- groups_with_rows
- .iter()
- .zip(offsets.windows(2))
- .try_for_each(|(group_idx, offsets)| {
- let group_state = &mut row_group_states[*group_idx];
- // 2.2
- self.row_accumulators
- .iter_mut()
- .zip(row_values.iter())
- .map(|(accumulator, aggr_array)| {
- (
- accumulator,
- aggr_array
- .iter()
- .map(|array| {
- // 2.3
- array.slice(offsets[0], offsets[1] -
offsets[0])
- })
- .collect::<Vec<ArrayRef>>(),
- )
- })
- .try_for_each(|(accumulator, values)| {
- let mut state_accessor =
RowAccessor::new_from_layout(
- self.row_aggr_layout.clone(),
- );
- state_accessor.point_to(
- 0,
- group_state.aggregation_buffer.as_mut_slice(),
- );
- match self.mode {
- AggregateMode::Partial => {
+ match self.mode {
+ AggregateMode::Partial => {
+ groups_with_rows
+ .iter()
+ .zip(offsets.windows(2))
+ .try_for_each(|(group_idx, offsets)| {
+ let group_state = &mut
row_group_states[*group_idx];
+ // 2.2
+ self.row_accumulators
+ .iter_mut()
+ .zip(row_values.iter())
+ .map(|(accumulator, aggr_array)| {
+ (
+ accumulator,
+ aggr_array
+ .iter()
+ .map(|array| {
+ // 2.3
+ array.slice(
+ offsets[0],
+ offsets[1] - offsets[0],
+ )
+ })
+ .collect::<Vec<ArrayRef>>(),
+ )
+ })
+ .try_for_each(|(accumulator, values)| {
+ let mut state_accessor =
RowAccessor::new_from_layout(
+ self.row_aggr_layout.clone(),
+ );
+ state_accessor.point_to(
+ 0,
+
group_state.aggregation_buffer.as_mut_slice(),
+ );
accumulator.update_batch(&values, &mut
state_accessor)
- }
- AggregateMode::FinalPartitioned
- | AggregateMode::Final => {
- // note: the aggregation here is over
states, not values, thus the merge
- accumulator.merge_batch(&values, &mut
state_accessor)
- }
- }
- })
- // 2.5
- .and(Ok(()))?;
- // normal accumulators
- group_state
- .accumulator_set
- .iter_mut()
- .zip(normal_values.iter())
- .map(|(accumulator, aggr_array)| {
- (
- accumulator,
- aggr_array
- .iter()
- .map(|array| {
- // 2.3
- array.slice(offsets[0], offsets[1] -
offsets[0])
- })
- .collect::<Vec<ArrayRef>>(),
- )
- })
- .try_for_each(|(accumulator, values)| {
- let size_pre = accumulator.size();
- let res = match self.mode {
- AggregateMode::Partial => {
+ })
+ // 2.5
+ .and(Ok(()))?;
+ // normal accumulators
+ group_state
+ .accumulator_set
+ .iter_mut()
+ .zip(normal_values.iter())
+ .map(|(accumulator, aggr_array)| {
+ (
+ accumulator,
+ aggr_array
+ .iter()
+ .map(|array| {
+ // 2.3
+ array.slice(
+ offsets[0],
+ offsets[1] - offsets[0],
+ )
+ })
+ .collect::<Vec<ArrayRef>>(),
+ )
+ })
+ .try_for_each(|(accumulator, values)| {
accumulator.update_batch(&values)
- }
- AggregateMode::FinalPartitioned
- | AggregateMode::Final => {
+ })
+ // 2.5
+ .and({
+ group_state.indices.clear();
+ Ok(())
+ })
+ })?;
+ }
+ AggregateMode::FinalPartitioned | AggregateMode::Final => {
Review Comment:
Maybe GitHub is rendering the diff confusingly, but this seems like a
significant amount of new code.
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
}
fn size(&self) -> usize {
- std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) +
self.sum.size()
+ std::mem::size_of_val(self)
}
Review Comment:
According to the docs
https://github.com/yahoNanJing/arrow-datafusion/blob/issue-5903/datafusion/expr/src/accumulator.rs#L88
```rust
/// Allocated size required for this accumulator, in bytes, including `Self`.
/// Allocated means that for internal containers such as `Vec`, the `capacity` should be used
/// not the `len`
fn size(&self) -> usize;
```
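The change in this PR seems to stop accounting for extra heap allocations owned by the
`ScalarValue` (such as `ScalarValue::Utf8`, which holds an allocated string), so
`size()` would no longer include them as the documented contract requires.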
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -514,6 +580,19 @@ impl GroupedHashAggregateStream {
}
}
+fn get_accumulator_set_size(
+ groups_with_rows: &[usize],
+ row_group_states: &[RowGroupState],
+) -> usize {
+ groups_with_rows.iter().fold(0usize, |acc, group_idx| {
+ let group_state = &row_group_states[*group_idx];
+ group_state
+ .accumulator_set
+ .iter()
+ .fold(acc, |acc, accumulator| acc + accumulator.size())
+ })
+}
+
/// The state that is built for each output group.
Review Comment:
Yes, I agree the calculation of the memory size is complicated.
I think @tustvold has been thinking about how to improve performance in
this area, but I am not sure how far he has gotten.
In general, managing individual allocations (and then accounting for their
sizes) for each group is a significant additional overhead for grouping.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]