justinli500 opened a new pull request, #49855:
URL: https://github.com/apache/arrow/pull/49855

   ### Rationale for this change

   `Dataset.to_batches()` accumulates memory because `ReadRangeCache` has
   no eviction API. `PreBuffer()` is called with every row group up front
   and entries stay resident until the `FileReader` is destroyed, so users
   see ~10x more peak memory than `ParquetFile.iter_batches()`. Issue
   #39808 has been open for over a year; downstream projects have worked
   around it by disabling `pre_buffer` (Ray) or dropping the dataset API
   (Marin), both of which give up features or throughput.
   ### What changes are included in this PR?

   * Add `ReadRangeCache::EvictEntriesInRange(start, length)`. Removes
     entries fully contained in the window; leaves coalesced entries alone,
     so eviction is safe under range coalescing.
   * Add `ParquetFileReader::EvictPreBufferedData(row_groups, column_indices)`
     and call it from `RowGroupGenerator::ReadOneRowGroup` after the row
     group has been decoded into Arrow arrays.
   * Promote the existing `LazyImpl` mutex into the base `Impl` so that
     concurrent `Read` and `Evict` across row groups is defined behaviour
     on both cache variants.
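
The eviction rule above can be sketched in a few lines. This is a toy model in Python, not the Arrow C++ implementation; `ToyRangeCache`, `Entry`, and the method names are illustrative stand-ins. The point is the contract: an entry is dropped only when the evict window fully contains it, so a coalesced entry that straddles the window boundary survives.

```python
# Toy model of the EvictEntriesInRange contract (NOT the Arrow C++ code):
# drop only entries fully contained in [start, start + length).
from dataclasses import dataclass


@dataclass
class Entry:
    offset: int
    length: int


class ToyRangeCache:
    def __init__(self):
        self.entries: list[Entry] = []

    def cache(self, offset: int, length: int) -> None:
        self.entries.append(Entry(offset, length))

    def evict_entries_in_range(self, start: int, length: int) -> int:
        """Drop entries fully contained in the window; return how many."""
        end = start + length
        before = len(self.entries)
        self.entries = [
            e for e in self.entries
            if not (e.offset >= start and e.offset + e.length <= end)
        ]
        return before - len(self.entries)


cache = ToyRangeCache()
cache.cache(0, 100)    # entry for row group 0
cache.cache(100, 150)  # a coalesced entry straddling the next window
evicted = cache.evict_entries_in_range(0, 100)  # only the first qualifies
print(evicted, len(cache.entries))              # -> 1 1
```

Refusing to split a coalesced entry keeps eviction a pure metadata operation: no partial-buffer bookkeeping, and a straddling entry is simply reclaimed later by a wider window.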
   ### Performance

   Measured on a 458 MB / 10 row group / 10M row Parquet file (6 columns:
   3 float64, 2 int64, 1 large_string; Snappy; macOS arm64; Release
   build). Fix toggled via a one-line A/B test:

   | Mode                                     | Peak `total_allocated_bytes` |
   |------------------------------------------|------------------------------|
   | `Dataset.to_batches`, fix disabled       | 598 MB                       |
   | `Dataset.to_batches`, fix enabled        | **331 MB** (-267 MB, -44.6%) |
   | `Dataset.to_batches`, `pre_buffer=False` | 151 MB                       |
   | `ParquetFile.iter_batches`               | 59 MB                        |

   ```mermaid
   xychart-beta
       title "Peak allocated memory (458 MB / 10 row groups, lower is better)"
       x-axis ["without fix", "with fix", "no prebuffer", "iter_batches"]
       y-axis "MB" 0 --> 650
       bar [598, 331, 151, 59]
   ```

   Per-row-group progression during iteration
   (`max(total_allocated_bytes)` in MB, sampled every 10k of 100k batches):

   ```
   batches (k)        10   20   30   40   50   60   70   80   90  100
   ----------------  ---  ---  ---  ---  ---  ---  ---  ---  ---  ---
   without fix       159  232  294  323  386  415  477  507  569  598
   with fix          129  202  205  234  237  266  270  299  302  331
   saved              30   30   89   89  149  149  207  208  267  267
   ```

   ```mermaid
   xychart-beta
       title "max_allocated over iteration (top line: without fix; bottom: with fix)"
       x-axis "batches consumed (thousands)" 10 --> 100
       y-axis "MB" 0 --> 650
       line [159, 232, 294, 323, 386, 415, 477, 507, 569, 598]
       line [129, 202, 205, 234, 237, 266, 270, 299, 302, 331]
   ```

   Savings scale linearly with row-group count, so on the multi-GB files
   from the issue thread this single fix recovers several GB of peak
   memory.
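
For a rough sense of scale, this linearity can be extrapolated from the numbers measured above; the linear model and the `projected_saving_mb` helper are assumptions for illustration, not additional measurements.

```python
# Back-of-the-envelope extrapolation of the linearity claim, using only
# the measured run above (10 row groups, 267 MB saved at the end).
# Hypothetical helper; not part of this PR.
saved_final_mb = 267  # "saved" column at 100k batches
row_groups = 10
per_rg_mb = saved_final_mb / (row_groups - 1)  # ~29.7 MB per evicted row group


def projected_saving_mb(n_row_groups: int) -> float:
    # With n row groups of similar size, roughly (n - 1) of them have been
    # evicted by the time the peak is reached.
    return per_rg_mb * (n_row_groups - 1)


print(round(projected_saving_mb(100)))  # -> 2937, i.e. ~2.9 GB saved
```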
   ### Related work/commits

   Downstream projects have shipped workarounds while this issue has been
   open, all of them in their own code rather than upstream:

   * `ray-project/ray#62745` (merged 2026-04-20): injects
     `ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True)`
     in Ray Data's parquet reader. Gets peak allocation down to ~75 MB but
     gives up the `pre_buffer=True` coalesced-read optimization that makes
     S3 reads fast.
   * `marin-community/marin#4344` (merged): replaces dataset-API usage
     with `ParquetFile.iter_batches`, giving up hive-partition discovery,
     filter pushdown, and dataset-level schema unification.

   No open PR against `apache/arrow` addresses the cache-side
   accumulation. This PR is the upstream fix that lets both workarounds be
   reverted without losing features or throughput.

      
   ### Scope of this fix

   This PR fixes the `ReadRangeCache` accumulation that dominates peak
   memory on the default `pre_buffer=true` path.

   A second source of growth, visible as the 151 MB vs 59 MB gap between
   the `pre_buffer=False` and `ParquetFile.iter_batches` rows of the table
   above, lives in the dataset async generator pipeline and is unrelated
   to the cache. It should be tracked as a follow-up issue.

   Partially closes #39808.

   ### Test plan

   New tests in `arrow/io/memory_test.cc`:

   * `RangeReadCache.EvictEntriesInRange` - basic eviction semantics
     across lazy and eager caches. Covers no-op windows, partial overlaps,
     wide windows that drop multiple entries, and eviction on an empty
     cache.
   * `RangeReadCache.EvictEntriesInRangeSpanningEntry` - forces coalescing
     via `hole_size_limit=100` and verifies that a coalesced entry is
     refused for a partial-window evict and dropped for a wide window that
     fully contains it.
   * `RangeReadCache.ConcurrentReadAndEvict` - 4 reader threads in a tight
     `Read()` loop against the upper half of the cache, 1 evictor thread
     running 50 cycles of `EvictEntriesInRange` + re-`Cache` against the
     lower half. Runs for both `lazy=true` and `lazy=false`. Under the
     pre-refactor code the `lazy=false` case would race on the `entries`
     vector; both cases now pass cleanly.
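
The concurrency shape of that stress test can be sketched as follows. This is a hedged stdlib sketch, not the real `ReadRangeCache` or its test; `LockedCache` and its methods are hypothetical stand-ins, and the single `threading.Lock` plays the role of the mutex this PR promotes into the base `Impl`.

```python
# Sketch of the ConcurrentReadAndEvict shape: readers hammer the upper
# half of a cache while one evictor cycles evict + re-cache on the lower
# half, all serialized by one lock (stand-in for the promoted Impl mutex).
import threading


class LockedCache:
    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # offset -> length (toy model of cached ranges)

    def cache(self, offset, length):
        with self._lock:
            self._entries[offset] = length

    def read(self, offset):
        with self._lock:
            return self._entries.get(offset)

    def evict_entries_in_range(self, start, length):
        with self._lock:
            end = start + length
            for off in [o for o, ln in self._entries.items()
                        if o >= start and o + ln <= end]:
                del self._entries[off]


cache = LockedCache()
for off in range(0, 1000, 100):  # 10 entries of 100 bytes
    cache.cache(off, 100)

stop = threading.Event()


def reader():
    while not stop.is_set():
        for off in range(500, 1000, 100):  # upper half: must stay readable
            assert cache.read(off) == 100


def evictor():
    for _ in range(50):                       # 50 evict + re-cache cycles
        cache.evict_entries_in_range(0, 500)  # lower half only
        for off in range(0, 500, 100):
            cache.cache(off, 100)
    stop.set()


threads = [threading.Thread(target=reader) for _ in range(4)]
threads.append(threading.Thread(target=evictor))
for t in threads:
    t.start()
for t in threads:
    t.join()
print("ok")  # readers never observed a missing upper-half entry
```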
   New tests in `parquet/arrow/arrow_reader_writer_test.cc`:

   * `TestArrowReadWrite.EvictPreBufferedData` - PreBuffers a 4-row-group
     file, calls `EvictPreBufferedData({0}, ...)`, confirms row group 0's
     cache entries are gone while row groups 1-3 remain readable, and
     confirms that evicting twice, or evicting on a reader that never
     called PreBuffer, are both safe no-ops.
   * `TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups`
     - drives the full async generator pipeline end to end with
     `pre_buffer=true` and confirms correctness of every emitted batch.

   Full-suite regression on Release build, macOS arm64:

   * `parquet-arrow-reader-writer-test`: **824/826 passing, 0 failing**
     (the 2 skips are pre-existing dictionary-write variants not built in
     this configuration).
   * `arrow-io-memory-test`: **57/57 passing**.

   ### Are there any user-facing changes?

   One new public method: `parquet::ParquetFileReader::EvictPreBufferedData`.
   No behaviour change for existing callers beyond strictly lower peak
   memory on the default `pre_buffer=true` path. No API deprecations, no
   format changes.

   **This PR contains a "Critical Fix"**: No (this is a memory usage
   improvement, not a correctness fix).

