This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch hash_agg_spike in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit d7c1581fdee87a6962b5b4167c74272bd472b04a Author: Andrew Lamb <[email protected]> AuthorDate: Fri Jun 30 12:56:51 2023 -0400 touchups --- datafusion/core/src/physical_plan/aggregates/row_hash2.rs | 6 +++--- datafusion/physical-expr/src/aggregate/average.rs | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs index 90e7cd0724..2eb058d8c5 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs @@ -20,7 +20,7 @@ //! POC demonstration of GroupByHashApproach use datafusion_physical_expr::GroupsAccumulator; -use log::info; +use log::debug; use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; @@ -123,7 +123,7 @@ impl GroupedHashAggregateStream2 { context: Arc<TaskContext>, partition: usize, ) -> Result<Self> { - info!("Creating GroupedHashAggregateStream2"); + debug!("Creating GroupedHashAggregateStream2"); let agg_schema = Arc::clone(&agg.schema); let agg_group_by = agg.group_by.clone(); let agg_filter_expr = agg.filter_expr.clone(); @@ -208,7 +208,7 @@ impl GroupedHashAggregateStream2 { fn create_accumulators( aggregate_exprs: Vec<Arc<dyn AggregateExpr>>, ) -> Result<Vec<Box<dyn GroupsAccumulator>>> { - info!("Creating accumulator for {aggregate_exprs:#?}"); + debug!("Creating accumulator for {aggregate_exprs:#?}"); aggregate_exprs .into_iter() .map(|agg_expr| agg_expr.create_groups_accumulator()) diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index b23b555805..7043ed9ce1 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -483,7 +483,8 @@ where None => { let iter = group_indicies.iter().zip(data.iter()); for (group_index, new_value) in iter { - self.sums[*group_index].add_wrapping(*new_value); + let sum = &mut self.sums[*group_index]; + *sum = sum.add_wrapping(*new_value); } } // @@ -504,7 +505,8 @@ where group_index_chunk.iter().zip(data_chunk.iter()).for_each( |(group_index, new_value)| { if (mask & index_mask) != 0 { - self.sums[*group_index].add_wrapping(*new_value); + let sum = &mut self.sums[*group_index]; + *sum = sum.add_wrapping(*new_value); } index_mask <<= 1; }, @@ -518,7 +520,8 @@ where .enumerate() .for_each(|(i, (group_index, new_value))| { if remainder_bits & (1 << i) != 0 { - self.sums[*group_index].add_wrapping(*new_value); + let sum = &mut self.sums[*group_index]; + *sum = sum.add_wrapping(*new_value); } }); } @@ -550,6 +553,7 @@ where // update values self.update_sums(values, group_indicies, opt_filter, total_num_groups)?; + Ok(()) }
