gruuya opened a new issue, #9417:
URL: https://github.com/apache/arrow-datafusion/issues/9417
### Is your feature request related to a problem or challenge?
The Top-K operator has recently been added for a specialized use case when
encountering `ORDER BY` and `LIMIT` clauses together (#7250, #7721), as a way
to optimize the memory usage of the sorting procedure.
Still the present implementation relies on keeping in memory the input
record batches with potential row candidates for the final K output rows. This
means that in the pathological case, there can be K batches in memory per the
TopK operator, which are themselves spawned per input partition.
In particular this leads to the following error for ClickBench query 19:
```bash
% datafusion-cli -m 8gb
DataFusion CLI v36.0.0
❯ CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION
'/path/to/hits.parquet';
0 rows in set. Query took 0.032 seconds.
❯ SELECT "UserID", extract(minute FROM to_timestamp("EventTime")) AS m,
"SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER
BY COUNT(*) DESC LIMIT 10;
Resources exhausted: Failed to allocate additional 220827928 bytes for
TopK[3] with 883453086 bytes already allocated - maximum available is 150024911
```
In the above case I see 12 partitions x ~3.5 batches per TopK operator in
memory x 223 MB per batch (which is kind of strange for 4 columns) = 9366 MB,
thus peaking above the set memory limit of 8GB.
### Describe the solution you'd like
Ideally something that doesn't hurt performance but reduces the memory
footprint even more. Failing that, something that perhaps hurts performance
only once the memory limit threshold has been surpassed (e.g. by spilling), but
without crashing the query.
### Describe alternatives you've considered
### Option 1
Increasing or not setting a memory limit.
### Option 2
Introduce spilling to disk for the TopK operator as a fallback when the
memory limit is hit.
### Option 3
Potentially something like converting the column arrays of the input record
batch to rows, like for the evaluated sort keys
https://github.com/apache/arrow-datafusion/blob/b2ff249bfb918ac6697dbc92b51262a7bdbb5971/datafusion/physical-plan/src/topk/mod.rs#L163
and then making `TopKRow` track the projected rows, in addition to the sort
keys, but compare only against the sort key. This would enable the `BinaryHeap`
to discard the unneeded rows.
Finally one could use `arrow_row::RowConverter::convert_rows` to get back
the columns when `emit`ing.
However this is almost guaranteed to lead to worse performance in the
general case due to all of the row-conversion taking place.
### Additional context
Potentially relevant for #7195.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]