alamb commented on code in PR #20446:
URL: https://github.com/apache/datafusion/pull/20446#discussion_r2843283788


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream {
                     self.group_values.len()
                 };
 
-                if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+                // Clamp to the sort boundary when using partial group 
ordering,
+                // otherwise remove_groups panics (#20445).
+                let n = match &self.group_ordering {

Review Comment:
   I found this logic quite confusing, and the use of `0` as a sentinel is also 
confusing. Could we maybe encapsulate it into a method? That way we could at least 
explain what it is doing better. 
   
   I tried a bunch of different formulations, and the best I could come up with 
was
   
   ```rust
   impl GroupedHashAggregateStream {
   ...
        // Clamp to the sort boundary when using partial group ordering,
                   // otherwise remove_groups panics (#20445).
                   if let Some(emit_to) = self.emit_target_for_oom() {
                       if let Some(batch) = self.emit(EmitTo::First(n), false)?
                       {
                           return 
Ok(Some(ExecutionState::ProducingOutput(batch)))
                       }
                   }
   ...
   
       /// Returns how many groups to try and emit in order to avoid an 
out-of-memory
       /// condition.
       ///
       /// Returns `None` if emitting is not possible.
       ///
       /// Returns Some(EmitTo) with the number of groups to emit if it is 
possible
       /// to emit some groups to free memory
       fn emit_target_for_oom(&self) -> Option<EmitTo> {
           let n = if self.group_values.len() >= self.batch_size {
               // Try to emit an integer multiple of batch size if possible
               self.group_values.len() / self.batch_size * self.batch_size
           } else {
               // Otherwise emit whatever we can
               self.group_values.len()
           };
   
           // Special case for GroupOrdering::None since emit_to() returns None 
for
           // that case, but we can still emit some groups to try to resolve 
the OOM
            if matches!(&self.group_ordering, GroupOrdering::None) {
                return Some(EmitTo::First(n));
            };
   
           self.group_ordering.emit_to()
               .map(|emit_to| match emit_to {
                   // If the ordering allows emitting some groups,
                   // emit as many as we can to try to resolve the OOM,
                   EmitTo::First(max)=> EmitTo::First(n.min(max)),
                   // if the ordering allows emitting all groups, we can emit n
                   // groups to try to resolve the OOM
                   EmitTo::All => EmitTo::First(n),
               })
       }
   
   ...
   }
   ```
   
   
   Here is the entire proposed diff
   
   ```diff
   index 35f32ac7a..5bd33aab5 100644
   --- a/datafusion/physical-plan/src/aggregates/row_hash.rs
   +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
   @@ -1038,26 +1038,50 @@ impl GroupedHashAggregateStream {
   
                    // Clamp to the sort boundary when using partial group 
ordering,
                    // otherwise remove_groups panics (#20445).
   -                let n = match &self.group_ordering {
   -                    GroupOrdering::None => n,
   -                    _ => match self.group_ordering.emit_to() {
   -                        Some(EmitTo::First(max)) => n.min(max),
   -                        _ => 0,
   -                    },
   -                };
   -
   -                if n > 0
   -                    && let Some(batch) = self.emit(EmitTo::First(n), false)?
   -                {
   -                    Ok(Some(ExecutionState::ProducingOutput(batch)))
   -                } else {
   -                    Err(oom)
   +                if let Some(emit_to) = self.emit_target_for_oom() {
   +                    if let Some(batch) = self.emit(EmitTo::First(n), 
false)? {
   +                        return 
Ok(Some(ExecutionState::ProducingOutput(batch)));
   +                    }
                    }
   +
   +                Err(oom)
                }
                _ => Err(oom),
            }
        }
   
   +    /// Returns how many groups to try and emit in order to avoid an 
out-of-memory
   +    /// condition.
   +    ///
   +    /// Returns `None` if emitting is not possible.
   +    ///
   +    /// Returns Some(EmitTo) with the number of groups to emit if it is 
possible
   +    /// to emit some groups to free memory
   +    fn emit_target_for_oom(&self) -> Option<EmitTo> {
   +        let n = if self.group_values.len() >= self.batch_size {
   +            // Try to emit an integer multiple of batch size if possible
   +            self.group_values.len() / self.batch_size * self.batch_size
   +        } else {
   +            // Otherwise emit whatever we can
   +            self.group_values.len()
   +        };
   +
   +        // Special case for GroupOrdering::None since emit_to() returns 
None for
   +        // that case, but we can still emit some groups to try to resolve 
the OOM
   +        if matches!(&self.group_ordering, GroupOrdering::None) {
   +            return Some(EmitTo::First(n));
   +        };
   +
   +        self.group_ordering.emit_to().map(|emit_to| match emit_to {
   +            // If the ordering allows emitting some groups,
   +            // emit as many as we can to try to resolve the OOM,
   +            EmitTo::First(max) => EmitTo::First(n.min(max)),
   +            // if the ordering allows emitting all groups, we can emit n
   +            // groups to try to resolve the OOM
   +            EmitTo::All => EmitTo::First(n),
   +        })
   +    }
   +
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream {
                     self.group_values.len()
                 };
 
-                if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+                // Clamp to the sort boundary when using partial group 
ordering,
+                // otherwise remove_groups panics (#20445).
+                let n = match &self.group_ordering {
+                    GroupOrdering::None => n,
+                    _ => match self.group_ordering.emit_to() {
+                        Some(EmitTo::First(max)) => n.min(max),
+                        _ => 0,

Review Comment:
   Why not change this to emit n groups on `Some(EmitTo::All)`?
   
   ```rust
                _ => match self.group_ordering.emit_to() {
                           Some(EmitTo::First(max)) => n.min(max),
                           Some(EmitTo::All) => n,
                           _ => 0,
   ```
   
   As I understand what it is doing, we are trying to ensure at most n groups 
are emitted
   (this is incorporated above as well)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to