mustafasrepo commented on code in PR #6904:
URL: https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1259792085
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -16,100 +16,181 @@
// under the License.
//! Hash aggregation through row format
+//!
+//! POC demonstration of GroupByHashApproach
-use std::cmp::min;
-use std::ops::Range;
+use datafusion_physical_expr::{
+ AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter,
+};
+use log::debug;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec;
use ahash::RandomState;
-use arrow::row::{RowConverter, SortField};
+use arrow::row::{RowConverter, Rows, SortField};
use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};
-use crate::physical_plan::aggregates::utils::{
- aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
- read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
-};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema,
AggregateMode,
- PhysicalGroupBy, RowAccumulatorItem,
+ PhysicalGroupBy,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{aggregates, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::*;
-use arrow::compute::cast;
-use arrow::datatypes::DataType;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
-use datafusion_expr::Accumulator;
-use datafusion_row::accessor::RowAccessor;
-use datafusion_row::layout::RowLayout;
use hashbrown::raw::RawTable;
-use itertools::izip;
+
+#[derive(Debug, Clone)]
+/// This object tracks the aggregation phase (input/output)
+pub(crate) enum ExecutionState {
+ ReadingInput,
+ /// When producing output, the remaining rows to output are stored
+ /// here and are sliced off as needed in batch_size chunks
+ ProducingOutput(RecordBatch),
+ Done,
+}
use super::AggregateExec;
-/// Grouping aggregate with row-format aggregation states inside.
+/// Hash based Grouping Aggregator
+///
+/// # Design Goals
+///
+/// This structure is designed so that updating the aggregates can be
+/// vectorized (done in a tight loop) without allocations. The
+/// accumulator state is *not* managed by this operator (e.g. in the
+/// hash table) and instead is delegated to the individual
+/// accumulators which have type specialized inner loops that perform
+/// the aggregation.
+///
+/// # Architecture
+///
+/// ```text
+///
+/// stores "group stores group values, internally stores aggregate
+/// indexes" in arrow_row format values, for all groups
///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and
comparison directly on raw bytes.
-/// - [WordAligned] row to store aggregation state, designed to be
CPU-friendly when updates over every field are often.
+/// ┌─────────────┐ ┌────────────┐ ┌──────────────┐
┌──────────────┐
+/// │ ┌─────┐ │ │ ┌────────┐ │ │┌────────────┐│
│┌────────────┐│
+/// │ │ 5 │ │ ┌────┼▶│ "A" │ │ ││accumulator ││
││accumulator ││
+/// │ ├─────┤ │ │ │ ├────────┤ │ ││ 0 ││ ││ N
││
+/// │ │ 9 │ │ │ │ │ "Z" │ │ ││ ┌────────┐ ││ ││
┌────────┐ ││
+/// │ └─────┘ │ │ │ └────────┘ │ ││ │ state │ ││ ││ │ state
│ ││
+/// │ ... │ │ │ │ ││ │┌─────┐ │ ││ ... ││ │┌─────┐
│ ││
+/// │ ┌─────┐ │ │ │ ... │ ││ │├─────┤ │ ││ ││ │├─────┤
│ ││
+/// │ │ 1 │───┼─┘ │ │ ││ │└─────┘ │ ││ ││ │└─────┘
│ ││
+/// │ ├─────┤ │ │ │ ││ │ │ ││ ││ │
│ ││
+/// │ │ 13 │───┼─┐ │ ┌────────┐ │ ││ │ ... │ ││ ││ │ ...
│ ││
+/// │ └─────┘ │ └────┼▶│ "Q" │ │ ││ │ │ ││ ││ │
│ ││
+/// └─────────────┘ │ └────────┘ │ ││ │┌─────┐ │ ││ ││ │┌─────┐
│ ││
+/// │ │ ││ │└─────┘ │ ││ ││ │└─────┘
│ ││
+/// └────────────┘ ││ └────────┘ ││ ││
└────────┘ ││
+/// │└────────────┘│
│└────────────┘│
+/// └──────────────┘
└──────────────┘
///
-/// The architecture is the following:
+/// map group_values accumulators
+/// (Hash Table)
///
-/// 1. For each input RecordBatch, update aggregation states corresponding to
all appeared grouping keys.
-/// 2. At the end of the aggregation (e.g. end of batches in a partition), the
accumulator converts its state to a RecordBatch of a single row
-/// 3. The RecordBatches of all accumulators are merged (`concatenate` in
`rust/arrow`) together to a single RecordBatch.
-/// 4. The state's RecordBatch is `merge`d to a new state
-/// 5. The state is mapped to the final value
+/// ```
///
-/// [WordAligned]: datafusion_row::layout
+/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`,
+/// [`group_values`] will store the distinct values of `z`. There will
+/// be one accumulator for `COUNT(x)`, specialized for the data type
+/// of `x` and one accumulator for `SUM(y)`, specialized for the data
+/// type of `y`.
+///
+/// # Description
+///
+/// The hash table does not store any aggregate state inline. It only
+/// stores "group indices", one for each (distinct) group value. The
+/// accumulators manage the in-progress aggregate state for each
+/// group, and the group values themselves are stored in
+/// [`group_values`] at the corresponding group index.
+///
+/// The accumulator state (e.g. partial sums) is managed by and stored
+/// by a [`GroupsAccumulator`] accumulator. There is one accumulator
+/// per aggregate expression (COUNT, AVG, etc) in the
+/// stream. Internally, each `GroupsAccumulator` manages the state for
+/// multiple groups, and is passed `group_indexes` during update. Note
+/// that the accumulator state is not managed by this operator (e.g. in
+/// the hash table).
+///
+/// [`group_values`]: Self::group_values
pub(crate) struct GroupedHashAggregateStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
mode: AggregateMode,
- normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- /// Aggregate expressions not supporting row accumulation
- normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
- /// Filter expression for each normal aggregate expression
- normal_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-
- /// Aggregate expressions supporting row accumulation
- row_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
- /// Filter expression for each row aggregate expression
- row_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
- row_accumulators: Vec<RowAccumulatorItem>,
+ /// Accumulators, one for each `AggregateExpr` in the query
+ ///
+ /// For example, if the query has aggregates, `SUM(x)`,
+ /// `COUNT(y)`, there will be two accumulators, each one
+ /// specialized for that particular aggregate and its input types
+ accumulators: Vec<Box<dyn GroupsAccumulator>>,
+
+ /// Arguments to pass to accumulator.
+ aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
Review Comment:
Maybe you can note that the inner `Vec` is needed because some aggregate
functions, such as `CORR`, take more than one argument. Not important though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]