mustafasrepo commented on code in PR #8558:
URL: https://github.com/apache/arrow-datafusion/pull/8558#discussion_r1432581771
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -522,79 +579,87 @@ impl RecordBatchStream for GroupedHashAggregateStream {
impl GroupedHashAggregateStream {
/// Perform group-by aggregation for the given [`RecordBatch`].
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
- // Evaluate the grouping expressions
- let group_by_values = if self.spill_state.is_stream_merging {
- evaluate_group_by(&self.spill_state.merging_group_by, &batch)?
- } else {
- evaluate_group_by(&self.group_by, &batch)?
- };
-
- // Evaluate the aggregation expressions.
- let input_values = if self.spill_state.is_stream_merging {
- evaluate_many(&self.spill_state.merging_aggregate_arguments,
&batch)?
- } else {
- evaluate_many(&self.aggregate_arguments, &batch)?
- };
-
- // Evaluate the filter expressions, if any, against the inputs
- let filter_values = if self.spill_state.is_stream_merging {
- let filter_expressions = vec![None; self.accumulators.len()];
- evaluate_optional(&filter_expressions, &batch)?
- } else {
- evaluate_optional(&self.filter_expressions, &batch)?
- };
-
- for group_values in &group_by_values {
- // calculate the group indices for each input row
- let starting_num_groups = self.group_values.len();
- self.group_values
- .intern(group_values, &mut self.current_group_indices)?;
- let group_indices = &self.current_group_indices;
-
- // Update ordering information if necessary
- let total_num_groups = self.group_values.len();
- if total_num_groups > starting_num_groups {
- self.group_ordering.new_groups(
- group_values,
- group_indices,
- total_num_groups,
- )?;
- }
+ for aggregate_group in &mut self.aggregate_groups {
+ let batch = if aggregate_group.requirement.is_empty() {
+ batch.clone()
+ } else {
+ sort_batch(&batch, &aggregate_group.requirement, None)?
Review Comment:
This sorts only the current batch. Because the implementation of the
order-sensitive aggregators has changed, we no longer depend on the property
that the entire input for a group is ordered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]