alamb commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1163316574


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -419,92 +423,154 @@ impl GroupedHashAggregateStream {
             let row_values = get_at_indices(&row_aggr_input_values, 
&batch_indices);
             let normal_values = get_at_indices(&normal_aggr_input_values, 
&batch_indices);
 
+            let accumulator_set_pre =
+                get_accumulator_set_size(&groups_with_rows, row_group_states);
             // 2.1 for each key in this batch
             // 2.2 for each aggregation
             // 2.3 `slice` from each of its arrays the keys' values
             // 2.4 update / merge the accumulator with the values
             // 2.5 clear indices
-            groups_with_rows
-                .iter()
-                .zip(offsets.windows(2))
-                .try_for_each(|(group_idx, offsets)| {
-                    let group_state = &mut row_group_states[*group_idx];
-                    // 2.2
-                    self.row_accumulators
-                        .iter_mut()
-                        .zip(row_values.iter())
-                        .map(|(accumulator, aggr_array)| {
-                            (
-                                accumulator,
-                                aggr_array
-                                    .iter()
-                                    .map(|array| {
-                                        // 2.3
-                                        array.slice(offsets[0], offsets[1] - 
offsets[0])
-                                    })
-                                    .collect::<Vec<ArrayRef>>(),
-                            )
-                        })
-                        .try_for_each(|(accumulator, values)| {
-                            let mut state_accessor = 
RowAccessor::new_from_layout(
-                                self.row_aggr_layout.clone(),
-                            );
-                            state_accessor.point_to(
-                                0,
-                                group_state.aggregation_buffer.as_mut_slice(),
-                            );
-                            match self.mode {
-                                AggregateMode::Partial => {
+            match self.mode {
+                AggregateMode::Partial => {
+                    groups_with_rows
+                        .iter()
+                        .zip(offsets.windows(2))
+                        .try_for_each(|(group_idx, offsets)| {
+                            let group_state = &mut 
row_group_states[*group_idx];
+                            // 2.2
+                            self.row_accumulators
+                                .iter_mut()
+                                .zip(row_values.iter())
+                                .map(|(accumulator, aggr_array)| {
+                                    (
+                                        accumulator,
+                                        aggr_array
+                                            .iter()
+                                            .map(|array| {
+                                                // 2.3
+                                                array.slice(
+                                                    offsets[0],
+                                                    offsets[1] - offsets[0],
+                                                )
+                                            })
+                                            .collect::<Vec<ArrayRef>>(),
+                                    )
+                                })
+                                .try_for_each(|(accumulator, values)| {
+                                    let mut state_accessor = 
RowAccessor::new_from_layout(
+                                        self.row_aggr_layout.clone(),
+                                    );
+                                    state_accessor.point_to(
+                                        0,
+                                        
group_state.aggregation_buffer.as_mut_slice(),
+                                    );
                                     accumulator.update_batch(&values, &mut 
state_accessor)
-                                }
-                                AggregateMode::FinalPartitioned
-                                | AggregateMode::Final => {
-                                    // note: the aggregation here is over 
states, not values, thus the merge
-                                    accumulator.merge_batch(&values, &mut 
state_accessor)
-                                }
-                            }
-                        })
-                        // 2.5
-                        .and(Ok(()))?;
-                    // normal accumulators
-                    group_state
-                        .accumulator_set
-                        .iter_mut()
-                        .zip(normal_values.iter())
-                        .map(|(accumulator, aggr_array)| {
-                            (
-                                accumulator,
-                                aggr_array
-                                    .iter()
-                                    .map(|array| {
-                                        // 2.3
-                                        array.slice(offsets[0], offsets[1] - 
offsets[0])
-                                    })
-                                    .collect::<Vec<ArrayRef>>(),
-                            )
-                        })
-                        .try_for_each(|(accumulator, values)| {
-                            let size_pre = accumulator.size();
-                            let res = match self.mode {
-                                AggregateMode::Partial => {
+                                })
+                                // 2.5
+                                .and(Ok(()))?;
+                            // normal accumulators
+                            group_state
+                                .accumulator_set
+                                .iter_mut()
+                                .zip(normal_values.iter())
+                                .map(|(accumulator, aggr_array)| {
+                                    (
+                                        accumulator,
+                                        aggr_array
+                                            .iter()
+                                            .map(|array| {
+                                                // 2.3
+                                                array.slice(
+                                                    offsets[0],
+                                                    offsets[1] - offsets[0],
+                                                )
+                                            })
+                                            .collect::<Vec<ArrayRef>>(),
+                                    )
+                                })
+                                .try_for_each(|(accumulator, values)| {
                                     accumulator.update_batch(&values)
-                                }
-                                AggregateMode::FinalPartitioned
-                                | AggregateMode::Final => {
+                                })
+                                // 2.5
+                                .and({
+                                    group_state.indices.clear();
+                                    Ok(())
+                                })
+                        })?;
+                }
+                AggregateMode::FinalPartitioned | AggregateMode::Final => {

Review Comment:
   Maybe GitHub is rendering the diff confusingly, but this seems like a 
significant amount of new code.



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + 
self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   According to the docs for `Accumulator::size`:
   
   
https://github.com/yahoNanJing/arrow-datafusion/blob/issue-5903/datafusion/expr/src/accumulator.rs#L88
   
   
   ```rust
       /// Allocated size required for this accumulator, in bytes, including 
`Self`.
       /// Allocated means that for internal containers such as `Vec`, the 
`capacity` should be used
       /// not the `len`
       fn size(&self) -> usize;
   ```
   
   The change in this PR seems to stop accounting for extra allocations in 
`ScalarValue` (such as `ScalarValue::Utf8`, which has an allocated string in 
it), so `size()` would under-report the memory used by such values.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -514,6 +580,19 @@ impl GroupedHashAggregateStream {
     }
 }
 
+fn get_accumulator_set_size(
+    groups_with_rows: &[usize],
+    row_group_states: &[RowGroupState],
+) -> usize {
+    groups_with_rows.iter().fold(0usize, |acc, group_idx| {
+        let group_state = &row_group_states[*group_idx];
+        group_state
+            .accumulator_set
+            .iter()
+            .fold(acc, |acc, accumulator| acc + accumulator.size())
+    })
+}
+
 /// The state that is built for each output group.

Review Comment:
   Yes, I agree the calculation for the memory size is complicated. 
   
   I think @tustvold has been thinking about how to improve performance in 
this area, but I am not sure how far he has gotten.
   
   In general, managing individual allocations (and then accounting for their 
sizes) for each group is a significant additional overhead for grouping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to