This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cf0a182604 Simplify logic for memory pressure partial emit from
ordered group by (#20559)
cf0a182604 is described below
commit cf0a1826044d786e5d7df8708ce63ee00f20d750
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 18 03:01:22 2026 -0400
Simplify logic for memory pressure partial emit from ordered group by
(#20559)
## Which issue does this PR close?
- related to https://github.com/apache/datafusion/issues/20445
- Follow on to https://github.com/apache/datafusion/pull/20446
## Rationale for this change
I found the formulation of the fix in
https://github.com/apache/datafusion/pull/20446 hard to follow (see
https://github.com/apache/datafusion/pull/20446#pullrequestreview-3843736859
for details).
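Basically, the meanings of `emit_to` and `0` are inverted in this case:
the previous code folded the result of `emit_to()` into the count `n`
and then used `n > 0` as the test for "something can be emitted".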
## What changes are included in this PR?
Pull the logic for deciding what to emit into its own function
(`GroupOrdering::oom_emit_to`), with more comments that I think make it
clearer what is going on.
## Are these changes tested?
Yes, by existing tests, plus new unit tests for `oom_emit_to` in
`aggregates/order/mod.rs`.
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
.../physical-plan/src/aggregates/order/mod.rs | 85 +++++++++++++++++++++-
.../physical-plan/src/aggregates/row_hash.rs | 23 ++----
2 files changed, 91 insertions(+), 17 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs
b/datafusion/physical-plan/src/aggregates/order/mod.rs
index bbcb30d877..183e2b0098 100644
--- a/datafusion/physical-plan/src/aggregates/order/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/order/mod.rs
@@ -52,7 +52,8 @@ impl GroupOrdering {
}
}
- // How many groups be emitted, or None if no data can be emitted
+ /// Returns how many groups be emitted while respecting the current
ordering
+ /// guarantees, or `None` if no data can be emitted
pub fn emit_to(&self) -> Option<EmitTo> {
match self {
GroupOrdering::None => None,
@@ -61,6 +62,28 @@ impl GroupOrdering {
}
}
+ /// Returns the emit strategy to use under memory pressure (OOM).
+ ///
+ /// Returns the strategy that must be used when emitting up to `n` groups
+ /// while respecting the current ordering guarantees.
+ ///
+ /// Returns `None` if no data can be emitted.
+ pub fn oom_emit_to(&self, n: usize) -> Option<EmitTo> {
+ if n == 0 {
+ return None;
+ }
+
+ match self {
+ GroupOrdering::None => Some(EmitTo::First(n)),
+ GroupOrdering::Partial(_) | GroupOrdering::Full(_) => {
+ self.emit_to().map(|emit_to| match emit_to {
+ EmitTo::First(max) => EmitTo::First(n.min(max)),
+ EmitTo::All => EmitTo::First(n),
+ })
+ }
+ }
+ }
+
/// Updates the state the input is done
pub fn input_done(&mut self) {
match self {
@@ -122,3 +145,63 @@ impl GroupOrdering {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::sync::Arc;
+
+ use arrow::array::{ArrayRef, Int32Array};
+
+ #[test]
+ fn test_oom_emit_to_none_ordering() {
+ let group_ordering = GroupOrdering::None;
+
+ assert_eq!(group_ordering.oom_emit_to(0), None);
+ assert_eq!(group_ordering.oom_emit_to(5), Some(EmitTo::First(5)));
+ }
+
+ /// Creates a partially ordered grouping state with three groups.
+ ///
+ /// `sort_key_values` controls whether a sort boundary exists in the batch:
+ /// distinct values such as `[1, 2, 3]` create boundaries, while repeated
+ /// values such as `[1, 1, 1]` do not.
+ fn partial_ordering(sort_key_values: Vec<i32>) -> Result<GroupOrdering> {
+ let mut group_ordering =
+ GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?);
+
+ let batch_group_values: Vec<ArrayRef> = vec![
+ Arc::new(Int32Array::from(sort_key_values)),
+ Arc::new(Int32Array::from(vec![10, 20, 30])),
+ ];
+ let group_indices = vec![0, 1, 2];
+
+ group_ordering.new_groups(&batch_group_values, &group_indices, 3)?;
+
+ Ok(group_ordering)
+ }
+
+ #[test]
+ fn test_oom_emit_to_partial_clamps_to_boundary() -> Result<()> {
+ let group_ordering = partial_ordering(vec![1, 2, 3])?;
+
+ // Can emit both `1` and `2` groups because we have seen `3`
+ assert_eq!(group_ordering.emit_to(), Some(EmitTo::First(2)));
+ assert_eq!(group_ordering.oom_emit_to(1), Some(EmitTo::First(1)));
+ assert_eq!(group_ordering.oom_emit_to(3), Some(EmitTo::First(2)));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_oom_emit_to_partial_without_boundary() -> Result<()> {
+ let group_ordering = partial_ordering(vec![1, 1, 1])?;
+
+ // Can't emit the last `1` group as it may have more values
+ assert_eq!(group_ordering.emit_to(), None);
+ assert_eq!(group_ordering.oom_emit_to(3), None);
+
+ Ok(())
+ }
+}
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 25116716bd..b857fdca3f 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1037,25 +1037,16 @@ impl GroupedHashAggregateStream {
self.group_values.len()
};
- // Clamp to the sort boundary when using partial group
ordering,
- // otherwise remove_groups panics (#20445).
- let n = match &self.group_ordering {
- GroupOrdering::None => n,
- _ => match self.group_ordering.emit_to() {
- Some(EmitTo::First(max)) => n.min(max),
- _ => 0,
- },
- };
-
- if n > 0
- && let Some(batch) = self.emit(EmitTo::First(n), false)?
+ if let Some(emit_to) = self.group_ordering.oom_emit_to(n)
+ && let Some(batch) = self.emit(emit_to, false)?
{
- Ok(Some(ExecutionState::ProducingOutput(batch)))
- } else {
- Err(oom)
+ return Ok(Some(ExecutionState::ProducingOutput(batch)));
}
+ Err(oom)
}
- _ => Err(oom),
+ OutOfMemoryMode::EmitEarly
+ | OutOfMemoryMode::Spill
+ | OutOfMemoryMode::ReportError => Err(oom),
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]