2010YOUY01 opened a new issue, #23076:
URL: https://github.com/apache/datafusion/issues/23076

   ### Is your feature request related to a problem or challenge?
   
   
https://docs.rs/arrow-select/latest/arrow_select/concat/fn.concat_batches.html
   
   `concat_batches` currently has a peak memory usage of about 2x the input size.
   
   For example, if the input has 10 batches of 100MB each, peak memory usage 
(RSS) during concatenation is 2GB, which is 2x the ideal 1GB. This follows from 
the implementation: the output buffer and the input buffers coexist until the 
concatenation finishes.
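
The arithmetic in this example can be written down as a small model. This is only a sketch of the accounting, not of `concat_batches` itself; the function name `peak_bytes_by_reference` is made up for illustration, and it assumes the output is exactly as large as the sum of the inputs.

```rust
/// Peak bytes held while concatenating by reference: every input batch stays
/// alive until the fully sized output has been written.
fn peak_bytes_by_reference(batch_sizes: &[u64]) -> u64 {
    let input: u64 = batch_sizes.iter().sum();
    let output = input; // assumption: the output holds a copy of every input row
    input + output
}

fn main() {
    const MB: u64 = 1_000_000;
    let batches = vec![100 * MB; 10]; // 10 batches, 100MB each
    let ideal: u64 = batches.iter().sum();
    let peak = peak_bytes_by_reference(&batches);
    println!("ideal = {} MB, peak = {} MB", ideal / MB, peak / MB);
}
```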
   
   This causes significant memory inefficiency for joins (hash join, nested 
loop join): the join implementations currently typically buffer all input 
batches, concatenate them into one large batch, and then perform the join 
steps. Their peak memory usage becomes 2x the ideal case.
   
   Reproducer in `datafusion-cli`
   ```
   /usr/bin/time -l datafusion-cli -f /Users/yongting/Scripts/hj_mem.sql
   
   100000001 row(s) fetched. (First 40 displayed. Use --maxrows to adjust)
   Elapsed 0.512 seconds.
   
           0.54 real         0.43 user         0.11 sys
             2032107520  maximum resident set size                  <--- 2GB
   ```
   ```sql
   -- hj_mem.sql
   set datafusion.execution.target_partitions=1;
   
   select *
   from generate_series(100000000) as t1(v1)
   join generate_series(100000000) as t2(v1)
   on t1.v1=t2.v1;
   ```
   This query first buffers the entire build side and then probes it in a 
streaming fashion. The build side is of type Int64, so the ideal memory 
consumption is 100M * 8B = 800MB. The inefficient `concat_batches` explains 2x 
of the observed memory amplification. (I'm not sure about the remaining 0.5x.)
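
As a rough check of these numbers, the sketch below recomputes the amplification factor from the RSS reported by `/usr/bin/time -l`. The helper name `amplification` is made up for illustration, and the row count uses the rounded 100M figure rather than the exact row count.

```rust
/// Ratio between the observed peak RSS and the ideal build-side size.
fn amplification(observed_rss_bytes: u64, ideal_bytes: u64) -> f64 {
    observed_rss_bytes as f64 / ideal_bytes as f64
}

fn main() {
    let rows: u64 = 100_000_000; // rounded build-side row count ("100M")
    let bytes_per_value: u64 = 8; // Int64
    let ideal = rows * bytes_per_value; // 800_000_000 bytes
    let observed_rss: u64 = 2_032_107_520; // "maximum resident set size" above
    println!(
        "ideal = {} bytes, amplification = {:.2}x",
        ideal,
        amplification(observed_rss, ideal)
    );
}
```

The result is roughly 2.5x, which matches the 2x attributed to `concat_batches` plus the unexplained 0.5x.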
   
   Here is a related issue that tries to address this problem by avoiding 
`concat_batches` in joins: https://github.com/apache/datafusion/issues/23031
   
   ### Describe the solution you'd like
   
   Implement a similar utility, `concat_batches_owned`, that concatenates the 
batches incrementally, like
   ```
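   // Pseudocode: `init_inprogress_batch` and `push` are placeholders.
   let mut inprogress_batch = init_inprogress_batch(input_batches[0].schema());
   for batch in input_batches {
       inprogress_batch.push(batch);
       // `batch` was moved into `push` and is dropped there, releasing its memory
   }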
   ```
   
   The new utility has to consume the input batches instead of taking 
references, but this should be fine for the join use cases.
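
To illustrate the ownership idea without depending on Arrow, here is a minimal sketch that uses a plain `Vec<i64>` as a stand-in for a single-column Int64 batch. The `Batch` alias and the returned peak counter are assumptions made for this example only; a real `concat_batches_owned` would work on `RecordBatch` and would have to handle multiple columns and variable-length types. The counter tracks values actually written, on the assumption that reserved but untouched capacity is not yet resident, which depends on the allocator and the OS.

```rust
/// Stand-in for a single-column Int64 batch (assumption for this sketch only).
type Batch = Vec<i64>;

/// Concatenates by consuming the inputs. Returns the output together with the
/// peak number of values alive at once (not-yet-dropped inputs + written output).
fn concat_batches_owned(batches: Vec<Batch>) -> (Batch, usize) {
    let total: usize = batches.iter().map(Vec::len).sum();
    let mut out: Batch = Vec::with_capacity(total);
    let mut remaining = total; // values still held by inputs that were not dropped
    let mut peak = total;

    // Iterating by value moves each batch out of the outer Vec.
    for batch in batches {
        out.extend_from_slice(&batch);
        // The current batch is still alive here, next to its copy in `out`.
        peak = peak.max(remaining + out.len());
        remaining -= batch.len();
        // `batch` is dropped at the end of this iteration, freeing its buffer.
    }
    (out, peak)
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9], vec![10, 11, 12]];
    let (out, peak) = concat_batches_owned(batches);
    println!("rows = {}, peak live values = {}", out.len(), peak);
}
```

In this model the peak is the total size plus one batch, rather than twice the total, because each input is released as soon as it has been copied.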
   
   The implementation should be similar to 
   - https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html
   
   Ideally this approach can do better than the existing 2X memory.
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

