alamb commented on code in PR #8558:
URL: https://github.com/apache/arrow-datafusion/pull/8558#discussion_r1430570883
##########
datafusion/sqllogictest/test_files/groupby.slt:
##########
@@ -2297,8 +2296,7 @@ Projection: sales_global.country,
ARRAY_AGG(sales_global.amount) ORDER BY [sales
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount)]
-----SortExec: expr=[amount@1 ASC NULLS LAST]
-------MemoryExec: partitions=1, partition_sizes=[1]
+----MemoryExec: partitions=1, partition_sizes=[1]
Review Comment:
I don't fully understand this plan change. Now there is no sort, but the
comments above say there should be one:
```
# test_ordering_sensitive_aggregation
# ordering sensitive requirement should add a SortExec in the final plan. To
satisfy amount ASC
# in the aggregation
```
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -994,6 +831,132 @@ fn group_schema(schema: &Schema, group_count: usize) ->
SchemaRef {
Arc::new(Schema::new(group_fields))
}
+/// Determines the lexical ordering requirement for an aggregate expression.
Review Comment:
Was this code just moved, or was it also changed?
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -522,79 +579,87 @@ impl RecordBatchStream for GroupedHashAggregateStream {
impl GroupedHashAggregateStream {
/// Perform group-by aggregation for the given [`RecordBatch`].
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
- // Evaluate the grouping expressions
- let group_by_values = if self.spill_state.is_stream_merging {
- evaluate_group_by(&self.spill_state.merging_group_by, &batch)?
- } else {
- evaluate_group_by(&self.group_by, &batch)?
- };
-
- // Evaluate the aggregation expressions.
- let input_values = if self.spill_state.is_stream_merging {
- evaluate_many(&self.spill_state.merging_aggregate_arguments,
&batch)?
- } else {
- evaluate_many(&self.aggregate_arguments, &batch)?
- };
-
- // Evaluate the filter expressions, if any, against the inputs
- let filter_values = if self.spill_state.is_stream_merging {
- let filter_expressions = vec![None; self.accumulators.len()];
- evaluate_optional(&filter_expressions, &batch)?
- } else {
- evaluate_optional(&self.filter_expressions, &batch)?
- };
-
- for group_values in &group_by_values {
- // calculate the group indices for each input row
- let starting_num_groups = self.group_values.len();
- self.group_values
- .intern(group_values, &mut self.current_group_indices)?;
- let group_indices = &self.current_group_indices;
-
- // Update ordering information if necessary
- let total_num_groups = self.group_values.len();
- if total_num_groups > starting_num_groups {
- self.group_ordering.new_groups(
- group_values,
- group_indices,
- total_num_groups,
- )?;
- }
+ for aggregate_group in &mut self.aggregate_groups {
+ let batch = if aggregate_group.requirement.is_empty() {
+ batch.clone()
+ } else {
+ sort_batch(&batch, &aggregate_group.requirement, None)?
Review Comment:
Does this sort the entire input to the group, or just the rows in the
current batch? I think it just sorts the current batch, but it would actually
need to sort the entire input for that group, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]