This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cf0a182604 Simplify logic for memory pressure partial emit from ordered group by (#20559)
cf0a182604 is described below

commit cf0a1826044d786e5d7df8708ce63ee00f20d750
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 18 03:01:22 2026 -0400

    Simplify logic for memory pressure partial emit from ordered group by (#20559)
    
    ## Which issue does this PR close?
    - related to https://github.com/apache/datafusion/issues/20445
    - Follow on to https://github.com/apache/datafusion/pull/20446
    
    ## Rationale for this change
    
    I found the formulation of the fix in
    https://github.com/apache/datafusion/pull/20446 hard to follow (see
    https://github.com/apache/datafusion/pull/20446#pullrequestreview-3843736859
    for details).
    
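    Basically, the meanings of `emit_to` and `0` are inverted in this case: the
    old code mapped an `EmitTo::All` (or `None`) result from `emit_to()` to `0`,
    i.e. "emit nothing".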
    
    ## What changes are included in this PR?
    
    Pull the logic that decides what to emit under memory pressure into its own
    function (`GroupOrdering::oom_emit_to`), with comments that I think make it
    clearer what is going on.
    
    ## Are these changes tested?
    
    Yes, by existing tests, plus new unit tests for `oom_emit_to`.
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 .../physical-plan/src/aggregates/order/mod.rs      | 85 +++++++++++++++++++++-
 .../physical-plan/src/aggregates/row_hash.rs       | 23 ++----
 2 files changed, 91 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs
index bbcb30d877..183e2b0098 100644
--- a/datafusion/physical-plan/src/aggregates/order/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/order/mod.rs
@@ -52,7 +52,8 @@ impl GroupOrdering {
         }
     }
 
-    // How many groups be emitted, or None if no data can be emitted
+    /// Returns how many groups be emitted while respecting the current ordering
+    /// guarantees, or `None` if no data can be emitted
     pub fn emit_to(&self) -> Option<EmitTo> {
         match self {
             GroupOrdering::None => None,
@@ -61,6 +62,28 @@ impl GroupOrdering {
         }
     }
 
+    /// Returns the emit strategy to use under memory pressure (OOM).
+    ///
+    /// Returns the strategy that must be used when emitting up to `n` groups
+    /// while respecting the current ordering guarantees.
+    ///
+    /// Returns `None` if no data can be emitted.
+    pub fn oom_emit_to(&self, n: usize) -> Option<EmitTo> {
+        if n == 0 {
+            return None;
+        }
+
+        match self {
+            GroupOrdering::None => Some(EmitTo::First(n)),
+            GroupOrdering::Partial(_) | GroupOrdering::Full(_) => {
+                self.emit_to().map(|emit_to| match emit_to {
+                    EmitTo::First(max) => EmitTo::First(n.min(max)),
+                    EmitTo::All => EmitTo::First(n),
+                })
+            }
+        }
+    }
+
     /// Updates the state the input is done
     pub fn input_done(&mut self) {
         match self {
@@ -122,3 +145,63 @@ impl GroupOrdering {
             }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use arrow::array::{ArrayRef, Int32Array};
+
+    #[test]
+    fn test_oom_emit_to_none_ordering() {
+        let group_ordering = GroupOrdering::None;
+
+        assert_eq!(group_ordering.oom_emit_to(0), None);
+        assert_eq!(group_ordering.oom_emit_to(5), Some(EmitTo::First(5)));
+    }
+
+    /// Creates a partially ordered grouping state with three groups.
+    ///
+    /// `sort_key_values` controls whether a sort boundary exists in the batch:
+    /// distinct values such as `[1, 2, 3]` create boundaries, while repeated
+    /// values such as `[1, 1, 1]` do not.
+    fn partial_ordering(sort_key_values: Vec<i32>) -> Result<GroupOrdering> {
+        let mut group_ordering =
+            GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?);
+
+        let batch_group_values: Vec<ArrayRef> = vec![
+            Arc::new(Int32Array::from(sort_key_values)),
+            Arc::new(Int32Array::from(vec![10, 20, 30])),
+        ];
+        let group_indices = vec![0, 1, 2];
+
+        group_ordering.new_groups(&batch_group_values, &group_indices, 3)?;
+
+        Ok(group_ordering)
+    }
+
+    #[test]
+    fn test_oom_emit_to_partial_clamps_to_boundary() -> Result<()> {
+        let group_ordering = partial_ordering(vec![1, 2, 3])?;
+
+        // Can emit both `1` and `2` groups because we have seen `3`
+        assert_eq!(group_ordering.emit_to(), Some(EmitTo::First(2)));
+        assert_eq!(group_ordering.oom_emit_to(1), Some(EmitTo::First(1)));
+        assert_eq!(group_ordering.oom_emit_to(3), Some(EmitTo::First(2)));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_oom_emit_to_partial_without_boundary() -> Result<()> {
+        let group_ordering = partial_ordering(vec![1, 1, 1])?;
+
+        // Can't emit the last `1` group as it may have more values
+        assert_eq!(group_ordering.emit_to(), None);
+        assert_eq!(group_ordering.oom_emit_to(3), None);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 25116716bd..b857fdca3f 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1037,25 +1037,16 @@ impl GroupedHashAggregateStream {
                     self.group_values.len()
                 };
 
-                // Clamp to the sort boundary when using partial group ordering,
-                // otherwise remove_groups panics (#20445).
-                let n = match &self.group_ordering {
-                    GroupOrdering::None => n,
-                    _ => match self.group_ordering.emit_to() {
-                        Some(EmitTo::First(max)) => n.min(max),
-                        _ => 0,
-                    },
-                };
-
-                if n > 0
-                    && let Some(batch) = self.emit(EmitTo::First(n), false)?
+                if let Some(emit_to) = self.group_ordering.oom_emit_to(n)
+                    && let Some(batch) = self.emit(emit_to, false)?
                 {
-                    Ok(Some(ExecutionState::ProducingOutput(batch)))
-                } else {
-                    Err(oom)
+                    return Ok(Some(ExecutionState::ProducingOutput(batch)));
                 }
+                Err(oom)
             }
-            _ => Err(oom),
+            OutOfMemoryMode::EmitEarly
+            | OutOfMemoryMode::Spill
+            | OutOfMemoryMode::ReportError => Err(oom),
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to