alamb commented on code in PR #6904:
URL: https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1260037410


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -306,460 +370,194 @@ impl RecordBatchStream for GroupedHashAggregateStream {
 }
 
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of 
group_by_expressions)
+    /// Calculates the group indicies for each input row of
+    /// `group_values`.
+    ///
+    /// At the return of this function,
+    /// `self.scratch_space.current_group_indices` has the same number
+    /// of entries as each array in `group_values` and holds the
+    /// correct group_index for that row.
+    ///
+    /// This is one of the core hot loops in the algorithm
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
         allocated: &mut usize,
-    ) -> Result<Vec<usize>> {
+    ) -> Result<()> {

Review Comment:
   The idea of having `scratch_space` as a field on 
`GroupedHashAggregateStream` is so that the same `Vec` can be reused for each 
batch. Creating a new `Vec` for each batch requires both allocating and zeroing 
out memory, which we are trying to avoid as much as possible.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -306,460 +370,194 @@ impl RecordBatchStream for GroupedHashAggregateStream {
 }
 
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of 
group_by_expressions)
+    /// Calculates the group indicies for each input row of
+    /// `group_values`.
+    ///
+    /// At the return of this function,
+    /// `self.scratch_space.current_group_indices` has the same number
+    /// of entries as each array in `group_values` and holds the
+    /// correct group_index for that row.
+    ///
+    /// This is one of the core hot loops in the algorithm
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
         allocated: &mut usize,
