mustafasrepo commented on code in PR #6904:
URL: https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1259785213


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -16,100 +16,181 @@
 // under the License.
 
 //! Hash aggregation through row format
+//!
+//! POC demonstration of GroupByHashApproach
 
-use std::cmp::min;
-use std::ops::Range;
+use datafusion_physical_expr::{
+    AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter,
+};
+use log::debug;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
-use arrow::row::{RowConverter, SortField};
+use arrow::row::{RowConverter, Rows, SortField};
 use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
-use crate::physical_plan::aggregates::utils::{
-    aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
-    read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
-};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, evaluate_optional, group_schema, 
AggregateMode,
-    PhysicalGroupBy, RowAccumulatorItem,
+    PhysicalGroupBy,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{aggregates, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
-use arrow::compute::cast;
-use arrow::datatypes::DataType;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
 use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
-use datafusion_expr::Accumulator;
-use datafusion_row::accessor::RowAccessor;
-use datafusion_row::layout::RowLayout;
 use hashbrown::raw::RawTable;
-use itertools::izip;
+
+#[derive(Debug, Clone)]
+/// This object tracks the aggregation phase (input/output)
+pub(crate) enum ExecutionState {
+    ReadingInput,
+    /// When producing output, the remaining rows to output are stored
+    /// here and are sliced off as needed in batch_size chunks
+    ProducingOutput(RecordBatch),
+    Done,
+}
 
 use super::AggregateExec;
 
-/// Grouping aggregate with row-format aggregation states inside.
+/// Hash based Grouping Aggregator
+///
+/// # Design Goals
+///
+/// This structure is designed so that updating the aggregates can be
+/// vectorized (done in a tight loop) without allocations. The
+/// accumulator state is *not* managed by this operator (e.g. in the
+/// hash table) and instead is delegated to the individual
+/// accumulators which have type specialized inner loops that perform
+/// the aggregation.
+///
+/// # Architecture
+///
+/// ```text
+///
+/// stores "group       stores group values,       internally stores aggregate
+///    indexes"          in arrow_row format         values, for all groups
 ///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
-/// - [WordAligned] row to store aggregation state, designed to be CPU-friendly when updates over every field are often.
+/// ┌─────────────┐      ┌────────────┐    ┌──────────────┐       ┌──────────────┐
+/// │   ┌─────┐   │      │ ┌────────┐ │    │┌────────────┐│       │┌────────────┐│
+/// │   │  5  │   │ ┌────┼▶│  "A"   │ │    ││accumulator ││       ││accumulator ││
+/// │   ├─────┤   │ │    │ ├────────┤ │    ││     0      ││       ││     N      ││
+/// │   │  9  │   │ │    │ │  "Z"   │ │    ││ ┌────────┐ ││       ││ ┌────────┐ ││
+/// │   └─────┘   │ │    │ └────────┘ │    ││ │ state  │ ││       ││ │ state  │ ││
+/// │     ...     │ │    │            │    ││ │┌─────┐ │ ││  ...  ││ │┌─────┐ │ ││
+/// │   ┌─────┐   │ │    │    ...     │    ││ │├─────┤ │ ││       ││ │├─────┤ │ ││
+/// │   │  1  │───┼─┘    │            │    ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
+/// │   ├─────┤   │      │            │    ││ │        │ ││       ││ │        │ ││
+/// │   │ 13  │───┼─┐    │ ┌────────┐ │    ││ │  ...   │ ││       ││ │  ...   │ ││
+/// │   └─────┘   │ └────┼▶│  "Q"   │ │    ││ │        │ ││       ││ │        │ ││
+/// └─────────────┘      │ └────────┘ │    ││ │┌─────┐ │ ││       ││ │┌─────┐ │ ││
+///                      │            │    ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
+///                      └────────────┘    ││ └────────┘ ││       ││ └────────┘ ││
+///                                        │└────────────┘│       │└────────────┘│
+///                                        └──────────────┘       └──────────────┘
 ///
-/// The architecture is the following:
+///       map            group_values                   accumulators
+///  (Hash Table)
 ///
-/// 1. For each input RecordBatch, update aggregation states corresponding to all appeared grouping keys.
-/// 2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row
-/// 3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch.
-/// 4. The state's RecordBatch is `merge`d to a new state
-/// 5. The state is mapped to the final value
+///  ```
 ///
-/// [WordAligned]: datafusion_row::layout
+/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`,
+/// [`group_values`] will store the distinct values of `z`. There will
+/// be one accumulator for `COUNT(x)`, specialized for the data type
+/// of `x` and one accumulator for `SUM(y)`, specialized for the data
+/// type of `y`.
+///
+/// # Description
+///
+/// The hash table does not store any aggregate state inline. It only
+/// stores "group indices", one for each (distinct) group value. The
+/// accumulators manage the in-progress aggregate state for each
+/// group, and the group values themselves are stored in
+/// [`group_values`] at the corresponding group index.
+///
+/// The accumulator state (e.g partial sums) is managed by and stored
+/// by a [`GroupsAccumulator`] accumulator. There is one accumulator
+/// per aggregate expression (COUNT, AVG, etc) in the
+/// stream. Internally, each `GroupsAccumulator` manages the state for
+/// multiple groups, and is passed `group_indexes` during update. Note
+/// that the accumulator state is not managed by this operator (e.g. in
+/// the hash table).
+///
+/// [`group_values`]: Self::group_values
 pub(crate) struct GroupedHashAggregateStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     mode: AggregateMode,
 
-    normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    /// Aggregate expressions not supporting row accumulation
-    normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression for each normal aggregate expression
-    normal_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-
-    /// Aggregate expressions supporting row accumulation
-    row_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression for each row aggregate expression
-    row_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-    row_accumulators: Vec<RowAccumulatorItem>,
+    /// Accumulators, one for each `AggregateExpr` in the query
+    ///
+    /// For example, if the query has aggregates, `SUM(x)`,
+    /// `COUNT(y)`, there will be two accumulators, each one
+    /// specialized for that particular aggregate and its input types
+    accumulators: Vec<Box<dyn GroupsAccumulator>>,
+
+    /// Arguments to pass to each accumulator.
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    /// Optional filter expression to evaluate, one for each
+    /// accumulator. If present, only those rows for which the filter
+    /// evaluates to true should be included in the aggregate results.
+    ///
+    /// For example, for an aggregate like `SUM(x) FILTER (WHERE x > 100)`,
+    /// the filter expression is `x > 100`.
+    filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    /// Converter for the group values
     row_converter: RowConverter,
-    row_aggr_schema: SchemaRef,
-    row_aggr_layout: Arc<RowLayout>,
 
+    /// GROUP BY expressions
     group_by: PhysicalGroupBy,
 
-    aggr_state: AggregationState,
+    /// The memory reservation for this grouping
+    reservation: MemoryReservation,
+
+    /// Logically maps group values to a group_index in
+    /// [`Self::group_values`] and in each accumulator
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys (group values) in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, group_index)
+    map: RawTable<(u64, usize)>,
+
+    /// The actual group by values, stored in arrow [`Row`] format.
+    /// `group_values[i]` holds the group value for group_index `i`.
+    ///
+    /// The row format is used to compare group keys quickly and store
+    /// them efficiently in memory. Quick comparison is especially
+    /// important for multi-column group keys.
+    ///
+    /// [`Row`]: arrow::row::Row
+    group_values: Rows,
+
+    /// scratch space for the current input [`RecordBatch`] being
+    /// processed. Reused across batches here to avoid reallocations

Review Comment:
   ```suggestion
       /// processed. Reused across batches here to avoid re-allocations
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to