alamb commented on code in PR #18906:
URL: https://github.com/apache/datafusion/pull/18906#discussion_r2635687188


##########
datafusion/physical-plan/src/aggregates/group_values/row.rs:
##########
@@ -206,37 +233,52 @@ impl GroupValues for GroupValuesRows {
                 output
             }
             EmitTo::First(n) => {
-                let groups_rows = group_values.iter().take(n);
-                let output = self.row_converter.convert_rows(groups_rows)?;
-                // Clear out first n group keys by copying them to a new Rows.
-                // TODO file some ticket in arrow-rs to make this more 
efficient?
-                let mut new_group_values = self.row_converter.empty_rows(0, 0);
-                for row in group_values.iter().skip(n) {
-                    new_group_values.push(row);
-                }
-                std::mem::swap(&mut new_group_values, &mut group_values);
-
-                self.map.retain(|(_exists_hash, group_idx)| {
-                    // Decrement group index by n
-                    match group_idx.checked_sub(n) {
-                        // Group index was >= n, shift value down
-                        Some(sub) => {
-                            *group_idx = sub;
-                            true
-                        }
-                        // Group index was < n, so remove from table
-                        None => false,
+                if self.drain_mode {

Review Comment:
   I don't understand this change: I think `EmitTo::First(..)` is only called 
when in "partial" grouping mode (aka we are emitting values *before* we have 
seen the end of the input because the data is sorted by some prefix of the 
group keys)
   
   When there is no sorting by group keys, the `EmitTo::All` path is taken to 
create a single (very large) RecordBatch
   
   Once the output RecordBatch is created, then the existing code emits it in 
chunks, as described in  
https://github.com/apache/datafusion/blob/2e3707e380172a4ba1ae5efabe7bd27a354bfb2d/datafusion/physical-plan/src/aggregates/row_hash.rs#L66
   
   
   Thus I would have thought  that the long pause in group by agrgegateion was 
casued by the code above in the `EmitTo::All` path that basically copies all 
the data into that single large batch: 
   
   ```rust
   self.row_converter.convert_rows(&group_values)?;
   ```
   
   The only way I can think of to avoid the cost / delay of converting all 
groups at once would be to change how `EmitTo` work -- perhaps change 
`EmitTo::All` to something like `Emit::Next(n)` which would incrementally emit 
batches of rows in slices of `n` rows
   
   This would be a fairly large change, but it could be the start of 
implementing chunked memory group management as well, which is a long term goal 
of the project I think 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to