-    ) -> Result<Vec<usize>> {
+    ) -> Result<()> {
+        // Convert the group keys into the row format
+        // Avoid reallocation when 
https://github.com/apache/arrow-rs/issues/4479 is available
         let group_rows = self.row_converter.convert_columns(group_values)?;
         let n_rows = group_rows.num_rows();
-        // 1.1 construct the key from the group values
-        // 1.2 construct the mapping key if it does not exist
-        // 1.3 add the row' index to `indices`
 
-        // track which entries in `aggr_state` have rows in this batch to 
aggregate
-        let mut groups_with_rows = vec![];
+        // track memory used
+        let group_values_size_pre = self.group_values.size();
+        let scratch_size_pre = self.scratch_space.size();
 
-        // 1.1 Calculate the group keys for the group values
-        let mut batch_hashes = vec![0; n_rows];
-        create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+        // tracks to which group each of the input rows belongs
+        let group_indices = &mut self.scratch_space.current_group_indices;
+        group_indices.clear();
 
-        let AggregationState {
-            map, group_states, ..
-        } = &mut self.aggr_state;
+        // 1.1 Calculate the group keys for the group values
+        let batch_hashes = &mut self.scratch_space.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(group_values, &self.random_state, batch_hashes)?;
 
-        for (row, hash) in batch_hashes.into_iter().enumerate() {
-            let entry = map.get_mut(hash, |(_hash, group_idx)| {
+        for (row, &hash) in batch_hashes.iter().enumerate() {
+            let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx  (aka group_values @ row)
-                let group_state = &group_states[*group_idx];
-
-                group_rows.row(row) == group_state.group_by_values.row()
+                group_rows.row(row) == self.group_values.row(*group_idx)
             });
 
-            match entry {
-                // Existing entry for this group value
-                Some((_hash, group_idx)) => {
-                    let group_state = &mut group_states[*group_idx];
-
-                    // 1.3
-                    if group_state.indices.is_empty() {
-                        groups_with_rows.push(*group_idx);
-                    };
-
-                    group_state.indices.push_accounted(row as u32, allocated); 
// remember this row
-                }
-                //  1.2 Need to create new entry
+            let group_idx = match entry {
+                // Existing group_index for this group value
+                Some((_hash, group_idx)) => *group_idx,
+                //  1.2 Need to create new entry for the group
                 None => {
-                    let accumulator_set =
-                        
aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                    // Add new entry to group_states and save newly created 
index
-                    let group_state = GroupState {
-                        group_by_values: group_rows.row(row).owned(),
-                        aggregation_buffer: vec![
-                            0;
-                            self.row_aggr_layout.fixed_part_width()
-                        ],
-                        accumulator_set,
-                        indices: vec![row as u32], // 1.3
-                    };
-                    let group_idx = group_states.len();
-
-                    // NOTE: do NOT include the `GroupState` struct size in 
here because this is captured by
-                    // `group_states` (see allocation down below)
-                    *allocated += 
std::mem::size_of_val(&group_state.group_by_values)
-                        + (std::mem::size_of::<u8>()
-                            * group_state.aggregation_buffer.capacity())
-                        + (std::mem::size_of::<u32>() * 
group_state.indices.capacity());
-
-                    // Allocation done by normal accumulators
-                    *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
-                        * group_state.accumulator_set.capacity())
-                        + group_state
-                            .accumulator_set
-                            .iter()
-                            .map(|accu| accu.size())
-                            .sum::<usize>();
+                    // Add new entry to aggr_state and save newly created index
+                    let group_idx = self.group_values.num_rows();
+                    self.group_values.push(group_rows.row(row));
 
                     // for hasher function, use precomputed hash value
-                    map.insert_accounted(
+                    self.map.insert_accounted(
                         (hash, group_idx),
                         |(hash, _group_index)| *hash,
                         allocated,
                     );
-
-                    group_states.push_accounted(group_state, allocated);
-
-                    groups_with_rows.push(group_idx);
+                    group_idx
                 }
             };
+            group_indices.push(group_idx);
         }
-        Ok(groups_with_rows)
-    }
 
-    // Update the accumulator results, according to row_aggr_state.
-    #[allow(clippy::too_many_arguments)]
-    fn update_accumulators_using_batch(
-        &mut self,
-        groups_with_rows: &[usize],
-        offsets: &[usize],
-        row_values: &[Vec<ArrayRef>],
-        normal_values: &[Vec<ArrayRef>],
-        row_filter_values: &[Option<ArrayRef>],
-        normal_filter_values: &[Option<ArrayRef>],
-        allocated: &mut usize,
-    ) -> Result<()> {
-        // 2.1 for each key in this batch
-        // 2.2 for each aggregation
-        // 2.3 `slice` from each of its arrays the keys' values
-        // 2.4 update / merge the accumulator with the values
-        // 2.5 clear indices
-        groups_with_rows
-            .iter()
-            .zip(offsets.windows(2))
-            .try_for_each(|(group_idx, offsets)| {
-                let group_state = &mut 
self.aggr_state.group_states[*group_idx];
-                // 2.2
-                // Process row accumulators
-                self.row_accumulators
-                    .iter_mut()
-                    .zip(row_values.iter())
-                    .zip(row_filter_values.iter())
-                    .try_for_each(|((accumulator, aggr_array), filter_opt)| {
-                        let values = slice_and_maybe_filter(
-                            aggr_array,
-                            filter_opt.as_ref(),
-                            offsets,
-                        )?;
-                        let mut state_accessor =
-                            
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-                        state_accessor
-                            .point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
-                        match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
-                                accumulator.update_batch(&values, &mut 
state_accessor)
-                            }
-                            AggregateMode::FinalPartitioned | 
AggregateMode::Final => {
-                                // note: the aggregation here is over states, 
not values, thus the merge
-                                accumulator.merge_batch(&values, &mut 
state_accessor)
-                            }
-                        }
-                    })?;
-                // normal accumulators
-                group_state
-                    .accumulator_set
-                    .iter_mut()
-                    .zip(normal_values.iter())
-                    .zip(normal_filter_values.iter())
-                    .try_for_each(|((accumulator, aggr_array), filter_opt)| {
-                        let values = slice_and_maybe_filter(
-                            aggr_array,
-                            filter_opt.as_ref(),
-                            offsets,
-                        )?;
-                        let size_pre = accumulator.size();
-                        let res = match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
-                                accumulator.update_batch(&values)
-                            }
-                            AggregateMode::FinalPartitioned | 
AggregateMode::Final => {
-                                // note: the aggregation here is over states, 
not values, thus the merge
-                                accumulator.merge_batch(&values)
-                            }
-                        };
-                        let size_post = accumulator.size();
-                        *allocated += size_post.saturating_sub(size_pre);
-                        res
-                    })
-                    // 2.5
-                    .and({
-                        group_state.indices.clear();
-                        Ok(())
-                    })
-            })?;
-        Ok(())
-    }
+        // account for memory growth in scratch space
+        *allocated += self.scratch_space.size();
+        *allocated -= scratch_size_pre; // subtract after adding to avoid 
underflow
 
