andygrove commented on PR #2639:
URL: 
https://github.com/apache/datafusion-comet/pull/2639#issuecomment-3446954076

   > > Ok so the previous time I wanted to make this PR I encountered something 
interesting: Imagine something like the following code on DataFusion's SortExec
   > > ```
   > > futures::stream::once(async move {
   > >                         while let Some(batch) = input.next().await {
   > >                             let batch = batch?;
   > >                             sorter.insert_batch(batch).await?;
   > >                         }
   > >                         sorter.sort().await
   > >                     })
   > >                     .try_flatten(),
   > > ```
   > > 
   > > 
   > >     
   > >       
   > >     
   > > 
   > >       
   > >     
   > > 
   > >     
   > >   
   > > This will essentially consume the entire input iterator, all the way to 
the operator producing the ColumnarBatch on the Scala side. The underlying 
arrow arrays can be produced off-heap, it doesn't matter, because the wrapped 
ArrowArray and ArrowSchema objects are created using heap memory, so any 
ArrayRef created from the ArrayData of that passed arrow array, will use a 
small amount of heap memory. The issue is that a code like the one above, that 
will consume the entire input iterator, will cause such high GC pressure, that 
performance can decrease by up to 10x compared to Spark. It will not always 
show up on local performance runs(I saw the horrible performance when running 
on an EC2 Cluster with a huge amount of data) The only solution I've found was 
to do a deep copy of the ArrayData itself. I know this seems paradoxical but 
it's a real-life issue. The best way I know to handle this is to just do a full 
copy of the ArrayData before make_array in the scan for _all_ arrays. 
 The unpacking can happen before that I guess, but hopefully DataFusion will 
have enough support for dictionaries this could be ignored completely.
   > > When we use off-heap memory, naturally we want to reduce the executor 
memory, which exacerbates the issue.
   > 
   > Thanks, this is really helpful. I will experiment with this.
   
   Claude's analysis of the PR and this comment:
   
     The Core Problem
   
     Memory Model Mismatch
   
     When Arrow arrays are passed from Spark (JVM) to DataFusion (Rust):
     - Data buffers: Can be allocated off-heap (native memory)
     - Wrapper objects (ArrowArray, ArrowSchema): Always allocated on Java heap
     - Even though the actual data is off-heap, each ArrayRef needs these small 
heap-allocated wrappers
   
     GC Pressure from Buffering Operators
   
     The issue arises with operators like SortExec that consume their entire 
input:
   
   ```rust
     futures::stream::once(async move {
         while let Some(batch) = input.next().await {
             sorter.insert_batch(batch).await?;  // Accumulates all batches
         }
         sorter.sort().await  // Only then produces output
     })
   ```
   
     This pattern:
     1. Consumes the entire input iterator before producing output
     2. Creates many wrapper objects (one set per batch)
     3. Keeps all wrappers alive until sorting completes
     4. Causes severe GC pressure as thousands of small objects accumulate on 
the heap
   
     Why It's Worse with Off-Heap Memory
   
     When using off-heap memory for performance, users typically:
     - Reduce executor heap size (since data is off-heap)
     - This makes the heap smaller → GC pressure from wrapper objects becomes 
catastrophic
     - Can cause 10x performance degradation on clusters with large data
   
     Why Deep Copy Solves It (The Paradox)
   
     The comment suggests doing a deep copy of ArrayData before make_array in 
the scan. This seems counterintuitive but works because:
   
     1. Immediate Materialization: Deep copy fully materializes data into new 
arrays
     2. Immediate GC: Original wrapper objects can be garbage collected right 
away
     3. Clean Boundaries: Each batch owns its data completely
     4. Less GC Thrashing: Even though copying costs CPU, it's cheaper than 
continuous GC pauses
   
     Without copy: Many small wrapper objects → constant GC pressureWith copy: 
Upfront copy cost → clean memory lifecycle → smooth execution
   
     Current PR Context
   
     Looking at your PR, you've:
     - Removed the separate CopyExec operator
     - Moved copy_array and copy_or_unpack_array functions into scan.rs
     - The copy happens at scan time (lines 270-278 in scan.rs):
   
   ```rust
     let array = if arrow_ffi_safe {
         copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
     } else {
         copy_array(&array)  // Deep copy
     };
   ```
   
     The commenter is saying: Make sure this deep copy happens for all arrays 
to prevent the GC pressure issue, especially for operators that buffer entire 
inputs.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to