This is an automated email from the ASF dual-hosted git repository.
adriangb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 38c89a49fb [branch-52] Clamp early aggregation emit to the sort
boundary when using partial group ordering (#20446) (#20558)
38c89a49fb is described below
commit 38c89a49fbd21d7a9e7513fdb779db8dbb516db7
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Feb 25 19:13:19 2026 -0500
[branch-52] Clamp early aggregation emit to the sort boundary when using
partial group ordering (#20446) (#20558)
## Which issue does this PR close?
- part of https://github.com/apache/datafusion/issues/20287
- Fixes https://github.com/apache/datafusion/issues/20445 on branch-52
## Rationale for this change
See issues
## What changes are included in this PR?
- backports https://github.com/apache/datafusion/pull/20446
## Are these changes tested?
By CI
Co-authored-by: Jack Kleeman <[email protected]>
---
.../physical-plan/src/aggregates/row_hash.rs | 99 +++++++++++++++++++++-
1 file changed, 98 insertions(+), 1 deletion(-)
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 1ae7202711..7cc59b44a3 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1045,7 +1045,19 @@ impl GroupedHashAggregateStream {
self.group_values.len()
};
- if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+ // Clamp to the sort boundary when using partial group ordering,
+ // otherwise remove_groups panics (#20445).
+ let n = match &self.group_ordering {
+ GroupOrdering::None => n,
+ _ => match self.group_ordering.emit_to() {
+ Some(EmitTo::First(max)) => n.min(max),
+ _ => 0,
+ },
+ };
+
+ if n > 0
+ && let Some(batch) = self.emit(EmitTo::First(n), false)?
+ {
Ok(Some(ExecutionState::ProducingOutput(batch)))
} else {
Err(oom)
@@ -1305,6 +1317,7 @@ impl GroupedHashAggregateStream {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::InputOrderMode;
use crate::execution_plan::ExecutionPlan;
use crate::test::TestMemoryExec;
use arrow::array::{Int32Array, Int64Array};
@@ -1567,4 +1580,88 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_emit_early_with_partially_sorted() -> Result<()> {
+ // Reproducer for #20445: EmitEarly with PartiallySorted panics in
+ // remove_groups because it emits more groups than the sort boundary.
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("sort_col", DataType::Int32, false),
+ Field::new("group_col", DataType::Int32, false),
+ Field::new("value_col", DataType::Int64, false),
+ ]));
+
+ // All rows share sort_col=1 (no sort boundary), with unique group_col
+ // values to create many groups and trigger memory pressure.
+ let n = 256;
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1; n])),
+ Arc::new(Int32Array::from((0..n as i32).collect::<Vec<_>>())),
+ Arc::new(Int64Array::from(vec![1; n])),
+ ],
+ )?;
+
+ let runtime = RuntimeEnvBuilder::default()
+ .with_memory_limit(4096, 1.0)
+ .build_arc()?;
+ let mut task_ctx = TaskContext::default().with_runtime(runtime);
+ let mut cfg = task_ctx.session_config().clone();
+ cfg = cfg.set(
+ "datafusion.execution.batch_size",
+ &datafusion_common::ScalarValue::UInt64(Some(128)),
+ );
+ cfg = cfg.set(
+ "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+ &datafusion_common::ScalarValue::UInt64(Some(u64::MAX)),
+ );
+ task_ctx = task_ctx.with_session_config(cfg);
+ let task_ctx = Arc::new(task_ctx);
+
+ let ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(
+ Column::new("sort_col", 0),
+ )
+ as _)])
+ .unwrap();
+ let exec = TestMemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None)?
+ .try_with_sort_information(vec![ordering])?;
+ let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec)));
+
+ // GROUP BY sort_col, group_col with input sorted on sort_col
+ // gives PartiallySorted([0])
+ let aggregate_exec = AggregateExec::try_new(
+ AggregateMode::Partial,
+ PhysicalGroupBy::new_single(vec![
+ (col("sort_col", &schema)?, "sort_col".to_string()),
+ (col("group_col", &schema)?, "group_col".to_string()),
+ ]),
+ vec![Arc::new(
+ AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("count_value")
+ .build()?,
+ )],
+ vec![None],
+ exec,
+ Arc::clone(&schema),
+ )?;
+ assert!(matches!(
+ aggregate_exec.input_order_mode(),
+ InputOrderMode::PartiallySorted(_)
+ ));
+
+ // Must not panic with "assertion failed: *current_sort >= n"
+ let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, &task_ctx, 0)?;
+ while let Some(result) = stream.next().await {
+ if let Err(e) = result {
+ if e.to_string().contains("Resources exhausted") {
+ break;
+ }
+ return Err(e);
+ }
+ }
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]