alamb commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2620687927


##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -3401,6 +3403,116 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_grouped_aggregation_respects_memory_limit() -> Result<()> {

Review Comment:
   I verified these tests cover the new code as they fail without the code in 
this PR
   
   ```
   failures:
   
   ---- aggregates::tests::test_grouped_aggregation_respects_memory_limit 
stdout ----
   
   thread 'aggregates::tests::test_grouped_aggregation_respects_memory_limit' 
(25899629) panicked at datafusion/physical-plan/src/aggregates/mod.rs:3384:17:
   Expected spill but SpillCount metric not found or SpillCount was 0.
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   
   ---- aggregates::tests::test_order_is_retained_when_spilling stdout ----
   
   thread 'aggregates::tests::test_order_is_retained_when_spilling' (25899632) 
panicked at datafusion/physical-plan/src/aggregates/mod.rs:3384:17:
   Expected spill but SpillCount metric not found or SpillCount was 0.
   
   
   
   failures:
       aggregates::tests::test_grouped_aggregation_respects_memory_limit
       aggregates::tests::test_order_is_retained_when_spilling
   
   
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1060,18 +1084,26 @@ impl GroupedHashAggregateStream {
         Ok(Some(batch))
     }
 
+    /// Determines if `spill_state_if_oom` can free up memory by spilling 
state to disk
+    fn can_spill_on_oom(&self) -> bool {

Review Comment:
   nit would be to move this closer in the code to `can_emit_early_on_oom` so 
they are closer together in the code



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -510,12 +522,12 @@ impl GroupedHashAggregateStream {
         // Therefore, when we spill these intermediate states or pass them to 
another
         // aggregation operator, we must use a schema that includes both the 
group
         // columns **and** the partial-state columns.
-        let partial_agg_schema = create_schema(
+        let spill_schema = Arc::new(create_schema(

Review Comment:
   given that this is used for spilling (per the comments) renaming these 
variables makes sense to me



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -527,20 +539,27 @@ impl GroupedHashAggregateStream {
             })
             .collect();
 
-        let partial_agg_schema = Arc::new(partial_agg_schema);
+        let output_ordering = agg.cache.output_ordering();
 
-        let spill_expr =
+        let spill_sort_exprs =
             group_schema
                 .fields
                 .into_iter()
                 .enumerate()
                 .map(|(idx, field)| {
-                    PhysicalSortExpr::new_default(Arc::new(Column::new(
-                        field.name().as_str(),
-                        idx,
-                    )) as _)
+                    let output_expr = Column::new(field.name().as_str(), idx);
+
+                    // Try to use the sort options from the output ordering, 
if available.
+                    // This ensures that spilled state is emitted in the 
expected order.

Review Comment:
   👍 



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
     /// Conduct a streaming merge sort between the batch and spilled data. 
Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with 
[`EmitTo::First`].
     fn update_merged_stream(&mut self) -> Result<()> {
-        let Some(batch) = self.emit(EmitTo::All, true)? else {
-            return Ok(());
-        };
+        // Spill the last remaining rows (if any) to free up as much memory as 
possible.
+        // Since we're already spilling, we can be sure we're memory 
constrained.
+        // Creating an extra spill file won't make much of a difference.
+        self.spill()?;

Review Comment:
   It sort of depends on how large the spill file was -- like if we have 2GB of 
data in memorying, writing it to a new spill file will likely make a measurable 
negative difference in performance



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -643,6 +674,21 @@ impl GroupedHashAggregateStream {
     }
 }
 
+fn find_sort_options(
+    output_ordering: Option<&LexOrdering>,
+    expr: &dyn PhysicalExpr,
+) -> SortOptions {
+    if let Some(ordering) = output_ordering {
+        for e in ordering {
+            if e.expr.as_ref().dyn_eq(expr) {
+                return e.options;
+            }
+        }
+    }
+
+    SortOptions::default()
+}
+

Review Comment:
   Maybe we could add a method to LexOrdering -- so this could be like
   
   ```rust
   LexOrdering::find_options(expr).unwrap_or_default()
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
     /// Conduct a streaming merge sort between the batch and spilled data. 
Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with 
[`EmitTo::First`].
     fn update_merged_stream(&mut self) -> Result<()> {
-        let Some(batch) = self.emit(EmitTo::All, true)? else {
-            return Ok(());
-        };
+        // Spill the last remaining rows (if any) to free up as much memory as 
possible.
+        // Since we're already spilling, we can be sure we're memory 
constrained.
+        // Creating an extra spill file won't make much of a difference.
+        self.spill()?;
+
         // clear up memory for streaming_merge
         self.clear_all();
         self.update_memory_reservation()?;
-        let mut streams: Vec<SendableRecordBatchStream> = vec![];

Review Comment:
   This change is because the code now forces a spill even if it had memory to 
hold the last batch in memory, right?
   
   I feel like while the code is now cleaner we have lost some performance
   
   Maybe we could factor the creation of the stream into a method so it looks 
something like this
   ```rust
   if let Some(stream) = self.emit_to_stream(EmitTo:All, true) { 
    builder = builder.with_streams(vec![stream]));
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to