2010YOUY01 commented on code in PR #23055: URL: https://github.com/apache/datafusion/pull/23055#discussion_r3458446438
########## datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs: ########## @@ -0,0 +1,406 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; +use std::sync::Arc; + +use arrow::array::{ArrayRef, AsArray, new_null_array}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::{Result, internal_err}; +use datafusion_execution::memory_pool::proxy::VecAllocExt; +use datafusion_expr::{EmitTo, GroupsAccumulator}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; + +use crate::PhysicalExpr; +use crate::aggregates::group_values::{GroupByMetrics, GroupValues, new_group_values}; +use crate::aggregates::order::GroupOrdering; +use crate::aggregates::row_hash::create_group_accumulator; +use crate::aggregates::{ + AggregateExec, PhysicalGroupBy, aggregate_expressions, evaluate_group_by, +}; + +/// Marker for raw rows -> partial state aggregation. +pub(in crate::aggregates) struct Partial; +/// Marker for raw rows -> partial state conversion without aggregation. +pub(in crate::aggregates) struct PartialSkip; +/// Marker for partial state -> final value aggregation. 
+pub(in crate::aggregates) struct Final; + +/// Grouped hash table shared by the partial and final paths. +/// +/// While building, it consumes input batches and updates group / accumulator +/// state. While outputting, it incrementally drains that state into output +/// batches. +/// +/// # Marker Type +/// `AggrMode` selects the aggregate semantics. +/// +/// e.g. `AggregateHashTable::<Partial>::new(...)` creates an aggregate hash table +/// for the partial hash aggregate stage, the input schema is raw rows and output +/// schema is intermediate states. +/// +/// It is a zero-sized compile-time marker, so each stage keeps its update logic +/// in a separate impl block, to make the behavior difference explicit. +pub(in crate::aggregates) struct AggregateHashTable<AggrMode> { + /// Grouping and accumulator-specific timing metrics. + pub(super) group_by_metrics: GroupByMetrics, + + /// Raw input schema, used to evaluate expressions and synthesize empty + /// grouping-set rows. + pub(super) input_schema: SchemaRef, + + /// Output schema: group columns followed by aggregate state or final values. + pub(super) output_schema: SchemaRef, + + /// Maximum rows per emitted output batch, from config `batch_size`. + pub(super) batch_size: usize, + + /// Lifecycle-specific state: building stage / outputting stage. + pub(super) state: AggregateHashTableState, + + pub(super) _mode: PhantomData<AggrMode>, +} + +pub(super) struct HashAggregateAccumulator { + /// Aggregate expression used to create a fresh accumulator for related + /// hash tables, such as the partial-skip table. + aggregate_expr: Arc<AggregateFunctionExpr>, + + /// Arguments to pass to this accumulator. + /// + /// Example: `CORR(x, y)` stores two expressions here, while `SUM(x)` stores one. + arguments: Vec<Arc<dyn PhysicalExpr>>, + + /// Optional `FILTER` expression for this accumulator. + /// + /// Example: `SUM(x) FILTER (WHERE x > 10)` stores the `x > 10` predicate. 
+ filter: Option<Arc<dyn PhysicalExpr>>, + + /// Accumulator state for all groups for one aggregate expression. + accumulator: Box<dyn GroupsAccumulator>, +} + +pub(super) struct EvaluatedHashAggregateAccumulator { + pub(super) arguments: Vec<ArrayRef>, + pub(super) filter: Option<ArrayRef>, +} + +/// Evaluated group-by keys and accumulator arguments for one input batch. +/// +/// e.g., for `select k+1, sum(v*v) from t group by (k+1)`, this struct holds the evaluated +/// `k+1`, `v*v` +pub(super) struct EvaluatedAggregateBatch { + /// One entry per grouping set; each entry contains all evaluated group key + /// arrays for the current input batch. + pub(super) grouping_set_args: Vec<Vec<ArrayRef>>, + + /// Evaluated arguments and filters, one entry per aggregate expression. + pub(super) accumulator_args: Vec<EvaluatedHashAggregateAccumulator>, +} + +/// Hash table state while grouped aggregation is consuming input. Review Comment: [cd9ba97](https://github.com/apache/datafusion/pull/23055/commits/cd9ba97e1650e8264349b4e554521d14c5508be7) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
