alamb commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175324013
##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
    }
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
Review Comment:
Why does this signature need to change? The new signature requires an
allocation / `clone` of a `Vec` where the previous one did not, so this seems
to change the API for the worse.
If the change is there to support a calculated output ordering for the grouping,
perhaps we can calculate the output ordering once in the constructor rather than
on demand:
```
fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
self.calc_aggregation_ordering().map(|state| state.ordering)
}
```
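For illustration, here is a minimal sketch of that idea (the struct layout, constructor shape, and import path are assumptions for the example, not code from this PR): compute the ordering once when the plan node is built, store it, and keep returning a borrowed slice so no per-call allocation is needed.
```
use datafusion::physical_expr::PhysicalSortExpr;

// Hypothetical plan node that caches its output ordering at construction
// time; the field and constructor here are illustrative only.
struct CustomExec {
    // ... other fields elided ...
    output_ordering: Option<Vec<PhysicalSortExpr>>,
}

impl CustomExec {
    fn new(output_ordering: Option<Vec<PhysicalSortExpr>>) -> Self {
        // The ordering is computed (or passed in) exactly once here.
        Self { output_ordering }
    }

    // Keeps the original borrow-returning signature: `as_deref` turns
    // `Option<Vec<PhysicalSortExpr>>` into `Option<&[PhysicalSortExpr]>`
    // without cloning.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.output_ordering.as_deref()
    }
}
```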
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                }
                let batch_indices = batch_indices.finish();
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                let row_filter_values =
                    get_optional_filters(&row_filter_values, &batch_indices);
                let normal_filter_values =
                    get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .aggregation_ordering
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
            }
        }
        allocated += self
            .row_converter
            .size()
            .saturating_sub(row_converter_size_pre);
+
+        if self.aggregation_ordering.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {
Review Comment:
I may be misunderstanding this code, but it seems like it is tracking, per
group, whether that group can be emitted or not. As I understand the
`“Partial Streaming” / “Partitioned Streaming”` section of
https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit#heading=h.uapxuhfa9wyi,
the entire hash table could instead be flushed each time a new value of `date` is seen.

Perhaps with the obvious vectorization of only checking on record batch
boundaries, or something similar.
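To make the suggestion concrete, here is a minimal sketch of that flushing strategy under assumed names (`StreamingGroupState`, `take_completed_groups`, etc. are illustrative, not from this PR): because the GROUP BY column is ordered, once its value advances past the value the existing groups were built for, the whole table of group states can be drained at once.
```
// Hypothetical sketch of the "flush the whole hash table" idea above; all
// names are illustrative. The caller updates `last_ordered_value` after
// processing each batch.
struct StreamingGroupState<K: Ord, G> {
    /// Ordered GROUP BY value the in-memory groups belong to (e.g. `date`).
    last_ordered_value: Option<K>,
    /// Partial group states currently held in memory.
    groups: Vec<G>,
}

impl<K: Ord, G> StreamingGroupState<K, G> {
    /// Called once per incoming record batch ("the obvious vectorization of
    /// only checking on record batch boundaries"). `first_value_in_batch` is
    /// the ordered column's value in the batch's first row; the returned
    /// groups can be emitted immediately.
    fn take_completed_groups(&mut self, first_value_in_batch: &K) -> Vec<G> {
        match &self.last_ordered_value {
            // The ordered column moved on, so everything accumulated so far
            // belongs to earlier values: flush the whole table.
            Some(prev) if prev < first_value_in_batch => std::mem::take(&mut self.groups),
            _ => Vec::new(),
        }
    }
}
```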
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -313,22 +331,144 @@ impl RecordBatchStream for GroupedHashAggregateStream {
    }
 }
+/// This utility object encapsulates the row object, the hash and the group
+/// indices for a group. This information is used when executing streaming
+/// GROUP BY calculations.
+struct GroupOrderInfo {
+    owned_row: OwnedRow,
+    hash: u64,
+    range: Range<usize>,
+}
+
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of group_by_expressions)
+    // Update the aggr_state according to groub_by values (result of group_by_expressions) when group by
Review Comment:
minor nit: `groub_by` --> `group_by`