mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1231742904
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
}
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
+
+ fn update_one_accumulator_with_native_value<T1>(
+ &mut self,
+ groups_with_rows: &[usize],
+ agg_input_array1: &T1,
+ acc_idx1: usize,
+ filter_bool_array: &[Option<&BooleanArray>],
+ ) -> Result<()>
+ where
+ T1: ArrowArrayReader,
+ {
+ let accumulator1 = &self.row_accumulators[acc_idx1];
+ let filter_array1 = &filter_bool_array[acc_idx1];
+ for group_idx in groups_with_rows {
+ let group_state = &mut self.aggr_state.group_states[*group_idx];
+ let mut state_accessor =
+ RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+ state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+ for idx in &group_state.indices {
+ let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+ accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+ }
+ // clear the group indices in this group
+ group_state.indices.clear();
+ }
+
+ Ok(())
+ }
+
+ fn update_two_accumulator2_with_native_value<T1, T2>(
Review Comment:
Yes, I think we have to handle the two-accumulator case specially. This avoids dynamic type dispatch in the inner loops, lets us use generics for static type dispatch instead, and gives the best performance.
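To illustrate the difference, here is a minimal std-only sketch. `ValueReader` and `Int64Col` are hypothetical simplified stand-ins, not the DataFusion `ArrowArrayReader` trait or real Arrow arrays. The same summing loop is written once against a trait object and once as a generic function.

```rust
/// Hypothetical stand-in for a typed column reader (not the DataFusion trait).
trait ValueReader {
    fn value_at(&self, idx: usize) -> Option<f64>;
}

/// Hypothetical stand-in for an Int64 array with nulls.
struct Int64Col(Vec<Option<i64>>);

impl ValueReader for Int64Col {
    fn value_at(&self, idx: usize) -> Option<f64> {
        self.0[idx].map(|v| v as f64)
    }
}

/// Dynamic dispatch: every `value_at` call goes through the vtable of `dyn ValueReader`.
fn sum_dyn(col: &dyn ValueReader, rows: &[usize]) -> f64 {
    rows.iter().filter_map(|&i| col.value_at(i)).sum()
}

/// Static dispatch: the compiler emits a separate copy for each concrete `T`,
/// so `value_at` is a direct call that can be inlined into the loop.
fn sum_static<T: ValueReader>(col: &T, rows: &[usize]) -> f64 {
    rows.iter().filter_map(|&i| col.value_at(i)).sum()
}
```

Both functions compute the same result. In the generic version, each concrete column type gets its own compiled copy of the loop, so the per-value call does not need the vtable.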
The current update flow is:
```
outermost loop: for each `group idx` in groups_with_rows
    construct the RowAccessor
    inner loop: for each accumulator
        update the accumulator state
```
The accumulator loop **must be** the inner loop, so that we benefit from the row layout: the states of the different accumulators are all updated in a single row (through the same `RowAccessor`).
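A rough std-only model of the current flow follows. It assumes hypothetical `ValueReader`, `RowAccumulator` and `GroupState` types. A plain `Vec<f64>` plays the role of the row buffer behind `RowAccessor`, with accumulator `i` owning slot `i`. The loop order matches the description above: groups outside, accumulators inside, and every accumulator writing into the same row. Because the inputs have different types, they are held as `&dyn ValueReader`, so every value read is a dynamic call.

```rust
/// Hypothetical stand-ins for typed input columns (not DataFusion types).
trait ValueReader {
    fn value_at(&self, idx: usize) -> Option<f64>;
}
struct Int64Col(Vec<Option<i64>>);
struct Float64Col(Vec<Option<f64>>);
impl ValueReader for Int64Col {
    fn value_at(&self, idx: usize) -> Option<f64> {
        self.0[idx].map(|v| v as f64)
    }
}
impl ValueReader for Float64Col {
    fn value_at(&self, idx: usize) -> Option<f64> {
        self.0[idx]
    }
}

/// Hypothetical row accumulator: updates one slot of a group's state row.
trait RowAccumulator {
    fn update(&self, value: Option<f64>, slot: &mut f64);
}
struct SumAcc;
struct CountAcc;
impl RowAccumulator for SumAcc {
    fn update(&self, value: Option<f64>, slot: &mut f64) {
        if let Some(v) = value {
            *slot += v;
        }
    }
}
impl RowAccumulator for CountAcc {
    fn update(&self, value: Option<f64>, slot: &mut f64) {
        if value.is_some() {
            *slot += 1.0;
        }
    }
}

/// `row` stands in for the group's aggregation buffer; accumulator `i` owns `row[i]`.
struct GroupState {
    row: Vec<f64>,
    indices: Vec<usize>,
}

/// Current flow: groups in the outer loop, accumulators in the inner loop,
/// all writing into the same row. Both the accumulator and its input column
/// are trait objects, so every value goes through dynamic dispatch.
fn update_dyn(
    groups: &mut [GroupState],
    groups_with_rows: &[usize],
    accumulators: &[Box<dyn RowAccumulator>],
    inputs: &[&dyn ValueReader],
) {
    for &g in groups_with_rows {
        let group = &mut groups[g]; // one state row per group
        for (acc_idx, acc) in accumulators.iter().enumerate() {
            let input = inputs[acc_idx];
            for &row_idx in &group.indices {
                acc.update(input.value_at(row_idx), &mut group.row[acc_idx]);
            }
        }
        group.indices.clear();
    }
}
```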
What we want to achieve is to do the array type casts outside the outermost loop:
```
for each agg input array: type cast here
outermost loop: for each `group idx` in groups_with_rows
    construct the RowAccessor
    inner loop: for each accumulator
        call the generic method to update the accumulator state
```
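Hoisting the cast means the runtime type check runs once per input array instead of once per value. The following minimal sketch uses `std::any::Any` as a stand-in for downcasting through Arrow's `Array::as_any()`. A hypothetical `Int64Col` takes the place of a real Arrow array.

```rust
use std::any::Any;

/// Hypothetical stand-in for a typed Arrow array.
struct Int64Col(Vec<Option<i64>>);

/// Cast inside the loop: `downcast_ref` (a `TypeId` comparison) is repeated for every row.
fn sum_cast_in_loop(input: &dyn Any, rows: &[usize]) -> Option<i64> {
    let mut total = 0;
    for &i in rows {
        let col = input.downcast_ref::<Int64Col>()?;
        total += col.0[i].unwrap_or(0);
    }
    Some(total)
}

/// Cast hoisted out of the loop: one `downcast_ref`, then a tight loop over the concrete type.
fn sum_cast_once(input: &dyn Any, rows: &[usize]) -> Option<i64> {
    let col = input.downcast_ref::<Int64Col>()?; // the type cast happens here, once
    Some(rows.iter().map(|&i| col.0[i].unwrap_or(0)).sum::<i64>())
}
```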
Because the agg input type differs from one accumulator to the next, the generic method cannot be called from inside a loop over the accumulators, so I have to handle the two-accumulator case specially. The update flow then becomes:
1-accumulator version:
```
agg input array1: type cast to T1
outermost loop: for each `group idx` in groups_with_rows
    construct the RowAccessor
    generic method <T1>: update accumulator state
```
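Here is a simplified sketch of the one-accumulator path, loosely modeled on `update_one_accumulator_with_native_value` above. `ArrayReader`, `Int32Col`, `GroupState` and `sum_update` are hypothetical stand-ins. The filter arrays and `RowAccessor` are left out, and a `Vec<f64>` per group stands in for the aggregation buffer.

```rust
/// Hypothetical typed column reader (a simplified stand-in, not the real trait).
trait ArrayReader {
    type Item: Copy + Into<f64>;
    fn value_at(&self, idx: usize) -> Option<Self::Item>;
}

struct Int32Col(Vec<Option<i32>>);

impl ArrayReader for Int32Col {
    type Item = i32;
    fn value_at(&self, idx: usize) -> Option<i32> {
        self.0[idx]
    }
}

/// `row` stands in for the group's aggregation buffer.
struct GroupState {
    row: Vec<f64>,
    indices: Vec<u32>,
}

/// Hypothetical sum accumulator: adds a non-null value into `row[offset]`.
fn sum_update<V: Into<f64>>(value: Option<V>, row: &mut [f64], offset: usize) {
    if let Some(v) = value {
        let v: f64 = v.into();
        row[offset] += v;
    }
}

/// One-accumulator path: `T1` is chosen once by the caller, so the inner
/// loop is monomorphized for that concrete column type.
fn update_one_accumulator<T1: ArrayReader>(
    groups: &mut [GroupState],
    groups_with_rows: &[usize],
    input1: &T1,
    offset1: usize,
) {
    for &g in groups_with_rows {
        let group = &mut groups[g];
        for &idx in &group.indices {
            sum_update(input1.value_at(idx as usize), &mut group.row, offset1);
        }
        // Clear the group's row indices once they have been consumed.
        group.indices.clear();
    }
}
```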
2-accumulator version:
```
agg input array1: type cast to T1
agg input array2: type cast to T2
outermost loop: for each `group idx` in groups_with_rows
    construct the RowAccessor
    generic method <T1>: update accumulator1 state
    generic method <T2>: update accumulator2 state
```
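This is the two-accumulator path in the same simplified model, and all names are again hypothetical stand-ins. `T1` and `T2` are fixed before the group loop. Both per-accumulator loops are therefore statically dispatched, while still writing into the same row of each group.

```rust
/// Hypothetical typed column reader (a simplified stand-in, not the real trait).
trait ArrayReader {
    type Item: Copy + Into<f64>;
    fn value_at(&self, idx: usize) -> Option<Self::Item>;
}
struct Int32Col(Vec<Option<i32>>);
struct Float64Col(Vec<Option<f64>>);
impl ArrayReader for Int32Col {
    type Item = i32;
    fn value_at(&self, idx: usize) -> Option<i32> {
        self.0[idx]
    }
}
impl ArrayReader for Float64Col {
    type Item = f64;
    fn value_at(&self, idx: usize) -> Option<f64> {
        self.0[idx]
    }
}

/// `row` stands in for the group's aggregation buffer.
struct GroupState {
    row: Vec<f64>,
    indices: Vec<u32>,
}

/// Hypothetical SUM accumulator writing into `row[offset]`.
fn sum_update<V: Into<f64>>(value: Option<V>, row: &mut [f64], offset: usize) {
    if let Some(v) = value {
        let v: f64 = v.into();
        row[offset] += v;
    }
}

/// Hypothetical COUNT accumulator writing into `row[offset]`.
fn count_update<V>(value: Option<V>, row: &mut [f64], offset: usize) {
    if value.is_some() {
        row[offset] += 1.0;
    }
}

/// Two-accumulator path: accumulator 1 then accumulator 2, each monomorphized
/// for its own input type, both updating the same group row.
fn update_two_accumulators<T1: ArrayReader, T2: ArrayReader>(
    groups: &mut [GroupState],
    groups_with_rows: &[usize],
    input1: &T1,
    offset1: usize,
    input2: &T2,
    offset2: usize,
) {
    for &g in groups_with_rows {
        let group = &mut groups[g];
        for &idx in &group.indices {
            sum_update(input1.value_at(idx as usize), &mut group.row, offset1);
        }
        for &idx in &group.indices {
            count_update(input2.value_at(idx as usize), &mut group.row, offset2);
        }
        group.indices.clear();
    }
}
```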
3-accumulator version:
```
agg input array1: type cast to T1
agg input array2: type cast to T2
agg input array3: type cast to T3
outermost loop: for each `group idx` in groups_with_rows
    construct the RowAccessor
    generic method <T1>: update accumulator1 state
    generic method <T2>: update accumulator2 state
    generic method <T3>: update accumulator3 state
```
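Each specialized version still needs a caller that maps the runtime input types onto concrete `T1`, `T2`, and so on. The following minimal sketch shows that dispatch for the two-input case. It assumes a hypothetical two-variant `Column` enum and a stand-in generic function `sum_two`, and the `match` needs one arm per type combination.

```rust
/// Hypothetical typed column reader (a simplified stand-in, not the real trait).
trait ArrayReader {
    type Item: Copy + Into<f64>;
    fn value_at(&self, idx: usize) -> Option<Self::Item>;
}
struct Int32Col(Vec<Option<i32>>);
struct Float64Col(Vec<Option<f64>>);
impl ArrayReader for Int32Col {
    type Item = i32;
    fn value_at(&self, idx: usize) -> Option<i32> {
        self.0[idx]
    }
}
impl ArrayReader for Float64Col {
    type Item = f64;
    fn value_at(&self, idx: usize) -> Option<f64> {
        self.0[idx]
    }
}

/// Hypothetical runtime-typed column, standing in for a type-erased array.
enum Column {
    Int32(Int32Col),
    Float64(Float64Col),
}

/// Stand-in for a two-accumulator generic method: sums each input over `rows`.
fn sum_two<T1: ArrayReader, T2: ArrayReader>(input1: &T1, input2: &T2, rows: &[usize]) -> (f64, f64) {
    let (mut s1, mut s2) = (0.0, 0.0);
    for &i in rows {
        if let Some(v) = input1.value_at(i) {
            let v: f64 = v.into();
            s1 += v;
        }
        if let Some(v) = input2.value_at(i) {
            let v: f64 = v.into();
            s2 += v;
        }
    }
    (s1, s2)
}

/// Picks concrete `T1`/`T2` once per call; one arm per type combination.
fn dispatch_sum_two(c1: &Column, c2: &Column, rows: &[usize]) -> (f64, f64) {
    match (c1, c2) {
        (Column::Int32(a), Column::Int32(b)) => sum_two(a, b, rows),
        (Column::Int32(a), Column::Float64(b)) => sum_two(a, b, rows),
        (Column::Float64(a), Column::Int32(b)) => sum_two(a, b, rows),
        (Column::Float64(a), Column::Float64(b)) => sum_two(a, b, rows),
    }
}
```

With k supported input types, this style of dispatch yields k^N instantiations for the N-accumulator version. That is 4 arms here, for k = 2 and N = 2.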
Hope this explains it. I'm not sure whether there are other good ways to simplify the code.
--
This is an automated message from the Apache Git Service.