joroKr21 commented on code in PR #16359: URL: https://github.com/apache/datafusion/pull/16359#discussion_r2168281061
########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -112,6 +113,144 @@ impl MemoryPool for GreedyMemoryPool { } } +// A [`MemoryPool`] that implements a greedy first-come first-serve limit. +/// and tracks the memory usage based on the references to the arrays. +/// +/// This pool works well for queries that do not need to spill or have +/// a single spillable operator. See [`FairSpillPool`] if there are +/// multiple spillable operators that all will spill. +#[derive(Debug)] +pub struct GreedyMemoryPoolWithTracking { + pool_size: usize, + used: AtomicUsize, + references: Mutex<HashMap<usize, usize>>, +} + +impl GreedyMemoryPoolWithTracking { + /// Create a new pool that can allocate up to `pool_size` bytes + pub fn new(pool_size: usize) -> Self { + debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); + Self { + pool_size, + used: AtomicUsize::new(0), + references: Mutex::new(HashMap::new()), + } + } +} + +impl MemoryPool for GreedyMemoryPoolWithTracking { + fn grow(&self, _reservation: &MemoryReservation, additional: usize) { + self.used.fetch_add(additional, Ordering::Relaxed); + } + + fn grow_with_arrays( + &self, + reservation: &MemoryReservation, + arrays: &[Arc<dyn arrow::array::Array>], + ) { + for array in arrays { + let array_data = array.to_data(); + for buffer in array_data.buffers() { + let addr = buffer.data_ptr().as_ptr() as usize; + let ref_count = *self + .references + .lock() + .entry(addr) + .and_modify(|ref_count| *ref_count += array.get_array_memory_size()) + .or_insert(1); + + // If this is the first time we see this buffer, we need to grow the pool + if ref_count == 1 { + let additional = buffer.capacity(); + self.grow(reservation, additional); + } + } + } + } + + fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) { + self.used.fetch_sub(shrink, Ordering::Relaxed); + } + + fn shrink_with_arrays( + &self, + reservation: &MemoryReservation, + arrays: &[Arc<dyn arrow::array::Array>], + ) { + for array in arrays { + let array_data = array.to_data(); + for buffer in array_data.buffers() { + // We need to track the memory usage of the buffers + let addr = buffer.data_ptr().as_ptr() as usize; + let ref_count = *self + .references + .lock() + .entry(addr) + .and_modify(|ref_count| *ref_count -= buffer.len()) + .or_insert(1); Review Comment: I think we should remove the entry here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org