alamb commented on code in PR #11627: URL: https://github.com/apache/datafusion/pull/11627#discussion_r1691345297
########## datafusion/physical-plan/src/aggregates/row_hash.rs: ########## @@ -797,4 +931,59 @@ impl GroupedHashAggregateStream { timer.done(); Ok(()) } + + // Updates skip aggregation probe state. + // In case stream has any spills, the probe is forcefully set to + // forbid aggregation skipping, and locked, since spilling resets + // total number of unique groups. + // + // Note: currently spilling is not supported for Partial aggregation + fn update_skip_aggregation_probe(&mut self, input_rows: usize) { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + if !self.spill_state.spills.is_empty() { + probe.forbid_skipping(); + } else { + probe.update_state(input_rows, self.group_values.len()); + } + }; + } + + // In case the probe indicates that aggregation may be + // skipped, forces stream to produce currently accumulated output. + fn switch_to_skip_aggregation(&mut self) -> Result<()> { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + if probe.should_skip() { + let batch = self.emit(EmitTo::All, false)?; + self.exec_state = ExecutionState::ProducingOutput(batch); + } + } + + Ok(()) + } + + // Transforms input batch to intermediate aggregate state, without grouping it + fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> { Review Comment: This is quite clever ########## datafusion/functions-aggregate/src/count.rs: ########## @@ -432,6 +432,49 @@ impl GroupsAccumulator for CountGroupsAccumulator { Ok(vec![Arc::new(counts) as ArrayRef]) } + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result<Vec<ArrayRef>> { + let values = &values[0]; + + let state_array = match (values.logical_nulls(), opt_filter) { + (Some(nulls), None) => { + let mut builder = Int64Builder::with_capacity(values.len()); + nulls + .into_iter() + .for_each(|is_valid| builder.append_value(is_valid as i64)); + builder.finish() + } + (Some(nulls), Some(filter)) => { + let mut builder = Int64Builder::with_capacity(values.len()); + nulls.into_iter().zip(filter.iter()).for_each( + |(is_valid, filter_value)| { + builder.append_value( + (is_valid && filter_value.is_some_and(|val| val)) as i64, + ) + }, + ); + builder.finish() + } + (None, Some(filter)) => { + let mut builder = Int64Builder::with_capacity(values.len()); + filter.into_iter().for_each(|filter_value| { + builder.append_value(filter_value.is_some_and(|val| val) as i64) + }); + builder.finish() + } + (None, None) => Int64Array::from_value(1, values.len()), + }; + + Ok(vec![Arc::new(state_array)]) + } + + fn convert_to_state_supported(&self) -> bool { Review Comment: There may be other accumulators to enable this for. For example https://github.com/apache/datafusion/blob/7db4213b71ed9e914c5a4f16954abfa20b091ae3/datafusion/functions-aggregate/src/average.rs#L144 https://github.com/apache/datafusion/blob/7db4213b71ed9e914c5a4f16954abfa20b091ae3/datafusion/physical-expr/src/aggregate/min_max.rs#L199 https://github.com/apache/datafusion/blob/7db4213b71ed9e914c5a4f16954abfa20b091ae3/datafusion/functions-aggregate/src/sum.rs#L200 ########## datafusion/functions-aggregate/src/count.rs: ########## @@ -432,6 +432,49 @@ impl GroupsAccumulator for CountGroupsAccumulator { Ok(vec![Arc::new(counts) as ArrayRef]) } + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result<Vec<ArrayRef>> { + let values = &values[0]; + + let state_array = match (values.logical_nulls(), opt_filter) { + (Some(nulls), None) => { + let mut builder = Int64Builder::with_capacity(values.len()); + nulls + .into_iter() + .for_each(|is_valid| builder.append_value(is_valid as i64)); + builder.finish() + } + (Some(nulls), Some(filter)) => { + let mut builder = Int64Builder::with_capacity(values.len()); + nulls.into_iter().zip(filter.iter()).for_each( + |(is_valid, filter_value)| { + builder.append_value( + (is_valid && filter_value.is_some_and(|val| val)) as i64, + ) + }, + ); + builder.finish() + } + (None, Some(filter)) => { + let mut builder = Int64Builder::with_capacity(values.len()); + filter.into_iter().for_each(|filter_value| { + builder.append_value(filter_value.is_some_and(|val| val) as i64) + }); + builder.finish() + } + (None, None) => Int64Array::from_value(1, values.len()), Review Comment: it is unfortunate that we need to create this over and over again 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org