alamb commented on code in PR #11627:
URL: https://github.com/apache/datafusion/pull/11627#discussion_r1691345297


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -797,4 +931,59 @@ impl GroupedHashAggregateStream {
         timer.done();
         Ok(())
     }
+
+    // Updates skip aggregation probe state.
+    // In case stream has any spills, the probe is forcefully set to
+    // forbid aggregation skipping, and locked, since spilling resets
+    // total number of unique groups.
+    //
+    // Note: currently spilling is not supported for Partial aggregation
+    fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
+        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+            if !self.spill_state.spills.is_empty() {
+                probe.forbid_skipping();
+            } else {
+                probe.update_state(input_rows, self.group_values.len());
+            }
+        };
+    }
+
+    // In case the probe indicates that aggregation may be
+    // skipped, forces stream to produce currently accumulated output.
+    fn switch_to_skip_aggregation(&mut self) -> Result<()> {
+        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+            if probe.should_skip() {
+                let batch = self.emit(EmitTo::All, false)?;
+                self.exec_state = ExecutionState::ProducingOutput(batch);
+            }
+        }
+
+        Ok(())
+    }
+
+    // Transforms input batch to intermediate aggregate state, without grouping it
+    fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {

Review Comment:
   This is quite clever



##########
datafusion/functions-aggregate/src/count.rs:
##########
@@ -432,6 +432,49 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(vec![Arc::new(counts) as ArrayRef])
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = &values[0];
+
+        let state_array = match (values.logical_nulls(), opt_filter) {
+            (Some(nulls), None) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls
+                    .into_iter()
+                    .for_each(|is_valid| builder.append_value(is_valid as 
i64));
+                builder.finish()
+            }
+            (Some(nulls), Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls.into_iter().zip(filter.iter()).for_each(
+                    |(is_valid, filter_value)| {
+                        builder.append_value(
+                            (is_valid && filter_value.is_some_and(|val| val)) 
as i64,
+                        )
+                    },
+                );
+                builder.finish()
+            }
+            (None, Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                filter.into_iter().for_each(|filter_value| {
+                    builder.append_value(filter_value.is_some_and(|val| val) 
as i64)
+                });
+                builder.finish()
+            }
+            (None, None) => Int64Array::from_value(1, values.len()),
+        };
+
+        Ok(vec![Arc::new(state_array)])
+    }
+
+    fn convert_to_state_supported(&self) -> bool {

Review Comment:
   There may be other accumulators worth enabling this for as well, for example:
   
   
https://github.com/apache/datafusion/blob/7db4213b71ed9e914c5a4f16954abfa20b091ae3/datafusion/functions-aggregate/src/average.rs#L144
   
   
https://github.com/apache/datafusion/blob/7db4213b71ed9e914c5a4f16954abfa20b091ae3/datafusion/physical-expr/src/aggregate/min_max.rs#L199
   
   
https://github.com/apache/datafusion/blob/7db4213b71ed9e914c5a4f16954abfa20b091ae3/datafusion/functions-aggregate/src/sum.rs#L200



##########
datafusion/functions-aggregate/src/count.rs:
##########
@@ -432,6 +432,49 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(vec![Arc::new(counts) as ArrayRef])
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = &values[0];
+
+        let state_array = match (values.logical_nulls(), opt_filter) {
+            (Some(nulls), None) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls
+                    .into_iter()
+                    .for_each(|is_valid| builder.append_value(is_valid as 
i64));
+                builder.finish()
+            }
+            (Some(nulls), Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls.into_iter().zip(filter.iter()).for_each(
+                    |(is_valid, filter_value)| {
+                        builder.append_value(
+                            (is_valid && filter_value.is_some_and(|val| val)) 
as i64,
+                        )
+                    },
+                );
+                builder.finish()
+            }
+            (None, Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                filter.into_iter().for_each(|filter_value| {
+                    builder.append_value(filter_value.is_some_and(|val| val) 
as i64)
+                });
+                builder.finish()
+            }
+            (None, None) => Int64Array::from_value(1, values.len()),

Review Comment:
   It is unfortunate that we need to create this array of all 1s over and over again 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to