SubhamSinghal opened a new pull request, #21962:
URL: https://github.com/apache/datafusion/pull/21962

   ## Which issue does this PR close?
   Related to the TODO at `materializing_stream.rs:283` (from 
[#17429](https://github.com/apache/datafusion/pull/17429#discussion_r2324056495)):
 spilled `BufferedBatch` join key arrays are not tracked in memory reservation. 
                                   
      
   ## Rationale for this change                                                 
                                                                                
                         
    When a `BufferedBatch` is spilled to disk in Sort Merge Join, only the 
`RecordBatch` data is written to the IPC file. The `join_arrays` (evaluated 
join key columns) remain in memory  because the merge-scan comparator needs 
them to detect key group boundaries.
    Before this fix, these in-memory `join_arrays` were **invisible to the 
memory pool**:                                                                  
                               
      
     allocate_reservation():                                                    
                                                                                
                           
       try_grow(size_estimation) → FAILS (pool full)           
       spill batch to disk                                                      
                                                                                
                           
       → join_arrays still in memory, but reservation was never grown           
                                                                                
                           
       → pool thinks 0 bytes are used for this batch                            
                                                                                
                           
                                                                                
                                                                                
                           
     free_reservation():                                                        
                                                                                
                           
       if InMemory → shrink(size_estimation)                                    
                                                                                
                           
       if Spilled  → no-op  ← correct (nothing was grown), but join_arrays are 
invisible                                                                       
                            
                                                                                
                                                                                
                           
    With many spilled batches for a skewed key (e.g., millions of rows sharing 
the same join key), the untracked `join_arrays` memory accumulates. The memory 
pool cannot account for this when making spill decisions for concurrent 
operators.                                                                      
                                                          
                                                               
     ## What changes are included in this PR?                                   
                                                                                
                           
                                                               
     **Memory accounting fix** (`materializing_stream.rs`):                     
                                                                                
                           
      
     - Add `reserved_amount` field to `BufferedBatch` — tracks how much memory 
was **actually reserved** in the pool for this batch                            
                            
     - Add `join_arrays_mem()` helper — computes total memory of join key arrays
     - `allocate_reservation()`: after spilling, calls 
`try_grow(join_arrays_mem)` to track the remaining in-memory data. If the pool 
is too tight for even that, `reserved_amount` stays 0
      (best-effort, safe)                                                       
                                                                                
                           
     - `free_reservation()`: shrinks by `reserved_amount` instead of checking 
`InMemory` variant. Invariant: only shrink by what was actually grown — no 
underflow risk                    
                                                                                
                                                                                
                           
     | Scenario | `try_grow` | `reserved_amount` | `try_shrink` | Safe? |       
                                                                                
                           
     |----------|-----------|-------------------|-------------|-------|         
                                                                                
                           
     | InMemory | Ok(size_estimation) | size_estimation | size_estimation | Yes 
|                                                                               
                           
     | Spilled, tracked | Ok(join_arrays_mem) | join_arrays_mem | 
join_arrays_mem | Yes |                                                         
                                         
     | Spilled, pool tight | Err | 0 | 0 (no-op) | Yes |                        
                                                                                
                           
                                                                                
                                                                                
                           
     **Tests** (`tests.rs`):                                                    
                                                                                
                           
                                                                                
                                                                                
                           
     - `spill_many_batches_same_key` — 10+5 batches all sharing key=1, verifies 
correctness under heavy spilling
     - `spill_string_join_keys` — Utf8 join keys to exercise larger 
`join_arrays` footprint
     - `spill_mixed_keys_some_match` — multiple distinct keys with partial 
matching, tests Full outer join NULL rows from spilled batches                  
                                
     - `spill_join_arrays_memory_accounting` — verifies memory pool is fully 
released after join completes (`memory_pool.reserved() == 0`) and 
`peak_mem_used > 0`                         
                                                                                
                                                                                
                           
     ## Are these changes tested?                                               
                                                                                
                           
                                                                                
                                                                                
                           
     Yes. Four new tests added covering heavy spilling with same-key batches, 
string join keys, mixed keys with partial matching, and memory pool accounting 
verification.                 
                                                               
     ## Are there any user-facing changes?                                      
                                                                                
                           
                                                               
     No. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to