mustafasrepo commented on code in PR #6904:
URL: https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1259832823


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -306,460 +370,194 @@ impl RecordBatchStream for GroupedHashAggregateStream {
 }
 
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of 
group_by_expressions)
+    /// Calculates the group indices for each input row of
+    /// `group_values`.
+    ///
+    /// At the return of this function,
+    /// `self.scratch_space.current_group_indices` has the same number
+    /// of entries as each array in `group_values` and holds the
+    /// correct group_index for that row.
+    ///
+    /// This is one of the core hot loops in the algorithm
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
         allocated: &mut usize,
-    ) -> Result<Vec<usize>> {
+    ) -> Result<()> {
+        // Convert the group keys into the row format
+        // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
         let group_rows = self.row_converter.convert_columns(group_values)?;
         let n_rows = group_rows.num_rows();
-        // 1.1 construct the key from the group values
-        // 1.2 construct the mapping key if it does not exist
-        // 1.3 add the row' index to `indices`
 
-        // track which entries in `aggr_state` have rows in this batch to 
aggregate
-        let mut groups_with_rows = vec![];
+        // track memory used
+        let group_values_size_pre = self.group_values.size();
+        let scratch_size_pre = self.scratch_space.size();
 
-        // 1.1 Calculate the group keys for the group values
-        let mut batch_hashes = vec![0; n_rows];
-        create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+        // tracks to which group each of the input rows belongs
+        let group_indices = &mut self.scratch_space.current_group_indices;
+        group_indices.clear();
 
-        let AggregationState {
-            map, group_states, ..
-        } = &mut self.aggr_state;
+        // 1.1 Calculate the group keys for the group values
+        let batch_hashes = &mut self.scratch_space.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(group_values, &self.random_state, batch_hashes)?;
 
-        for (row, hash) in batch_hashes.into_iter().enumerate() {
-            let entry = map.get_mut(hash, |(_hash, group_idx)| {
+        for (row, &hash) in batch_hashes.iter().enumerate() {
+            let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx  (aka group_values @ row)
-                let group_state = &group_states[*group_idx];
-
-                group_rows.row(row) == group_state.group_by_values.row()
+                group_rows.row(row) == self.group_values.row(*group_idx)
             });
 
-            match entry {
-                // Existing entry for this group value
-                Some((_hash, group_idx)) => {
-                    let group_state = &mut group_states[*group_idx];
-
-                    // 1.3
-                    if group_state.indices.is_empty() {
-                        groups_with_rows.push(*group_idx);
-                    };
-
-                    group_state.indices.push_accounted(row as u32, allocated); 
// remember this row
-                }
-                //  1.2 Need to create new entry
+            let group_idx = match entry {
+                // Existing group_index for this group value
+                Some((_hash, group_idx)) => *group_idx,
+                //  1.2 Need to create new entry for the group
                 None => {
-                    let accumulator_set =
-                        
aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                    // Add new entry to group_states and save newly created 
index
-                    let group_state = GroupState {
-                        group_by_values: group_rows.row(row).owned(),
-                        aggregation_buffer: vec![
-                            0;
-                            self.row_aggr_layout.fixed_part_width()
-                        ],
-                        accumulator_set,
-                        indices: vec![row as u32], // 1.3
-                    };
-                    let group_idx = group_states.len();
-
-                    // NOTE: do NOT include the `GroupState` struct size in 
here because this is captured by
-                    // `group_states` (see allocation down below)
-                    *allocated += 
std::mem::size_of_val(&group_state.group_by_values)
-                        + (std::mem::size_of::<u8>()
-                            * group_state.aggregation_buffer.capacity())
-                        + (std::mem::size_of::<u32>() * 
group_state.indices.capacity());
-
-                    // Allocation done by normal accumulators
-                    *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
-                        * group_state.accumulator_set.capacity())
-                        + group_state
-                            .accumulator_set
-                            .iter()
-                            .map(|accu| accu.size())
-                            .sum::<usize>();
+                    // Add new entry to aggr_state and save newly created index
+                    let group_idx = self.group_values.num_rows();
+                    self.group_values.push(group_rows.row(row));
 
                     // for hasher function, use precomputed hash value
-                    map.insert_accounted(
+                    self.map.insert_accounted(
                         (hash, group_idx),
                         |(hash, _group_index)| *hash,
                         allocated,
                     );
-
-                    group_states.push_accounted(group_state, allocated);
-
-                    groups_with_rows.push(group_idx);
+                    group_idx
                 }
             };
+            group_indices.push(group_idx);
         }
-        Ok(groups_with_rows)
-    }
 
-    // Update the accumulator results, according to row_aggr_state.
-    #[allow(clippy::too_many_arguments)]
-    fn update_accumulators_using_batch(
-        &mut self,
-        groups_with_rows: &[usize],
-        offsets: &[usize],
-        row_values: &[Vec<ArrayRef>],
-        normal_values: &[Vec<ArrayRef>],
-        row_filter_values: &[Option<ArrayRef>],
-        normal_filter_values: &[Option<ArrayRef>],
-        allocated: &mut usize,
-    ) -> Result<()> {
-        // 2.1 for each key in this batch
-        // 2.2 for each aggregation
-        // 2.3 `slice` from each of its arrays the keys' values
-        // 2.4 update / merge the accumulator with the values
-        // 2.5 clear indices
-        groups_with_rows
-            .iter()
-            .zip(offsets.windows(2))
-            .try_for_each(|(group_idx, offsets)| {
-                let group_state = &mut 
self.aggr_state.group_states[*group_idx];
-                // 2.2
-                // Process row accumulators
-                self.row_accumulators
-                    .iter_mut()
-                    .zip(row_values.iter())
-                    .zip(row_filter_values.iter())
-                    .try_for_each(|((accumulator, aggr_array), filter_opt)| {
-                        let values = slice_and_maybe_filter(
-                            aggr_array,
-                            filter_opt.as_ref(),
-                            offsets,
-                        )?;
-                        let mut state_accessor =
-                            
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-                        state_accessor
-                            .point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
-                        match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
-                                accumulator.update_batch(&values, &mut 
state_accessor)
-                            }
-                            AggregateMode::FinalPartitioned | 
AggregateMode::Final => {
-                                // note: the aggregation here is over states, 
not values, thus the merge
-                                accumulator.merge_batch(&values, &mut 
state_accessor)
-                            }
-                        }
-                    })?;
-                // normal accumulators
-                group_state
-                    .accumulator_set
-                    .iter_mut()
-                    .zip(normal_values.iter())
-                    .zip(normal_filter_values.iter())
-                    .try_for_each(|((accumulator, aggr_array), filter_opt)| {
-                        let values = slice_and_maybe_filter(
-                            aggr_array,
-                            filter_opt.as_ref(),
-                            offsets,
-                        )?;
-                        let size_pre = accumulator.size();
-                        let res = match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
-                                accumulator.update_batch(&values)
-                            }
-                            AggregateMode::FinalPartitioned | 
AggregateMode::Final => {
-                                // note: the aggregation here is over states, 
not values, thus the merge
-                                accumulator.merge_batch(&values)
-                            }
-                        };
-                        let size_post = accumulator.size();
-                        *allocated += size_post.saturating_sub(size_pre);
-                        res
-                    })
-                    // 2.5
-                    .and({
-                        group_state.indices.clear();
-                        Ok(())
-                    })
-            })?;
-        Ok(())
-    }
+        // account for memory growth in scratch space
+        *allocated += self.scratch_space.size();
+        *allocated -= scratch_size_pre; // subtract after adding to avoid underflow
 