-    // Update the accumulator results, according to row_aggr_state.
-    fn update_accumulators_using_scalar(
-        &mut self,
-        groups_with_rows: &[usize],
-        row_values: &[Vec<ArrayRef>],
-        row_filter_values: &[Option<ArrayRef>],
-    ) -> Result<()> {
-        let filter_bool_array = row_filter_values
-            .iter()
-            .map(|filter_opt| match filter_opt {
-                Some(f) => Ok(Some(as_boolean_array(f)?)),
-                None => Ok(None),
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx 
as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut 
state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as 
usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut 
state_accessor)?;
-                    }
-                }
-            }
-            // clear the group indices in this group
-            group_state.indices.clear();
-        }
+        // account for any memory increase used to store group_values
+        *allocated += self.group_values.size();
+        *allocated -= group_values_size_pre; // subtract after adding to avoid 
underflow
 
         Ok(())
     }
 
     /// Perform group-by aggregation for the given [`RecordBatch`].
     ///
-    /// If successful, this returns the additional number of bytes that were 
allocated during this process.
-    ///
+    /// If successful, returns the additional amount of memory, in
+    /// bytes, that were allocated during this process.
     fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
-        // Evaluate the grouping expressions:
+        // Evaluate the grouping expressions
         let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
+
         // Keep track of memory allocated:
         let mut allocated = 0usize;
 
         // Evaluate the aggregation expressions.
-        // We could evaluate them after the `take`, but since we need to 
evaluate all
-        // of them anyways, it is more performant to do it while they are 
together.
-        let row_aggr_input_values =
-            evaluate_many(&self.row_aggregate_expressions, &batch)?;
-        let normal_aggr_input_values =
-            evaluate_many(&self.normal_aggregate_expressions, &batch)?;
-        let row_filter_values = 
evaluate_optional(&self.row_filter_expressions, &batch)?;
-        let normal_filter_values =
-            evaluate_optional(&self.normal_filter_expressions, &batch)?;
+        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
+
+        // Evalute the filter expressions, if any, against the inputs
+        let filter_values = evaluate_optional(&self.filter_expressions, 
&batch)?;
 
         let row_converter_size_pre = self.row_converter.size();
