mustafasrepo commented on code in PR #5936:
URL: https://github.com/apache/arrow-datafusion/pull/5936#discussion_r1161482858
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -319,192 +499,31 @@ impl GroupedHashAggregateStream {
let row_converter_size_pre = self.row_converter.size();
for group_values in &group_by_values {
- let group_rows = self.row_converter.convert_columns(group_values)?;
-
- // 1.1 construct the key from the group values
- // 1.2 construct the mapping key if it does not exist
- // 1.3 add the row' index to `indices`
-
- // track which entries in `aggr_state` have rows in this batch to
aggregate
- let mut groups_with_rows = vec![];
-
- // 1.1 Calculate the group keys for the group values
- let mut batch_hashes = vec![0; batch.num_rows()];
- create_hashes(group_values, &self.random_state, &mut
batch_hashes)?;
-
- for (row, hash) in batch_hashes.into_iter().enumerate() {
- let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
- // verify that a group that we are inserting with hash is
- // actually the same key value as the group in
- // existing_idx (aka group_values @ row)
- let group_state = &row_group_states[*group_idx];
- group_rows.row(row) == group_state.group_by_values.row()
- });
-
- match entry {
- // Existing entry for this group value
- Some((_hash, group_idx)) => {
- let group_state = &mut row_group_states[*group_idx];
-
- // 1.3
- if group_state.indices.is_empty() {
- groups_with_rows.push(*group_idx);
- };
-
- group_state
- .indices
- .push_accounted(row as u32, &mut allocated); //
remember this row
- }
- // 1.2 Need to create new entry
- None => {
- let accumulator_set =
-
aggregates::create_accumulators(&self.normal_aggr_expr)?;
- // Add new entry to group_states and save newly
created index
- let group_state = RowGroupState {
- group_by_values: group_rows.row(row).owned(),
- aggregation_buffer: vec![
- 0;
- self.row_aggr_layout
- .fixed_part_width()
- ],
- accumulator_set,
- indices: vec![row as u32], // 1.3
- };
- let group_idx = row_group_states.len();
-
- // NOTE: do NOT include the `RowGroupState` struct
size in here because this is captured by
- // `group_states` (see allocation down below)
- allocated += (std::mem::size_of::<u8>()
- * group_state.group_by_values.as_ref().len())
- + (std::mem::size_of::<u8>()
- * group_state.aggregation_buffer.capacity())
- + (std::mem::size_of::<u32>()
- * group_state.indices.capacity());
-
- // Allocation done by normal accumulators
- allocated += (std::mem::size_of::<Box<dyn
Accumulator>>()
- * group_state.accumulator_set.capacity())
- + group_state
- .accumulator_set
- .iter()
- .map(|accu| accu.size())
- .sum::<usize>();
-
- // for hasher function, use precomputed hash value
- row_map.insert_accounted(
- (hash, group_idx),
- |(hash, _group_index)| *hash,
- &mut allocated,
- );
-
- row_group_states.push_accounted(group_state, &mut
allocated);
-
- groups_with_rows.push(group_idx);
- }
- };
- }
+ let groups_with_rows =
+ self.update_group_state(group_values, &mut allocated)?;
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder =
UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
- let indices = &row_group_states[group_idx].indices;
+ let indices =
&self.row_aggr_state.group_states[group_idx].indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
- let row_values = get_at_indices(&row_aggr_input_values,
&batch_indices);
Review Comment:
This chunk has been moved into the function `update_accumulators`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]