alamb commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866315598
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(

Review Comment:
This code effectively builds one massive output record batch -- I think that is also what `GroupedHashAggregateStream` does, but in my opinion it would be better to stream the output (aka respect the `batch_size` configuration). Maybe we can file a ticket to do so.
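For illustration, even slicing the one big batch before emitting it would honor `batch_size` -- a minimal sketch (the `chunk_batch` helper is made up; `RecordBatch::slice` is zero-copy, so this adds no data copies):

```rust
use arrow::record_batch::RecordBatch;

/// Split one large output batch into `batch_size`-row chunks.
/// Assumes `batch_size > 0`; `slice` only adjusts offsets on the
/// underlying arrays rather than copying values.
fn chunk_batch(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    (0..batch.num_rows())
        .step_by(batch_size)
        .map(|offset| {
            let len = batch_size.min(batch.num_rows() - offset);
            batch.slice(offset, len)
        })
        .collect()
}
```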
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`]

Review Comment:
👍

##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -315,6 +354,84 @@ pub(crate) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+pub(crate) fn add_to_row(
+    dt: &DataType,
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    match (dt, s) {
+        // float64 coerces everything to f64

Review Comment:
🤔 I almost wonder how valuable supporting all these types is -- for example, whether we could use `u64` or `i64` accumulators for all integer types and `f64` for floats, and reduce the code. I don't think this PR makes things any better or worse, but these type `match` statements are so common and repetitive.
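A sketch of the kind of collapse I mean -- coerce every numeric input to one of three wide accumulator representations so the per-type `match` happens once, up front (illustrative only, not this PR's code):

```rust
use arrow::datatypes::DataType;

/// Illustrative: three wide accumulator representations instead of
/// one match arm per concrete numeric type.
enum WideSum {
    I64(i64), // all signed integer inputs
    U64(u64), // all unsigned integer inputs
    F64(f64), // all float inputs
}

fn wide_sum_for(dt: &DataType) -> Option<WideSum> {
    match dt {
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
            Some(WideSum::I64(0))
        }
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
            Some(WideSum::U64(0))
        }
        DataType::Float32 | DataType::Float64 => Some(WideSum::F64(0.0)),
        _ => None, // decimals etc. would still need their own handling
    }
}
```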
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
+#[derive(Debug)]
+struct SumAccumulatorV2 {
+    index: usize,

Review Comment:
I think it would help to document what this is an `index` into -- aka document what the fields are.
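Something along these lines, perhaps; the wording is my guess at the intent, since the field appears to locate this accumulator's state inside the row-format aggregation buffer:

```rust
use arrow::datatypes::DataType;

#[derive(Debug)]
struct SumAccumulatorV2 {
    /// Column index of this accumulator's state within the row-format
    /// aggregation buffer (i.e. within `aggr_schema` / its `RowLayout`).
    index: usize,
    /// The data type of the value being summed.
    datatype: DataType,
}
```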
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+impl std::fmt::Debug for AggregationState {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // hashes are not store inline, so could only get values
+        let map_string = "RawTable";
+        f.debug_struct("RowAccumulators")

Review Comment:
```suggestion
        f.debug_struct("AggregationState")
```

##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
I found reviewing this code was easier after comparing it with `hash.rs` as well:

```shell
meld datafusion/core/src/physical_plan/aggregates/hash.rs datafusion/core/src/physical_plan/aggregates/row_hash.rs
```

##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+                .try_for_each(|(accumulator, values)| {
+                    let mut state_accessor =
+                        RowAccessor::new_from_layout(state_layout.clone());

Review Comment:
Depending on how often this is called (the `state_layout` has `Vec`s, etc.), it may make sense to keep the `RowAccessor` in `group_state`, or else reduce the cost of sharing `state_layout`.
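For context, the clone in question deep-copies the layout's `Vec`s on every call. A self-contained illustration (with a stand-in `Layout` type) of the difference between that and refcounted sharing:

```rust
use std::sync::Arc;

// Stand-in for a layout that owns heap-allocated metadata.
#[derive(Clone)]
struct Layout {
    field_offsets: Vec<usize>, // deep-copied by clone()
}

fn main() {
    let layout = Layout {
        field_offsets: (0..64).collect(),
    };
    let shared = Arc::new(layout.clone());

    for _ in 0..1_000 {
        let _per_call = layout.clone(); // allocates + copies 64 usizes each time
    }
    for _ in 0..1_000 {
        let _per_call = Arc::clone(&shared); // refcount increment only
    }
}
```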
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -142,6 +147,12 @@ impl AggregateExec {
     pub fn input_schema(&self) -> SchemaRef {
         self.input_schema.clone()
     }
+
+    fn row_aggregate_supported(&self) -> bool {
+        let group_schema = group_schema(&self.schema, self.group_expr.len());
+        row_supported(&group_schema, RowType::Compact)

Review Comment:
is it a problem that this is checking `RowType::Compact` while the code in `GroupedHashAggregateStreamV2` is using `RowType::WordAligned`?

```rust
        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
```
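If the two layouts can differ in which types they support, one option is a hypothetical variant of the check that mirrors what the stream actually does -- `Compact` for the group schema, `WordAligned` for the aggregation state (the `aggr_state_schema` call below stands in for however that schema would be obtained here; this is a fragment, not compiling code as-is):

```rust
    fn row_aggregate_supported(&self) -> bool {
        // check each schema against the RowType it will actually be
        // stored with: Compact for group keys, WordAligned for state
        let group_schema = group_schema(&self.schema, self.group_expr.len());
        let aggr_ok = match aggr_state_schema(&self.aggr_expr) {
            Ok(aggr_schema) => row_supported(&aggr_schema, RowType::WordAligned),
            Err(_) => false,
        };
        row_supported(&group_schema, RowType::Compact) && aggr_ok
    }
```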
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct RowGroupState {

Review Comment:
❤️

##########
datafusion/row/src/accessor.rs:
##########
@@ -0,0 +1,295 @@
+/// Read the tuple `data[base_offset..]` we are currently pointing to
+pub struct RowAccessor<'a> {
+    /// Layout on how to read each field
+    layout: RowLayout,

Review Comment:
The layout is non-trivial (it has a `Vec`) -- perhaps it is worth making this `layout: &RowLayout` or `layout: Arc<RowLayout>`.
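A sketch of the suggested field change, using a stand-in struct name so it is clearly not the real API:

```rust
use std::sync::Arc;

use datafusion_row::layout::RowLayout;

/// Sketch: share the layout by refcount instead of owning a copy
/// per accessor, so constructing an accessor is cheap.
pub struct RowAccessorSketch<'a> {
    /// Layout describing how to read each field, shared across accessors
    layout: Arc<RowLayout>,
    /// The row buffer this accessor currently points at
    data: &'a mut [u8],
}
```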
##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
+impl AccumulatorV2 for SumAccumulatorV2 {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        add_to_row(&self.datatype, self.index, accessor, &sum_batch(values)?)?;

Review Comment:
I wonder if it is needed to go through `sum_batch` here (which turns the sum into a `ScalarValue`) -- perhaps we could call the appropriate sum kernel followed by a direct update.
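A sketch of that direct path for the `Float64` case -- `sum` here is the arrow aggregate kernel, while `add_f64` is assumed from this PR's `RowAccessor` setters:

```rust
use arrow::array::{ArrayRef, Float64Array};
use arrow::compute::sum;
use datafusion_row::accessor::RowAccessor;

/// Sum a Float64 column with the typed kernel and fold the native f64
/// straight into the row buffer, skipping the ScalarValue round-trip.
fn update_f64(values: &ArrayRef, index: usize, accessor: &mut RowAccessor) {
    let array = values
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("Float64 input");
    if let Some(v) = sum(array) {
        accessor.add_f64(index, v); // assumed setter; adds to the slot at `index`
    }
}
```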
Hash aggregation through row format + +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::vec; + +use ahash::RandomState; +use futures::{ + ready, + stream::{Stream, StreamExt}, +}; + +use crate::error::Result; +use crate::physical_plan::aggregates::{ + evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode, +}; +use crate::physical_plan::hash_utils::create_row_hashes; +use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; +use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr}; +use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; + +use arrow::compute::cast; +use arrow::datatypes::Schema; +use arrow::{array::ArrayRef, compute}; +use arrow::{ + array::{Array, UInt32Builder}, + error::{ArrowError, Result as ArrowResult}, +}; +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use datafusion_common::ScalarValue; +use datafusion_row::accessor::RowAccessor; +use datafusion_row::layout::RowLayout; +use datafusion_row::reader::{read_row, RowReader}; +use datafusion_row::writer::{write_row, RowWriter}; +use datafusion_row::{MutableRecordBatch, RowType}; +use hashbrown::raw::RawTable; + +/// Grouping aggregate with row format to store the aggregation state. +/// +/// The Architecture is similar to that in [`super::GroupedHashAggregateStream`] but use +/// row format inside the HashTable to store aggregation buffers. +pub(crate) struct GroupedHashAggregateStreamV2 { + schema: SchemaRef, + input: SendableRecordBatchStream, + mode: AggregateMode, + aggr_state: AggregationState, + aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>, + + group_expr: Vec<Arc<dyn PhysicalExpr>>, + accumulators: Vec<AccumulatorItemV2>, + + group_schema: SchemaRef, + aggr_schema: SchemaRef, + aggr_layout: RowLayout, + + baseline_metrics: BaselineMetrics, + random_state: RandomState, + finished: bool, +} + +fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> { + let fields = aggr_expr + .iter() + .flat_map(|expr| expr.state_fields().unwrap().into_iter()) + .collect::<Vec<_>>(); + Ok(Arc::new(Schema::new(fields))) +} + +impl GroupedHashAggregateStreamV2 { + /// Create a new GroupedRowHashAggregateStream + pub fn new( + mode: AggregateMode, + schema: SchemaRef, + group_expr: Vec<Arc<dyn PhysicalExpr>>, + aggr_expr: Vec<Arc<dyn AggregateExpr>>, + input: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + ) -> Result<Self> { + let timer = baseline_metrics.elapsed_compute().timer(); + + // The expressions to evaluate the batch, one vec of expressions per aggregation. + // Assume create_schema() always put group columns in front of aggr columns, we set + // col_idx_base to group expression count. 
+ let aggregate_expressions = + aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?; + + let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?; + + let group_schema = group_schema(&schema, group_expr.len()); + let aggr_schema = aggr_state_schema(&aggr_expr)?; + + let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned); + timer.done(); + + Ok(Self { + schema, + mode, + input, + group_expr, + accumulators, + group_schema, + aggr_schema, + aggr_layout, + baseline_metrics, + aggregate_expressions, + aggr_state: Default::default(), + random_state: Default::default(), + finished: false, + }) + } +} + +impl Stream for GroupedHashAggregateStreamV2 { + type Item = ArrowResult<RecordBatch>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let this = &mut *self; + if this.finished { + return Poll::Ready(None); + } + + let elapsed_compute = this.baseline_metrics.elapsed_compute(); + + loop { + let result = match ready!(this.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = elapsed_compute.timer(); + let result = group_aggregate_batch( + &this.mode, + &this.random_state, + &this.group_expr, + &mut this.accumulators, + &this.group_schema, + &this.aggr_layout, + batch, + &mut this.aggr_state, + &this.aggregate_expressions, + ); + + timer.done(); + + match result { + Ok(_) => continue, + Err(e) => Err(ArrowError::ExternalError(Box::new(e))), + } + } + Some(Err(e)) => Err(e), + None => { + this.finished = true; + let timer = this.baseline_metrics.elapsed_compute().timer(); + let result = create_batch_from_map( + &this.mode, + &this.group_schema, + &this.aggr_schema, + &mut this.aggr_state, + &mut this.accumulators, + &this.schema, + ) + .record_output(&this.baseline_metrics); + + timer.done(); + result + } + }; + + this.finished = true; + return Poll::Ready(Some(result)); + } + } +} + +impl RecordBatchStream for GroupedHashAggregateStreamV2 { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`] +#[allow(clippy::too_many_arguments)] +fn group_aggregate_batch( + mode: &AggregateMode, + random_state: &RandomState, + group_expr: &[Arc<dyn PhysicalExpr>], + accumulators: &mut [AccumulatorItemV2], + group_schema: &Schema, + state_layout: &RowLayout, + batch: RecordBatch, + aggr_state: &mut AggregationState, + aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>], +) -> Result<()> { + // evaluate the grouping expressions + let group_values = evaluate(group_expr, &batch)?; + let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema); Review Comment: This is cool that the group keys are also computed using row format ########## datafusion/core/src/physical_plan/aggregates/row_hash.rs: ########## @@ -0,0 +1,472 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
########## datafusion/core/src/physical_plan/aggregates/row_hash.rs: ##########
@@ -0,0 +1,472 @@
+    // evaluate the aggregation expressions.
+    // We could evaluate them after the `take`, but since we need to evaluate all
+    // of them anyways, it is more performant to do it while they are together.
+    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
+
+    // 1.1 construct the key from the group values
+    // 1.2 create a new entry for the key if it does not exist
+    // 1.3 add the row's index to `indices`
+
+    // track which entries in `aggr_state` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
+
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let AggregationState { map, group_states } = aggr_state;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that the group we are inserting with this hash is
+            // actually the same key value as the group at
+            // existing_idx (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_rows[row] == group_state.group_by_values
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
+                };
+                group_state.indices.push(row as u32); // remember this row
+            }
+            // 1.2 Need to create new entry
+            None => {
+                // Add new entry to group_states and save newly created index
+                let group_state = RowGroupState {
+                    group_by_values: group_rows[row].clone(),
+                    aggregation_buffer: vec![0; state_layout.fixed_part_width()],
+                    indices: vec![row as u32], // 1.3
+                };
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+
+                // for hasher function, use precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
+    }
+
+    // Collect all indices + offsets based on the keys in `groups_with_rows`
+    let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+    let mut offsets = vec![0];
+    let mut offset_so_far = 0;
+    for group_idx in groups_with_rows.iter() {
+        let indices = &aggr_state.group_states[*group_idx].indices;
+        batch_indices.append_slice(indices)?;
+        offset_so_far += indices.len();
+        offsets.push(offset_so_far);
+    }
+    let batch_indices = batch_indices.finish();
+
+    // `Take` all values based on indices into Arrays
+    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
+        .iter()
+        .map(|array| {
+            array
+                .iter()
+                .map(|array| {
+                    compute::take(
+                        array.as_ref(),
+                        &batch_indices,
+                        None, // None: no index check
+                    )
+                    .unwrap()
+                })
+                .collect()
+            // 2.3
+        })
+        .collect();
+
+    // 2.1 for each key in this batch
+    // 2.2 for each aggregation
+    // 2.3 `slice` from each of its arrays the keys' values
+    // 2.4 update / merge the accumulator with the values
+    // 2.5 clear indices
+    groups_with_rows
+        .iter()
+        .zip(offsets.windows(2))
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut aggr_state.group_states[*group_idx];
+            // 2.2
+            accumulators
+                .iter_mut()
+                .zip(values.iter())
+                .map(|(accumulator, aggr_array)| {
+                    (
+                        accumulator,
+                        aggr_array
+                            .iter()
+                            .map(|array| {
+                                // 2.3
+                                array.slice(offsets[0], offsets[1] - offsets[0])
+                            })
+                            .collect::<Vec<ArrayRef>>(),
+                    )
+                })
+                .try_for_each(|(accumulator, values)| {
+                    let mut state_accessor =
+                        RowAccessor::new_from_layout(state_layout.clone());
+                    state_accessor
+                        .point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                    match mode {
+                        AggregateMode::Partial => {
+                            accumulator.update_batch(&values, &mut state_accessor)
+                        }
+                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                            // note: the aggregation here is over states, not values, thus the merge
+                            accumulator.merge_batch(&values, &mut state_accessor)
+                        }
+                    }
+                })
+                // 2.5
+                .and({
+                    group_state.indices.clear();
+                    Ok(())
+                })
+        })?;
+
+    Ok(())
+}
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct RowGroupState {
+    /// The actual group by values, stored sequentially
+    group_by_values: Vec<u8>,
+
+    /// Accumulator state, stored sequentially
+    aggregation_buffer: Vec<u8>,
+
+    /// Scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch.
+    indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Default)]
+struct AggregationState {
+    /// Logically maps group values to an index in `group_states`
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_states`)
+    map: RawTable<(u64, usize)>,
+
+    /// State for each group
+    group_states: Vec<RowGroupState>,
+}
+
+impl std::fmt::Debug for AggregationState {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // hashes are not stored inline, so we can only print a placeholder
+        let map_string = "RawTable";
+        f.debug_struct("RowAccumulators")
+            .field("map", &map_string)
+            .field("row_group_states", &self.group_states)

Review Comment:
```suggestion
            .field("group_states", &self.group_states)
```
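A note on the `RawTable<(u64, usize)>` design above: the table stores only `(hash, group index)` pairs, the real keys live in `group_states`, and the stored hash lets the table resize without re-hashing keys. A minimal self-contained demonstration of that pattern (hypothetical demo code, not from this PR; it assumes hashbrown's `raw` feature, which the `hashbrown::raw::RawTable` import implies):

```rust
use ahash::RandomState;
use hashbrown::raw::RawTable;
use std::hash::{BuildHasher, Hash, Hasher};

fn main() {
    let random_state = RandomState::new();
    // values are (hash, index into `group_keys`); keys live outside the table
    let mut map: RawTable<(u64, usize)> = RawTable::new();
    let mut group_keys: Vec<Vec<u8>> = vec![];

    for key in [b"a".to_vec(), b"b".to_vec(), b"a".to_vec()] {
        let mut hasher = random_state.build_hasher();
        key.hash(&mut hasher);
        let hash = hasher.finish();

        // equality is checked against the side table, as in the PR
        match map.get_mut(hash, |(_h, idx)| group_keys[*idx] == key) {
            Some((_h, idx)) => println!("existing group {}", idx),
            None => {
                let idx = group_keys.len();
                group_keys.push(key);
                // on resize, reuse the stored hash instead of re-hashing keys
                map.insert(hash, (hash, idx), |(h, _)| *h);
            }
        }
    }
}
```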

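Finally, the core of the V2 design quoted above is that per-group state is a plain byte buffer mutated in place through `RowAccessor`. A rough isolated sketch of that flow, reusing `RowLayout::new`, `fixed_part_width`, `new_from_layout`, and `point_to` as they appear in this diff; the `get_u64`/`set_u64` calls are assumed accessor methods for illustration and may not match `datafusion_row`'s exact API:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::RowType;

fn main() {
    // A single u64 state field, e.g. the count half of an AVG accumulator.
    let aggr_schema = Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        true,
    )]));
    let layout = RowLayout::new(&aggr_schema, RowType::WordAligned);

    // Per-group state is nothing more than a fixed-width byte buffer.
    let mut aggregation_buffer = vec![0u8; layout.fixed_part_width()];

    // Point an accessor at the buffer and mutate the state in place,
    // mirroring what update_batch / merge_batch do per group.
    let mut accessor = RowAccessor::new_from_layout(layout);
    accessor.point_to(0, aggregation_buffer.as_mut_slice());
    let count = accessor.get_u64(0); // assumed accessor method
    accessor.set_u64(0, count + 1); // assumed accessor method
}
```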