+
         for group_values in &group_by_values {
-            let groups_with_rows =
-                self.update_group_state(group_values, &mut allocated)?;
-            // Decide the accumulators update mode, use scalar value to update 
the accumulators when all of the conditions are meet:
-            // 1) The aggregation mode is Partial or Single
-            // 2) There is not normal aggregation expressions
-            // 3) The number of affected groups is high (entries in 
`aggr_state` have rows need to update). Usually the high cardinality case
-            if matches!(self.mode, AggregateMode::Partial | 
AggregateMode::Single)
-                && normal_aggr_input_values.is_empty()
-                && normal_filter_values.is_empty()
-                && groups_with_rows.len() >= batch.num_rows() / 
self.scalar_update_factor
-            {
-                self.update_accumulators_using_scalar(
-                    &groups_with_rows,
-                    &row_aggr_input_values,
-                    &row_filter_values,
-                )?;
-            } else {
-                // Collect all indices + offsets based on keys in this vec
-                let mut batch_indices: UInt32Builder = 
UInt32Builder::with_capacity(0);
-                let mut offsets = vec![0];
-                let mut offset_so_far = 0;
-                for &group_idx in groups_with_rows.iter() {
-                    let indices = 
&self.aggr_state.group_states[group_idx].indices;
-                    batch_indices.append_slice(indices);
-                    offset_so_far += indices.len();
-                    offsets.push(offset_so_far);
+            // calculate the group indicies for each input row
+            self.update_group_state(group_values, &mut allocated)?;
+            let group_indices = &self.scratch_space.current_group_indices;
+
+            // Gather the inputs to call the actual accumulator
+            let t = self
+                .accumulators
+                .iter_mut()
+                .zip(input_values.iter())
+                .zip(filter_values.iter());

Review Comment:
   Thank you -- I didn't know about `izip!`. I would like to leave this as 
`zip` for the moment, as I did all the performance testing using `zip`. Maybe we 
can change it to `izip!` in a follow-on PR.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -16,100 +16,181 @@
 // under the License.
 
 //! Hash aggregation through row format
+//!
+//! POC demonstration of GroupByHashApproach
 
-use std::cmp::min;
-use std::ops::Range;
+use datafusion_physical_expr::{
+    AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter,
+};
+use log::debug;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
-use arrow::row::{RowConverter, SortField};
+use arrow::row::{RowConverter, Rows, SortField};
 use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
-use crate::physical_plan::aggregates::utils::{
-    aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
-    read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
-};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, evaluate_optional, group_schema, 
AggregateMode,
-    PhysicalGroupBy, RowAccumulatorItem,
+    PhysicalGroupBy,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{aggregates, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
-use arrow::compute::cast;
-use arrow::datatypes::DataType;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
 use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
-use datafusion_expr::Accumulator;
-use datafusion_row::accessor::RowAccessor;
-use datafusion_row::layout::RowLayout;
 use hashbrown::raw::RawTable;
-use itertools::izip;
+
+#[derive(Debug, Clone)]
+/// This object tracks the aggregation phase (input/output)
+pub(crate) enum ExecutionState {
+    ReadingInput,
+    /// When producing output, the remaining rows to output are stored
+    /// here and are sliced off as needed in batch_size chunks
+    ProducingOutput(RecordBatch),
+    Done,
+}
 
 use super::AggregateExec;
 
-/// Grouping aggregate with row-format aggregation states inside.
+/// Hash based Grouping Aggregator
+///
+/// # Design Goals
+///
+/// This structure is designed so that updating the aggregates can be
+/// vectorized (done in a tight loop) without allocatons. The
+/// accumulator state is *not* managed by this operator (e.g in the
+/// hash table) and instead is delegated to the individual
+/// accumulators which have type specialized inner loops that perform
+/// the aggregation.
+///
+/// # Architecture
+///
+/// ```text
+///
+/// stores "group       stores group values,       internally stores aggregate
+///    indexes"          in arrow_row format         values, for all groups
 ///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and 
comparison directly on raw bytes.
-/// - [WordAligned] row to store aggregation state, designed to be 
CPU-friendly when updates over every field are often.
+/// ┌─────────────┐      ┌────────────┐    ┌──────────────┐       
┌──────────────┐
+/// │   ┌─────┐   │      │ ┌────────┐ │    │┌────────────┐│       
│┌────────────┐│
+/// │   │  5  │   │ ┌────┼▶│  "A"   │ │    ││accumulator ││       
││accumulator ││
+/// │   ├─────┤   │ │    │ ├────────┤ │    ││     0      ││       ││     N     
 ││
+/// │   │  9  │   │ │    │ │  "Z"   │ │    ││ ┌────────┐ ││       ││ 
┌────────┐ ││
+/// │   └─────┘   │ │    │ └────────┘ │    ││ │ state  │ ││       ││ │ state  
│ ││
+/// │     ...     │ │    │            │    ││ │┌─────┐ │ ││  ...  ││ │┌─────┐ 
│ ││
+/// │   ┌─────┐   │ │    │    ...     │    ││ │├─────┤ │ ││       ││ │├─────┤ 
│ ││
+/// │   │  1  │───┼─┘    │            │    ││ │└─────┘ │ ││       ││ │└─────┘ 
│ ││
+/// │   ├─────┤   │      │            │    ││ │        │ ││       ││ │        
│ ││
+/// │   │ 13  │───┼─┐    │ ┌────────┐ │    ││ │  ...   │ ││       ││ │  ...   
│ ││
+/// │   └─────┘   │ └────┼▶│  "Q"   │ │    ││ │        │ ││       ││ │        
│ ││
+/// └─────────────┘      │ └────────┘ │    ││ │┌─────┐ │ ││       ││ │┌─────┐ 
│ ││
+///                      │            │    ││ │└─────┘ │ ││       ││ │└─────┘ 
│ ││
+///                      └────────────┘    ││ └────────┘ ││       ││ 
└────────┘ ││
+///                                        │└────────────┘│       
│└────────────┘│
+///                                        └──────────────┘       
└──────────────┘
 ///
-/// The architecture is the following:
+///       map            group_values                   accumulators
+///  (Hash Table)
 ///
-/// 1. For each input RecordBatch, update aggregation states corresponding to 
all appeared grouping keys.
-/// 2. At the end of the aggregation (e.g. end of batches in a partition), the 
accumulator converts its state to a RecordBatch of a single row
-/// 3. The RecordBatches of all accumulators are merged (`concatenate` in 
`rust/arrow`) together to a single RecordBatch.
-/// 4. The state's RecordBatch is `merge`d to a new state
-/// 5. The state is mapped to the final value
+///  ```
 ///
-/// [WordAligned]: datafusion_row::layout
+/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`,
+/// [`group_values`] will store the distinct values of `z`. There will
+/// be one accumulator for `COUNT(x)`, specialized for the data type
+/// of `x` and one accumulator for `SUM(y)`, specialized for the data
+/// type of `y`.
+///
+/// # Description
+///
+/// The hash table does not store any aggregate state inline. It only
+/// stores "group indices", one for each (distinct) group value. The
+/// accumulators manage the in-progress aggregate state for each
+/// group, and the group values themselves are stored in
+/// [`group_values`] at the corresponding group index.
+///
+/// The accumulator state (e.g partial sums) is managed by and stored
+/// by a [`GroupsAccumulator`] accumulator. There is one accumulator
+/// per aggregate expression (COUNT, AVG, etc) in the
+/// stream. Internally, each `GroupsAccumulator` manages the state for
+/// multiple groups, and is passed `group_indexes` during update. Note
+/// The accumulator state is not managed by this operator (e.g in the
+/// hash table).
+///
+/// [`group_values`]: Self::group_values
 pub(crate) struct GroupedHashAggregateStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     mode: AggregateMode,
 
-    normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    /// Aggregate expressions not supporting row accumulation
-    normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression for each normal aggregate expression
-    normal_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-
-    /// Aggregate expressions supporting row accumulation
-    row_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression for each row aggregate expression
-    row_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-    row_accumulators: Vec<RowAccumulatorItem>,
+    /// Accumulators, one for each `AggregateExpr` in the query
+    ///
+    /// For example, if the query has aggregates, `SUM(x)`,
+    /// `COUNT(y)`, there will be two accumulators, each one
+    /// specialized for that partcular aggregate and its input types
+    accumulators: Vec<Box<dyn GroupsAccumulator>>,
+
+    /// Arguments to pass to  accumulator.
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,

Review Comment:
   Good idea -- done in commit fc96b136d7



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to