ahmed-mez commented on code in PR #18906:
URL: https://github.com/apache/datafusion/pull/18906#discussion_r2650712133
##########
datafusion/physical-plan/src/aggregates/group_values/row.rs:
##########
@@ -206,37 +233,52 @@ impl GroupValues for GroupValuesRows {
output
}
EmitTo::First(n) => {
- let groups_rows = group_values.iter().take(n);
- let output = self.row_converter.convert_rows(groups_rows)?;
- // Clear out first n group keys by copying them to a new Rows.
- // TODO file some ticket in arrow-rs to make this more efficient?
- let mut new_group_values = self.row_converter.empty_rows(0, 0);
- for row in group_values.iter().skip(n) {
- new_group_values.push(row);
- }
- std::mem::swap(&mut new_group_values, &mut group_values);
-
- self.map.retain(|(_exists_hash, group_idx)| {
- // Decrement group index by n
- match group_idx.checked_sub(n) {
- // Group index was >= n, shift value down
- Some(sub) => {
- *group_idx = sub;
- true
- }
- // Group index was < n, so remove from table
- None => false,
+ if self.drain_mode {
Review Comment:
> I don't understand this change: I think EmitTo::First(..) is only called
> when in "partial" grouping mode (aka we are emitting values before we have seen
> the end of the input because the data is sorted by some prefix of the group
> keys)
The `drain_mode` check is needed because this PR reuses `EmitTo::First` for
chunked emission in the new `DrainingGroups` state, but that context is
different from the sorted-input context. Without `drain_mode`, we'd
unnecessarily try to retain/shift hash map entries that have already been
cleared, and we'd copy rows instead of using the more efficient offset-based
approach.
That said, I acknowledge that the current implementation makes the code
more convoluted, and the fact that both you and @gabotechs were confused by this
piece of code is already a bad sign. We need to do something about it.
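To make the two contexts concrete, here is a minimal sketch of the branch. It is not the PR's code: `Groups`, its fields, and `emit_first` are hypothetical stand-ins (a `Vec<String>` replaces the arrow `Rows`, and a std `HashMap` replaces the hash table). Only the `drain_mode` flag and the retain/shift logic mirror the diff above.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the group store; not the real `GroupValuesRows`.
struct Groups {
    /// Stand-in for the arrow `Rows` holding one entry per group.
    rows: Vec<String>,
    /// Group key -> group index. Assumed to be already cleared in drain mode.
    map: HashMap<String, usize>,
    /// Start of the not-yet-emitted rows; only used in drain mode.
    offset: usize,
    drain_mode: bool,
}

impl Groups {
    /// Emits up to `n` groups, in insertion order.
    fn emit_first(&mut self, n: usize) -> Vec<String> {
        if self.drain_mode {
            // Input is finished, so no lookups will happen again: the
            // remaining rows stay where they are and only the offset moves.
            let end = (self.offset + n).min(self.rows.len());
            let out = self.rows[self.offset..end].to_vec();
            self.offset = end;
            out
        } else {
            // Sorted-input case: more input may still arrive, so the
            // remaining rows are moved down and their indices renumbered.
            let n = n.min(self.rows.len());
            let out: Vec<String> = self.rows.drain(..n).collect();
            self.map.retain(|_, idx| match idx.checked_sub(n) {
                // Index was >= n: shift it down and keep the entry.
                Some(shifted) => {
                    *idx = shifted;
                    true
                }
                // Index was < n: the group was just emitted, so drop it.
                None => false,
            });
            out
        }
    }
}
```

In this sketch the drain-mode path touches neither the map nor the remaining rows, which is the saving the `drain_mode` check is meant to capture.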
> Thus I would have thought that the long pause in group by aggregation was
> caused by the code above in the EmitTo::All path that basically copies all the
> data into that single large batch:
```rust
self.row_converter.convert_rows(&group_values)?;
```
This is correct and matches the observations (see
https://github.com/apache/datafusion/issues/18907).
> The only way I can think of to avoid the cost / delay of converting all
> groups at once would be to change how EmitTo works -- perhaps change EmitTo::All
> to something like Emit::Next(n) which would incrementally emit batches of rows
> in slices of n rows
>
> This would be a fairly large change, but it could be the start of
> implementing chunked memory group management as well, which is a long term goal
> of the project I think
This sounds reasonable, thanks! Is there an epic to track the effort? I'd be
happy to contribute to it.
Also, while searching for related issues, I found
https://github.com/apache/datafusion/issues/19481, which seems to be another
symptom (memory pressure and OOMs) of the same root cause
(`EmitTo::All` emitting everything at once).
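For the incremental emission idea quoted above, a rough sketch of the shape it could take, assuming nothing about the eventual DataFusion API: `ChunkedEmitter` and `emit_next` are hypothetical names, and a plain `Vec<T>` stands in for the accumulated group state.

```rust
/// Hypothetical: hands out accumulated groups in slices of at most `n`
/// entries instead of converting everything into one large batch.
struct ChunkedEmitter<T> {
    groups: Vec<T>,
    /// Index of the first group that has not been emitted yet.
    next: usize,
}

impl<T: Clone> ChunkedEmitter<T> {
    fn new(groups: Vec<T>) -> Self {
        Self { groups, next: 0 }
    }

    /// Returns the next batch of at most `n` groups, or `None` once all
    /// groups have been emitted (or when `n` is zero).
    fn emit_next(&mut self, n: usize) -> Option<Vec<T>> {
        if n == 0 || self.next >= self.groups.len() {
            return None;
        }
        let end = (self.next + n).min(self.groups.len());
        // Only this slice is copied out; the rest is left untouched.
        let batch = self.groups[self.next..end].to_vec();
        self.next = end;
        Some(batch)
    }
}
```

Each call does work proportional to `n` rather than to the total number of groups, so the peak size of a single output batch is bounded by the chunk size.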
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]