alchemist51 opened a new issue, #19649:
URL: https://github.com/apache/datafusion/issues/19649

   ## Describe the issue
   
   Currently in 
[GroupedHashAggregateStream](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs#L355C19-L355C45)
 struct, the group accumulator 
[state](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs#L421)
  is stored in a single Vec<T>. This has following associated problems:
   
   1. No memory is freed until the GroupByHash has output every batch
   2. Upstream operators that hold references to any of the record batches will 
lead to holding up the underlying record batch in memory
   
   Courtesy to @alamb for explaining it in detail 
[here](https://github.com/apache/datafusion/issues/7065)
   
   ## Solution
   
   Instead of storing a single large chunk of RecordBatchStream and slicing it, 
we could store it as multiple chunks. We have already 
[discussed](https://github.com/apache/datafusion/issues/11931) about the fixed 
sizing for the group accumulators in a different light, where we were trying to 
reduce the cost due to the double copy which we do in Vec::resize() for storing 
the groupaccumulators state. The same idea could be implemented for this issue 
and can help in reducing the overall memory usage for datafusion.
   
   In the experiment, we took the PR #15591 by 
@[Rachelint](https://github.com/Rachelint) and made it work for constraint 
memory for PrimitiveArrow types.Since it’s for primitive types we took the 
field WatchID which has very high cardinality. For reference, in clickbench 
data WatchID has ~99M cardinality. Here is the query we tried:
   
   
   
   ```
   SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"), 
SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;
   ```
   
   
   When we run this query for a 16GB allocated memory pool, it fails to run 
with the below resource exhaustion message:
   
   ```
   Resources exhausted: Failed to allocate additional 2684354560(2.5GB) bytes 
for 
   TopK[0] with 13421914214(12.5GB) bytes already allocated for this 
reservation - 
   1878841178(1.74GB) bytes remain available for the total pool
   ```
   
   ```
   [TOPK-INSERT] ▼ Receiving batch from aggregation: rows=8192, 
current_heap=0/10, memory=1000 bytes
   [TOPK-MEMORY] BEFORE insert: total=140994 bytes, batches_count=0, 
batches_size=0 bytes, entry_uses=10, reservation=0 bytes
   [TOPK-STORE] INSERT batch_id=0, uses=10, batch_size=2684354560 bytes, 
batch_rows=8192, total_batches=0->1, total_size=0->2684354560 bytes
   ```
   
   
   However when we enable the blocked approach for this query. Not only we were 
able to run the query in 16GB, it was working in 4GB memory pool as well:
   
   ```
    ./target/debug/datafusion-cli -m 4g
   
+---------------------+---------------------------+---------------------------+---------------------+
   | WatchID             | min(hits.ResolutionWidth) | 
max(hits.ResolutionWidth) | sum(hits.IsRefresh) |
   
+---------------------+---------------------------+---------------------------+---------------------+
   | 9223372033328793741 | 1368                      | 1368                     
 | 0                   |
   | 9223371941779979288 | 1479                      | 1479                     
 | 0                   |
   | 9223371906781104763 | 1638                      | 1638                     
 | 0                   |
   | 9223371803397398692 | 1990                      | 1990                     
 | 0                   |
   | 9223371799215233959 | 1638                      | 1638                     
 | 0                   |
   | 9223371785975219972 | 0                         | 0                        
 | 0                   |
   | 9223371776706839366 | 1368                      | 1368                     
 | 0                   |
   | 9223371740707848038 | 1750                      | 1750                     
 | 0                   |
   | 9223371715190479830 | 1368                      | 1368                     
 | 0                   |
   | 9223371620124912624 | 1828                      | 1828                     
 | 0                   |
   
+---------------------+---------------------------+---------------------------+---------------------+
   ```
   
   This is currently supported only for few 
[GroupedAccumulators](https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/)
 and Primitive data types. But the early results show promise that this can 
help us in making datafusion more memory friendly as can be seen from the topK 
logs as well:
   
   ```
   [TOPK-INSERT] ▼ Receiving batch from aggregation: rows=8192, 
current_heap=0/10, memory=1000 bytes
   [TOPK-MEMORY] BEFORE insert: total=140994 bytes, batches_count=0, 
batches_size=0 bytes, entry_uses=10, reservation=0 bytes
   [TOPK-STORE] INSERT batch_id=0, uses=10, batch_size=1310720 bytes, 
batch_rows=8192, total_batches=0->1, total_size=0->1310720 bytes
   ```
   
   For next steps, I’m thinking of submitting following issue tickets for 
making it work for resource intensive queries:
   
   
   * Introduce blocked approach for latest → PR #15591 is stale, we will need 
to revive it and make it work for the latest. We will focus on the memory 
optimisation coming from the change. Also PR #15591 doesn’t cover the following 
cases:
       * Support blocked approach with spills → We will always be running in 
memory constraint environment and will need to have spill support for the 
blocked approach for it.
       * Support multi group by → current POC code only helps in the case of 
single group by query, we need to implement it for multi group by cases.
       * Support for non-primitive types → currently we only support primitive 
types of Arrow, we will need to implement it for BinaryViews like structs.
       * Support all accumulators → currently only few of the grouped 
accumulators are supported like AVG,SUM,MIN,MAX. We need to extend it to 
support all varieties of aggs
   
   cc: @bharath-techie


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to