This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a970c58ff Clamp early aggregation emit to the sort boundary when 
using partial group ordering (#20446)
3a970c58ff is described below

commit 3a970c58ffd241fe793bc3104a801e1e55bf8210
Author: Jack Kleeman <[email protected]>
AuthorDate: Wed Feb 25 14:03:50 2026 -0800

    Clamp early aggregation emit to the sort boundary when using partial group 
ordering (#20446)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #20445.
    
    ## What changes are included in this PR?
    
    Fix a panic on early emit with partial sort aggregations, by clamping
    our emit point to the sort boundary
    
    ## Are these changes tested?
    
    Yes
    
    ## Are there any user-facing changes?
    
    No
---
 .../physical-plan/src/aggregates/row_hash.rs       | 99 +++++++++++++++++++++-
 1 file changed, 98 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs 
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index de857370ce..35f32ac7ae 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream {
                     self.group_values.len()
                 };
 
-                if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+                // Clamp to the sort boundary when using partial group 
ordering,
+                // otherwise remove_groups panics (#20445).
+                let n = match &self.group_ordering {
+                    GroupOrdering::None => n,
+                    _ => match self.group_ordering.emit_to() {
+                        Some(EmitTo::First(max)) => n.min(max),
+                        _ => 0,
+                    },
+                };
+
+                if n > 0
+                    && let Some(batch) = self.emit(EmitTo::First(n), false)?
+                {
                     Ok(Some(ExecutionState::ProducingOutput(batch)))
                 } else {
                     Err(oom)
@@ -1291,6 +1303,7 @@ impl GroupedHashAggregateStream {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::InputOrderMode;
     use crate::execution_plan::ExecutionPlan;
     use crate::test::TestMemoryExec;
     use arrow::array::{Int32Array, Int64Array};
@@ -1553,4 +1566,88 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_emit_early_with_partially_sorted() -> Result<()> {
+        // Reproducer for #20445: EmitEarly with PartiallySorted panics in
+        // remove_groups because it emits more groups than the sort boundary.
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("sort_col", DataType::Int32, false),
+            Field::new("group_col", DataType::Int32, false),
+            Field::new("value_col", DataType::Int64, false),
+        ]));
+
+        // All rows share sort_col=1 (no sort boundary), with unique group_col
+        // values to create many groups and trigger memory pressure.
+        let n = 256;
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1; n])),
+                Arc::new(Int32Array::from((0..n as i32).collect::<Vec<_>>())),
+                Arc::new(Int64Array::from(vec![1; n])),
+            ],
+        )?;
+
+        let runtime = RuntimeEnvBuilder::default()
+            .with_memory_limit(4096, 1.0)
+            .build_arc()?;
+        let mut task_ctx = TaskContext::default().with_runtime(runtime);
+        let mut cfg = task_ctx.session_config().clone();
+        cfg = cfg.set(
+            "datafusion.execution.batch_size",
+            &datafusion_common::ScalarValue::UInt64(Some(128)),
+        );
+        cfg = cfg.set(
+            
"datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            &datafusion_common::ScalarValue::UInt64(Some(u64::MAX)),
+        );
+        task_ctx = task_ctx.with_session_config(cfg);
+        let task_ctx = Arc::new(task_ctx);
+
+        let ordering = 
LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(
+            Column::new("sort_col", 0),
+        )
+            as _)])
+        .unwrap();
+        let exec = TestMemoryExec::try_new(&[vec![batch]], 
Arc::clone(&schema), None)?
+            .try_with_sort_information(vec![ordering])?;
+        let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec)));
+
+        // GROUP BY sort_col, group_col with input sorted on sort_col
+        // gives PartiallySorted([0])
+        let aggregate_exec = AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(vec![
+                (col("sort_col", &schema)?, "sort_col".to_string()),
+                (col("group_col", &schema)?, "group_col".to_string()),
+            ]),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("value_col", 
&schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("count_value")
+                    .build()?,
+            )],
+            vec![None],
+            exec,
+            Arc::clone(&schema),
+        )?;
+        assert!(matches!(
+            aggregate_exec.input_order_mode(),
+            InputOrderMode::PartiallySorted(_)
+        ));
+
+        // Must not panic with "assertion failed: *current_sort >= n"
+        let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, 
&task_ctx, 0)?;
+        while let Some(result) = stream.next().await {
+            if let Err(e) = result {
+                if e.to_string().contains("Resources exhausted") {
+                    break;
+                }
+                return Err(e);
+            }
+        }
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to