-    // Update the accumulator results, according to row_aggr_state.
-    fn update_accumulators_using_scalar(
-        &mut self,
-        groups_with_rows: &[usize],
-        row_values: &[Vec<ArrayRef>],
-        row_filter_values: &[Option<ArrayRef>],
-    ) -> Result<()> {
-        let filter_bool_array = row_filter_values
-            .iter()
-            .map(|filter_opt| match filter_opt {
-                Some(f) => Ok(Some(as_boolean_array(f)?)),
-                None => Ok(None),
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx 
as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut 
state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as 
usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut 
state_accessor)?;
-                    }
-                }
-            }
-            // clear the group indices in this group
-            group_state.indices.clear();
-        }
+        // account for any memory increase used to store group_values
+        *allocated += self.group_values.size();
+        *allocated -= group_values_size_pre; // subtract after adding to avoid underflow
 
         Ok(())
     }
 
     /// Perform group-by aggregation for the given [`RecordBatch`].
     ///
-    /// If successful, this returns the additional number of bytes that were 
allocated during this process.
-    ///
+    /// If successful, returns the additional amount of memory, in
+    /// bytes, that were allocated during this process.
     fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
-        // Evaluate the grouping expressions:
+        // Evaluate the grouping expressions
         let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
+
         // Keep track of memory allocated:
         let mut allocated = 0usize;
 
         // Evaluate the aggregation expressions.
