LiaCastaneda opened a new pull request, #19501:
URL: https://github.com/apache/datafusion/pull/19501

   ## Which issue does this PR close?
   
   Closes # 
   Related to [this comment](https://github.com/apache/datafusion/issues/16841#issuecomment-3563643947) on [issue #16841](https://github.com/apache/datafusion/issues/16841#issue-3248316384)
   
   ## Rationale for this change
   
   Aggregation accumulators that store Arrow arrays can over-account memory when array buffers are shared between multiple `ScalarValue`s or between Arrow arrays directly. This happens because `ScalarValue::try_from_array()` creates slices that reference the same underlying buffers, yet each `ScalarValue` reports the full buffer size when calculating memory usage, so the same physical buffer gets counted multiple times ([this comment](https://github.com/apache/datafusion/issues/16841#issuecomment-3107483475) explains very well why we are seeing this).
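   To illustrate (this is my own minimal repro, not code from this PR): every `ScalarValue` extracted from a `ListArray` is a one-row slice over the same shared values buffer, yet each one charges the full buffer:

   ```rust
   use std::sync::Arc;

   use arrow::array::{Array, Int32Array, ListArray};
   use arrow::buffer::OffsetBuffer;
   use arrow::datatypes::{DataType, Field};
   use datafusion_common::{Result, ScalarValue};

   fn main() -> Result<()> {
       // 100 list rows, all backed by one shared 10_000-element values buffer.
       let values = Int32Array::from_iter_values(0..10_000);
       let field = Arc::new(Field::new("item", DataType::Int32, true));
       let offsets = OffsetBuffer::from_lengths(vec![100; 100]);
       let list = ListArray::new(field, offsets, Arc::new(values), None);

       // Each ScalarValue is a one-row slice referencing the SAME buffers,
       // but ScalarValue::size() charges the full buffer to every slice.
       let naive: usize = (0..list.len())
           .map(|i| ScalarValue::try_from_array(&list, i).map(|v| v.size()))
           .sum::<Result<usize>>()?;

       // Roughly 100x the physical memory actually pinned by the shared buffer.
       println!("naive accounting: {naive} bytes");
       Ok(())
   }
   ```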
   
   There have been several previous attempts to fix this, including compacting the data so the whole source array is not kept alive, and using `get_slice_memory_size` instead of `get_array_memory_size`. However, we observed that both have downsides:
   - `compact()` is CPU-inefficient (it copies data, which can be expensive).
   - `get_slice_memory_size()` accounts only for logical memory, not the actual physical buffer capacity, so the amount it returns is not accurate (see the sketch below).
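   The logical-vs-physical gap is easy to see (again, just my own illustration): slice a large array and compare the two measures:

   ```rust
   use arrow::array::{Array, Int64Array};
   use arrow::error::ArrowError;

   fn main() -> Result<(), ArrowError> {
       let big = Int64Array::from_iter_values(0i64..1_000_000);
       let slice = big.slice(0, 10); // shares `big`'s ~8 MB values buffer

       // Physical: the full shared buffer capacity the slice keeps alive.
       println!("get_array_memory_size: {}", slice.get_array_memory_size());
       // Logical: only the ~80 bytes the 10 visible values occupy; it
       // under-reports what is actually pinned in memory.
       println!("get_slice_memory_size: {}", slice.to_data().get_slice_memory_size()?);
       Ok(())
   }
   ```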
   
   
   ## What changes are included in this PR?
   
   This approach avoids double-counting memory by using Arrow's `TrackingMemoryPool`, which automatically deduplicates shared buffers when accounting for them. This means we don't need to `compact()` or call `get_slice_memory_size()` just to solve the accounting problem. Note that `compact()` might still be useful when we want to release memory pressure.
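   As a mental model for the deduplication, here is a toy stand-in I sketched (not Arrow's actual `TrackingMemoryPool`, whose API may differ): it counts each distinct physical buffer once, keyed by its data pointer:

   ```rust
   use std::collections::HashSet;

   use arrow::array::{Array, ArrayData, Int32Array};

   /// Toy dedup pool: counts each distinct physical buffer exactly once.
   #[derive(Default)]
   struct ToyDedupPool {
       seen: HashSet<usize>, // buffer data pointers already claimed
       used: usize,          // deduplicated bytes
   }

   impl ToyDedupPool {
       fn claim(&mut self, array: &dyn Array) {
           self.claim_data(&array.to_data());
       }

       fn claim_data(&mut self, data: &ArrayData) {
           for buffer in data.buffers() {
               // A buffer shared by many slices is only counted the first time.
               if self.seen.insert(buffer.as_ptr() as usize) {
                   self.used += buffer.capacity();
               }
           }
           // Nested types (List, Struct, ...) keep buffers in child data.
           for child in data.child_data() {
               self.claim_data(child);
           }
       }
   }

   fn main() {
       let a = Int32Array::from_iter_values(0..1_000);
       let (s1, s2) = (a.slice(0, 10), a.slice(10, 10));

       let mut pool = ToyDedupPool::default();
       pool.claim(&s1);
       pool.claim(&s2); // same underlying buffer: no double count
       println!("deduplicated: {} bytes", pool.used);
   }
   ```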
   
   
   - Updated `Accumulator::size()` and `GroupsAccumulator::size()` signatures to accept `Option<&dyn MemoryPool>`:
     - When the pool is `None`, returns the total memory size including Arrow buffers, using either `get_slice_memory_size` or just `ScalarValue::size()` (same as before, so it is backward compatible).
     - When the pool is `Some`, returns the structural size only and claims buffers with the pool for deduplication tracking. Callers using the pool must add `pool.used()` to get the total memory (see the sketch after this list).
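   To make the calling convention concrete, here is a sketch with local stand-in traits (the real types are DataFusion's `Accumulator`/`GroupsAccumulator` and Arrow's `TrackingMemoryPool`; the only pool method assumed here is `used()`):

   ```rust
   /// Stand-in for the pool trait passed around in this PR; the caller
   /// only needs to read back the deduplicated total via `used()`.
   trait MemoryPool {
       fn used(&self) -> usize;
   }

   /// Stand-in mirroring the proposed Accumulator::size signature.
   trait Accumulator {
       fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;
   }

   fn accounted_memory(acc: &dyn Accumulator, pool: &dyn MemoryPool) -> usize {
       // Old behavior, unchanged: size(None) returns everything.
       let _back_compat = acc.size(None);

       // New behavior: size(Some(..)) returns structural bytes only and
       // claims the Arrow buffers into the pool, which deduplicates
       // buffers shared across ScalarValues/arrays.
       let structural = acc.size(Some(pool));
       structural + pool.used() // total = structural + deduplicated buffers
   }
   ```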
   
   Updated accumulators that use the pool parameter:
   - `DistinctCountAccumulator`
   - `ArrayAggAccumulator`
   - `OrderSensitiveArrayAggAccumulator` / `DistinctArrayAggAccumulator`: removed the compacting, since it was introduced specifically to solve the over-accounting and is no longer needed.
   - `FirstValueAccumulator` / `LastValueAccumulator`
   
   All other accumulator implementations were updated to match the new signature.
   
   ## Are these changes tested?
   
   Added a `distinct_count_does_not_over_account_memory()` test that exercises memory pool deduplication for `COUNT(DISTINCT)` with array types. Also updated the existing accumulator tests to use the memory pool; they verify the accounted memory is less than without the memory pool (in some cases even less than when we compacted).
   
   ## Are there any user-facing changes?
   
   Yes, the `size` API for `Accumulator` and `GroupsAccumulator` changed from `fn size(&self) -> usize;` to `fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;`.
   
   Not sure if this is the best API design... I'm open to suggestions. In any case, if `None` is passed the behavior remains the same as before. Also, IIUC this function is mainly used to keep the DataFusion memory pool within its bounds during aggregations.

