gruuya opened a new issue, #9417:
URL: https://github.com/apache/arrow-datafusion/issues/9417

   ### Is your feature request related to a problem or challenge?
   
   The TopK operator was recently added as a specialized path for queries that combine `ORDER BY` and `LIMIT` clauses (#7250, #7721), as a way to reduce the memory usage of the sorting procedure.
   
   However, the present implementation keeps in memory all input record batches that still hold candidate rows for the final K output rows. In the pathological case this means up to K batches in memory per TopK operator, and one such operator is spawned per input partition.
   
   In particular this leads to the following error for ClickBench query 19:
   
   ```bash
   % datafusion-cli -m 8gb
   DataFusion CLI v36.0.0
   ❯ CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION 
'/path/to/hits.parquet';
   0 rows in set. Query took 0.032 seconds.
   
   ❯ SELECT "UserID", extract(minute FROM to_timestamp("EventTime")) AS m, 
"SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER 
BY COUNT(*) DESC LIMIT 10;
   Resources exhausted: Failed to allocate additional 220827928 bytes for 
TopK[3] with 883453086 bytes already allocated - maximum available is 150024911
   ```
   
   In the above case I see 12 partitions x ~3.5 batches in memory per TopK operator x 223 MB per batch (which is oddly large for 4 columns) = 9366 MB, thus peaking above the configured memory limit of 8 GB.
   
   ### Describe the solution you'd like
   
   Ideally, something that reduces the memory footprint further without hurting performance. Failing that, something that degrades performance only once the memory limit has been surpassed (e.g. by spilling), but without failing the query.
   
   ### Describe alternatives you've considered
   
   ### Option 1
   Increase the memory limit, or don't set one at all.
   
   ### Option 2
   Introduce spilling to disk for the TopK operator as a fallback when the 
memory limit is hit.
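
   As a rough, hedged sketch of what this could look like (this is not DataFusion's existing spill machinery, and `insert_batch`, `retained_batches` and `spill_path` are hypothetical names made up for illustration): when growing the memory reservation for a new batch fails, the batches retained so far could be written out as an Arrow IPC file and their reservation released, rather than erroring out.

   ```rust
   use std::fs::File;
   use std::path::Path;

   use datafusion::arrow::datatypes::Schema;
   use datafusion::arrow::ipc::reader::FileReader;
   use datafusion::arrow::ipc::writer::FileWriter;
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::error::Result;
   use datafusion::execution::memory_pool::MemoryReservation;

   /// Hypothetical sketch: account for a new input batch, spilling the
   /// currently retained batches to disk instead of failing when the
   /// memory limit is hit.
   fn insert_batch(
       reservation: &mut MemoryReservation,
       retained_batches: &mut Vec<RecordBatch>,
       schema: &Schema,
       spill_path: &Path,
       batch: RecordBatch,
   ) -> Result<()> {
       let size = batch.get_array_memory_size();
       if reservation.try_grow(size).is_err() {
           // Memory limit reached: write the current candidates to disk ...
           let mut writer = FileWriter::try_new(File::create(spill_path)?, schema)?;
           for spilled in retained_batches.drain(..) {
               writer.write(&spilled)?;
           }
           writer.finish()?;
           // ... release their memory, and account only for the new batch.
           reservation.free();
           reservation.try_grow(size)?;
       }
       retained_batches.push(batch);
       Ok(())
   }

   /// Read the spilled batches back, e.g. when emitting the final K rows.
   fn read_spill(spill_path: &Path) -> Result<Vec<RecordBatch>> {
       let reader = FileReader::try_new(File::open(spill_path)?, None)?;
       Ok(reader.collect::<std::result::Result<Vec<_>, _>>()?)
   }
   ```

   A real implementation would presumably reuse the spill utilities that the sort operator already has, and merge the spilled candidates back in when emitting.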
   
   ### Option 3
   
   Potentially something like converting the column arrays of the input record batch to rows, as is already done for the evaluated sort keys
https://github.com/apache/arrow-datafusion/blob/b2ff249bfb918ac6697dbc92b51262a7bdbb5971/datafusion/physical-plan/src/topk/mod.rs#L163
   and then making `TopKRow` track the projected rows in addition to the sort keys, while still comparing only on the sort key. This would enable the `BinaryHeap` to discard the rows that are no longer needed.
   
   Finally, one could use `arrow_row::RowConverter::convert_rows` to get the columns back when `emit`ting.
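
   A minimal sketch of that round trip using the arrow row format (the example data and the second `RowConverter` over the projected output columns are assumptions for illustration, not the current TopK code):

   ```rust
   use std::sync::Arc;

   use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
   use datafusion::arrow::datatypes::DataType;
   use datafusion::arrow::error::ArrowError;
   use datafusion::arrow::row::{OwnedRow, RowConverter, SortField};

   fn main() -> Result<(), ArrowError> {
       // Projected output columns of one (hypothetical) input batch.
       let user_id: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
       let phrase: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));

       // A converter over the full output projection, in addition to the
       // one that already exists for the sort keys.
       let converter = RowConverter::new(vec![
           SortField::new(DataType::Int64),
           SortField::new(DataType::Utf8),
       ])?;

       // Convert the columns to rows; each heap candidate would then own
       // one `OwnedRow` instead of pointing into a retained input batch.
       let rows = converter.convert_columns(&[user_id, phrase])?;
       let retained: Vec<OwnedRow> =
           (0..rows.num_rows()).map(|i| rows.row(i).owned()).collect();

       // On emit, convert the surviving rows back into columns.
       let columns: Vec<ArrayRef> =
           converter.convert_rows(retained.iter().map(|r| r.row()))?;
       assert_eq!(columns[0].len(), 3);
       Ok(())
   }
   ```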
   
   However this is almost guaranteed to lead to worse performance in the 
general case due to all of the row-conversion taking place.
   
   ### Additional context
   
   Potentially relevant for #7195.

