LiaCastaneda opened a new pull request, #19501: URL: https://github.com/apache/datafusion/pull/19501
## Which issue does this PR close?

Closes #. Related to [#16841](https://github.com/apache/datafusion/issues/16841#issue-3248316384), in particular [this comment](https://github.com/apache/datafusion/issues/16841#issuecomment-3563643947).

## Rationale for this change

Aggregation accumulators that store Arrow arrays can over-account memory when array buffers are shared between multiple `ScalarValue`s or Arrow arrays. This occurs because `ScalarValue::try_from_array()` creates slices that reference the same underlying buffers, and each `ScalarValue` reports the full buffer size when calculating memory usage, so the same physical buffer gets counted multiple times ([this comment](https://github.com/apache/datafusion/issues/16841#issuecomment-3107483475) explains very well why we are seeing this). A minimal sketch of the effect is included at the end of this description.

There have been several previous attempts to fix this, which included compacting the data so the whole array is not kept alive, or using `get_slice_memory_size` instead of `get_array_memory_size`. However, we observed that both had downsides:

- `compact()` is CPU inefficient (it copies data, which can be expensive).
- `get_slice_memory_size()` only accounts for logical memory, not the actual physical buffer capacity, so the amount returned is not accurate.

## What changes are included in this PR?

This approach avoids double-counting memory by using Arrow's `TrackingMemoryPool`, which automatically deduplicates shared buffers when accounting for them. This means we don't need to `compact()` or call `get_slice_memory_size()` just to solve the accounting problem. Note that `compact()` might still be useful when we want to release memory pressure.

- Updated the `Accumulator::size()` and `GroupsAccumulator::size()` signatures to accept `Option<&dyn MemoryPool>`:
  - When `pool` is `None`, returns the total memory size including Arrow buffers, using either `get_slice_memory_size` or just `ScalarValue::size()` (same as before, so it is backward compatible).
  - When `pool` is `Some`, returns the structural size only and claims buffers with the pool for deduplication tracking. Callers using the pool must add `pool.used()` to get the total memory.
- Updated the accumulators that use the pool parameter:
  - `DistinctCountAccumulator`
  - `ArrayAggAccumulator`
  - `OrderSensitiveArrayAggAccumulator` and `DistinctArrayAggAccumulator`: removed the compacting, since it was introduced specifically to solve the over-accounting and is not needed anymore.
  - `FirstValueAccumulator` / `LastValueAccumulator`
- All other accumulator implementations had to be updated to match the new signature.

## Are these changes tested?

Added a `distinct_count_does_not_over_account_memory()` test that checks memory-pool deduplication for `COUNT(DISTINCT)` with array types. Also updated the existing accumulator tests to use the memory pool; they verify that the accounted memory is still less than when not using the memory pool (in some cases even less than when we compacted).

## Are there any user-facing changes?

Yes, the `size` API for `Accumulator` and `GroupsAccumulator` changed from `fn size(&self) -> usize;` to `fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;` (a sketch of the caller-side contract is included at the end of this description). I'm not sure this is the best API design, so I'm open to suggestions. In any case, if `None` is passed the behavior remains the same as before. Also, IIUC this function is mainly used to keep the DataFusion memory pool within its bounds during aggregations.
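To make the over-accounting concrete, here is a small standalone sketch (not code from this PR). It uses plain Arrow array slicing rather than `ScalarValue::try_from_array`, but illustrates the same effect: two slices of one `StringArray` share the same backing buffers, yet each reports the full physical buffer size via `get_array_memory_size()`, while `get_slice_memory_size()` reports only the logical bytes:

```rust
use arrow::array::{Array, StringArray};

fn main() {
    // One array with a reasonably large backing buffer.
    let array = StringArray::from_iter_values((0..10_000).map(|i| format!("value-{i}")));

    // Each slice keeps the whole backing buffers alive.
    let a = array.slice(0, 1);
    let b = array.slice(1, 1);

    // Both slices report the full physical buffer size, so summing them
    // double-counts the shared memory.
    let naive_total = a.get_array_memory_size() + b.get_array_memory_size();

    // `get_slice_memory_size()` reports only the logical bytes of each slice,
    // which under-counts the physical capacity that is actually retained.
    let logical_total = a.to_data().get_slice_memory_size().unwrap()
        + b.to_data().get_slice_memory_size().unwrap();

    println!("per-value sum (double-counted): {naive_total} bytes");
    println!("logical slice sizes:            {logical_total} bytes");
}
```

Summing per-value sizes this way grows with the number of retained values times the full buffer size, which is exactly what buffer deduplication in the pool avoids.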
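Below is also a hypothetical sketch of the caller-side contract for the new `size(pool)` signature. The `MemoryPool` trait and its `used()` method here are only stand-ins based on the description above; the actual trait in arrow-rs / DataFusion may differ, and the `Accumulator` trait is reduced to the one method relevant to this PR:

```rust
/// Stand-in for the buffer-deduplicating pool referenced above (assumed API).
trait MemoryPool {
    /// Bytes currently claimed by the pool, with shared buffers counted once.
    fn used(&self) -> usize;
}

/// Simplified version of the new `Accumulator::size` contract from this PR.
trait Accumulator {
    /// With `None`, return the total size including Arrow buffers (old behavior).
    /// With `Some(pool)`, return only the structural size and claim buffers
    /// through the pool.
    fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;
}

/// Caller-side accounting: structural size reported by the accumulator plus
/// the deduplicated buffer bytes tracked by the pool.
fn total_accumulator_memory(acc: &dyn Accumulator, pool: &dyn MemoryPool) -> usize {
    acc.size(Some(pool)) + pool.used()
}
```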
