This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 38c89a49fb [branch-53] Clamp early aggregation emit to the sort 
boundary when using partial group ordering (#20446) (#20558)
38c89a49fb is described below

commit 38c89a49fbd21d7a9e7513fdb779db8dbb516db7
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Feb 25 19:13:19 2026 -0500

    [branch-53] Clamp early aggregation emit to the sort boundary when using 
partial group ordering (#20446) (#20558)
    
    ## Which issue does this PR close?
    - part of https://github.com/apache/datafusion/issues/20287
    - Fixes https://github.com/apache/datafusion/issues/20445 on branch-52
    ## Rationale for this change
    See issues
    
    ## What changes are included in this PR?
    
    - backports https://github.com/apache/datafusion/pull/20446
    
    ## Are these changes tested?
    
    By CI
    
    Co-authored-by: Jack Kleeman <[email protected]>
---
 .../physical-plan/src/aggregates/row_hash.rs       | 99 +++++++++++++++++++++-
 1 file changed, 98 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs 
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 1ae7202711..7cc59b44a3 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1045,7 +1045,19 @@ impl GroupedHashAggregateStream {
                     self.group_values.len()
                 };
 
-                if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+                // Clamp to the sort boundary when using partial group 
ordering,
+                // otherwise remove_groups panics (#20445).
+                let n = match &self.group_ordering {
+                    GroupOrdering::None => n,
+                    _ => match self.group_ordering.emit_to() {
+                        Some(EmitTo::First(max)) => n.min(max),
+                        _ => 0,
+                    },
+                };
+
+                if n > 0
+                    && let Some(batch) = self.emit(EmitTo::First(n), false)?
+                {
                     Ok(Some(ExecutionState::ProducingOutput(batch)))
                 } else {
                     Err(oom)
@@ -1305,6 +1317,7 @@ impl GroupedHashAggregateStream {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::InputOrderMode;
     use crate::execution_plan::ExecutionPlan;
     use crate::test::TestMemoryExec;
     use arrow::array::{Int32Array, Int64Array};
@@ -1567,4 +1580,88 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_emit_early_with_partially_sorted() -> Result<()> {
+        // Reproducer for #20445: EmitEarly with PartiallySorted panics in
+        // remove_groups because it emits more groups than the sort boundary.
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("sort_col", DataType::Int32, false),
+            Field::new("group_col", DataType::Int32, false),
+            Field::new("value_col", DataType::Int64, false),
+        ]));
+
+        // All rows share sort_col=1 (no sort boundary), with unique group_col
+        // values to create many groups and trigger memory pressure.
+        let n = 256;
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1; n])),
+                Arc::new(Int32Array::from((0..n as i32).collect::<Vec<_>>())),
+                Arc::new(Int64Array::from(vec![1; n])),
+            ],
+        )?;
+
+        let runtime = RuntimeEnvBuilder::default()
+            .with_memory_limit(4096, 1.0)
+            .build_arc()?;
+        let mut task_ctx = TaskContext::default().with_runtime(runtime);
+        let mut cfg = task_ctx.session_config().clone();
+        cfg = cfg.set(
+            "datafusion.execution.batch_size",
+            &datafusion_common::ScalarValue::UInt64(Some(128)),
+        );
+        cfg = cfg.set(
+            
"datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            &datafusion_common::ScalarValue::UInt64(Some(u64::MAX)),
+        );
+        task_ctx = task_ctx.with_session_config(cfg);
+        let task_ctx = Arc::new(task_ctx);
+
+        let ordering = 
LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(
+            Column::new("sort_col", 0),
+        )
+            as _)])
+        .unwrap();
+        let exec = TestMemoryExec::try_new(&[vec![batch]], 
Arc::clone(&schema), None)?
+            .try_with_sort_information(vec![ordering])?;
+        let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec)));
+
+        // GROUP BY sort_col, group_col with input sorted on sort_col
+        // gives PartiallySorted([0])
+        let aggregate_exec = AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(vec![
+                (col("sort_col", &schema)?, "sort_col".to_string()),
+                (col("group_col", &schema)?, "group_col".to_string()),
+            ]),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("value_col", 
&schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("count_value")
+                    .build()?,
+            )],
+            vec![None],
+            exec,
+            Arc::clone(&schema),
+        )?;
+        assert!(matches!(
+            aggregate_exec.input_order_mode(),
+            InputOrderMode::PartiallySorted(_)
+        ));
+
+        // Must not panic with "assertion failed: *current_sort >= n"
+        let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, 
&task_ctx, 0)?;
+        while let Some(result) = stream.next().await {
+            if let Err(e) = result {
+                if e.to_string().contains("Resources exhausted") {
+                    break;
+                }
+                return Err(e);
+            }
+        }
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to