-        // We could evaluate them after the `take`, but since we need to 
evaluate all
-        // of them anyways, it is more performant to do it while they are 
together.
-        let row_aggr_input_values =
-            evaluate_many(&self.row_aggregate_expressions, &batch)?;
-        let normal_aggr_input_values =
-            evaluate_many(&self.normal_aggregate_expressions, &batch)?;
-        let row_filter_values = 
evaluate_optional(&self.row_filter_expressions, &batch)?;
-        let normal_filter_values =
-            evaluate_optional(&self.normal_filter_expressions, &batch)?;
+        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
+
+        // Evaluate the filter expressions, if any, against the inputs
+        let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
 
         let row_converter_size_pre = self.row_converter.size();
+
         for group_values in &group_by_values {
-            let groups_with_rows =
-                self.update_group_state(group_values, &mut allocated)?;
-            // Decide the accumulators update mode, use scalar value to update 
the accumulators when all of the conditions are meet:
-            // 1) The aggregation mode is Partial or Single
-            // 2) There is not normal aggregation expressions
-            // 3) The number of affected groups is high (entries in 
`aggr_state` have rows need to update). Usually the high cardinality case
-            if matches!(self.mode, AggregateMode::Partial | 
AggregateMode::Single)
-                && normal_aggr_input_values.is_empty()
-                && normal_filter_values.is_empty()
-                && groups_with_rows.len() >= batch.num_rows() / 
self.scalar_update_factor
-            {
-                self.update_accumulators_using_scalar(
-                    &groups_with_rows,
-                    &row_aggr_input_values,
-                    &row_filter_values,
-                )?;
-            } else {
-                // Collect all indices + offsets based on keys in this vec
-                let mut batch_indices: UInt32Builder = 
UInt32Builder::with_capacity(0);
-                let mut offsets = vec![0];
-                let mut offset_so_far = 0;
-                for &group_idx in groups_with_rows.iter() {
-                    let indices = 
&self.aggr_state.group_states[group_idx].indices;
-                    batch_indices.append_slice(indices);
-                    offset_so_far += indices.len();
-                    offsets.push(offset_so_far);
+            // calculate the group indices for each input row
+            self.update_group_state(group_values, &mut allocated)?;
+            let group_indices = &self.scratch_space.current_group_indices;
+
+            // Gather the inputs to call the actual accumulator
+            let t = self
+                .accumulators
+                .iter_mut()
+                .zip(input_values.iter())
+                .zip(filter_values.iter());

Review Comment:
   I think `izip!` is more readable. However, it is a preference call, I guess.
   ```rust
     let t = izip!(
         self.accumulators.iter_mut(),
         input_values.iter(),
         filter_values.iter()
     );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to