This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 48766c9712 Add MemoryReservation::{split_off, take, new_empty} (#7184)
48766c9712 is described below

commit 48766c97120c7693bd65b1274a8027b8c5fbf2a5
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 3 12:56:48 2023 -0500

    Add MemoryReservation::{split_off, take, new_empty} (#7184)
    
    * Add MemoryReservation::{split_off, take, new_empty}
    
    * typo
---
 datafusion/execution/src/memory_pool/mod.rs  | 137 ++++++++++++++++++++++++---
 datafusion/execution/src/memory_pool/pool.rs |  35 ++++---
 2 files changed, 144 insertions(+), 28 deletions(-)

diff --git a/datafusion/execution/src/memory_pool/mod.rs 
b/datafusion/execution/src/memory_pool/mod.rs
index 011cd72cbb..f8fc9fcdbb 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -77,7 +77,9 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
     fn reserved(&self) -> usize;
 }
 
-/// A memory consumer that can be tracked by [`MemoryReservation`] in a 
[`MemoryPool`]
+/// A memory consumer that can be tracked by [`MemoryReservation`] in
+/// a [`MemoryPool`]. All allocations are registered to a particular
+/// `MemoryConsumer`;
 #[derive(Debug)]
 pub struct MemoryConsumer {
     name: String,
@@ -113,20 +115,40 @@ impl MemoryConsumer {
     pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
         pool.register(&self);
         MemoryReservation {
-            consumer: self,
+            registration: Arc::new(SharedRegistration {
+                pool: Arc::clone(pool),
+                consumer: self,
+            }),
             size: 0,
-            policy: Arc::clone(pool),
         }
     }
 }
 
-/// A [`MemoryReservation`] tracks a reservation of memory in a [`MemoryPool`]
-/// that is freed back to the pool on drop
+/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
+///
+/// Calls [`MemoryPool::unregister`] on drop to return any memory to
+/// the underlying pool.
 #[derive(Debug)]
-pub struct MemoryReservation {
+struct SharedRegistration {
+    pool: Arc<dyn MemoryPool>,
     consumer: MemoryConsumer,
+}
+
+impl Drop for SharedRegistration {
+    fn drop(&mut self) {
+        self.pool.unregister(&self.consumer);
+    }
+}
+
+/// A [`MemoryReservation`] tracks an individual reservation of a
+/// number of bytes of memory in a [`MemoryPool`] that is freed back
+/// to the pool on drop.
+///
+/// The reservation can be grown or shrunk over time.
+#[derive(Debug)]
+pub struct MemoryReservation {
+    registration: Arc<SharedRegistration>,
     size: usize,
-    policy: Arc<dyn MemoryPool>,
 }
 
 impl MemoryReservation {
@@ -135,7 +157,8 @@ impl MemoryReservation {
         self.size
     }
 
-    /// Frees all bytes from this reservation returning the number of bytes 
freed
+    /// Frees all bytes from this reservation back to the underlying
+    /// pool, returning the number of bytes freed.
     pub fn free(&mut self) -> usize {
         let size = self.size;
         if size != 0 {
@@ -151,7 +174,7 @@ impl MemoryReservation {
     /// Panics if `capacity` exceeds [`Self::size`]
     pub fn shrink(&mut self, capacity: usize) {
         let new_size = self.size.checked_sub(capacity).unwrap();
-        self.policy.shrink(self, capacity);
+        self.registration.pool.shrink(self, capacity);
         self.size = new_size
     }
 
@@ -176,22 +199,55 @@ impl MemoryReservation {
 
     /// Increase the size of this reservation by `capacity` bytes
     pub fn grow(&mut self, capacity: usize) {
-        self.policy.grow(self, capacity);
+        self.registration.pool.grow(self, capacity);
         self.size += capacity;
     }
 
-    /// Try to increase the size of this reservation by `capacity` bytes
+    /// Try to increase the size of this reservation by `capacity`
+    /// bytes, returning error if there is insufficient capacity left
+    /// in the pool.
     pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
-        self.policy.try_grow(self, capacity)?;
+        self.registration.pool.try_grow(self, capacity)?;
         self.size += capacity;
         Ok(())
     }
+
+    /// Splits off `capacity` bytes from this [`MemoryReservation`]
+    /// into a new [`MemoryReservation`] with the same
+    /// [`MemoryConsumer`].
+    ///
+    /// This can be useful to free part of this reservation with RAAI
+    /// style dropping
+    ///
+    /// # Panics
+    ///
+    /// Panics if `capacity` exceeds [`Self::size`]
+    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
+        self.size = self.size.checked_sub(capacity).unwrap();
+        Self {
+            size: capacity,
+            registration: self.registration.clone(),
+        }
+    }
+
+    /// Returns a new empty [`MemoryReservation`] with the same 
[`MemoryConsumer`]
+    pub fn new_empty(&self) -> Self {
+        Self {
+            size: 0,
+            registration: self.registration.clone(),
+        }
+    }
+
+    /// Splits off all the bytes from this [`MemoryReservation`] into
+    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
+    pub fn take(&mut self) -> MemoryReservation {
+        self.split(self.size)
+    }
 }
 
 impl Drop for MemoryReservation {
     fn drop(&mut self) {
         self.free();
-        self.policy.unregister(&self.consumer);
     }
 }
 
@@ -251,4 +307,59 @@ mod tests {
         a2.try_grow(25).unwrap();
         assert_eq!(pool.reserved(), 25);
     }
+
+    #[test]
+    fn test_split() {
+        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
+        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+
+        r1.try_grow(20).unwrap();
+        assert_eq!(r1.size(), 20);
+        assert_eq!(pool.reserved(), 20);
+
+        // take 5 from r1, should still have same reservation split
+        let r2 = r1.split(5);
+        assert_eq!(r1.size(), 15);
+        assert_eq!(r2.size(), 5);
+        assert_eq!(pool.reserved(), 20);
+
+        // dropping r1 frees 15 but retains 5 as they have the same consumer
+        drop(r1);
+        assert_eq!(r2.size(), 5);
+        assert_eq!(pool.reserved(), 5);
+    }
+
+    #[test]
+    fn test_new_empty() {
+        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
+        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+
+        r1.try_grow(20).unwrap();
+        let mut r2 = r1.new_empty();
+        r2.try_grow(5).unwrap();
+
+        assert_eq!(r1.size(), 20);
+        assert_eq!(r2.size(), 5);
+        assert_eq!(pool.reserved(), 25);
+    }
+
+    #[test]
+    fn test_take() {
+        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
+        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+
+        r1.try_grow(20).unwrap();
+        let mut r2 = r1.take();
+        r2.try_grow(5).unwrap();
+
+        assert_eq!(r1.size(), 0);
+        assert_eq!(r2.size(), 25);
+        assert_eq!(pool.reserved(), 25);
+
+        // r1 can still grow again
+        r1.try_grow(3).unwrap();
+        assert_eq!(r1.size(), 3);
+        assert_eq!(r2.size(), 25);
+        assert_eq!(pool.reserved(), 28);
+    }
 }
diff --git a/datafusion/execution/src/memory_pool/pool.rs 
b/datafusion/execution/src/memory_pool/pool.rs
index 7b68a86244..1242ce025c 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -84,7 +84,11 @@ impl MemoryPool for GreedyMemoryPool {
                 (new_used <= self.pool_size).then_some(new_used)
             })
             .map_err(|used| {
-                insufficient_capacity_err(reservation, additional, 
self.pool_size - used)
+                insufficient_capacity_err(
+                    reservation,
+                    additional,
+                    self.pool_size.saturating_sub(used),
+                )
             })?;
         Ok(())
     }
@@ -159,13 +163,14 @@ impl MemoryPool for FairSpillPool {
 
     fn unregister(&self, consumer: &MemoryConsumer) {
         if consumer.can_spill {
-            self.state.lock().num_spill -= 1;
+            let mut state = self.state.lock();
+            state.num_spill = state.num_spill.checked_sub(1).unwrap();
         }
     }
 
     fn grow(&self, reservation: &MemoryReservation, additional: usize) {
         let mut state = self.state.lock();
-        match reservation.consumer.can_spill {
+        match reservation.registration.consumer.can_spill {
             true => state.spillable += additional,
             false => state.unspillable += additional,
         }
@@ -173,7 +178,7 @@ impl MemoryPool for FairSpillPool {
 
     fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
         let mut state = self.state.lock();
-        match reservation.consumer.can_spill {
+        match reservation.registration.consumer.can_spill {
             true => state.spillable -= shrink,
             false => state.unspillable -= shrink,
         }
@@ -182,7 +187,7 @@ impl MemoryPool for FairSpillPool {
     fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> 
Result<()> {
         let mut state = self.state.lock();
 
-        match reservation.consumer.can_spill {
+        match reservation.registration.consumer.can_spill {
             true => {
                 // The total amount of memory available to spilling consumers
                 let spill_available = 
self.pool_size.saturating_sub(state.unspillable);
@@ -230,7 +235,7 @@ fn insufficient_capacity_err(
     additional: usize,
     available: usize,
 ) -> DataFusionError {
-    DataFusionError::ResourcesExhausted(format!("Failed to allocate additional 
{} bytes for {} with {} bytes already allocated - maximum available is {}", 
additional, reservation.consumer.name, reservation.size, available))
+    DataFusionError::ResourcesExhausted(format!("Failed to allocate additional 
{} bytes for {} with {} bytes already allocated - maximum available is {}", 
additional, reservation.registration.consumer.name, reservation.size, 
available))
 }
 
 #[cfg(test)]
@@ -247,7 +252,7 @@ mod tests {
         r1.grow(2000);
         assert_eq!(pool.reserved(), 2000);
 
-        let mut r2 = MemoryConsumer::new("s1")
+        let mut r2 = MemoryConsumer::new("r2")
             .with_can_spill(true)
             .register(&pool);
         // Can grow beyond capacity of pool
@@ -256,10 +261,10 @@ mod tests {
         assert_eq!(pool.reserved(), 4000);
 
         let err = r2.try_grow(1).unwrap_err().to_string();
-        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 
bytes for s1 with 2000 bytes already allocated - maximum available is 0");
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 
bytes for r2 with 2000 bytes already allocated - maximum available is 0");
 
         let err = r2.try_grow(1).unwrap_err().to_string();
-        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 
bytes for s1 with 2000 bytes already allocated - maximum available is 0");
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 
bytes for r2 with 2000 bytes already allocated - maximum available is 0");
 
         r1.shrink(1990);
         r2.shrink(2000);
@@ -269,7 +274,7 @@ mod tests {
         r1.try_grow(10).unwrap();
         assert_eq!(pool.reserved(), 20);
 
-        // Can grow a2 to 80 as only spilling consumer
+        // Can grow r2 to 80 as only spilling consumer
         r2.try_grow(80).unwrap();
         assert_eq!(pool.reserved(), 100);
 
@@ -279,19 +284,19 @@ mod tests {
         assert_eq!(r2.size(), 10);
         assert_eq!(pool.reserved(), 30);
 
-        let mut r3 = MemoryConsumer::new("s2")
+        let mut r3 = MemoryConsumer::new("r3")
             .with_can_spill(true)
             .register(&pool);
 
         let err = r3.try_grow(70).unwrap_err().to_string();
-        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 
bytes for s2 with 0 bytes already allocated - maximum available is 40");
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 
bytes for r3 with 0 bytes already allocated - maximum available is 40");
 
-        //Shrinking a2 to zero doesn't allow a3 to allocate more than 45
+        //Shrinking r2 to zero doesn't allow a3 to allocate more than 45
         r2.free();
         let err = r3.try_grow(70).unwrap_err().to_string();
-        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 
bytes for s2 with 0 bytes already allocated - maximum available is 40");
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 
bytes for r3 with 0 bytes already allocated - maximum available is 40");
 
-        // But dropping a2 does
+        // But dropping r2 does
         drop(r2);
         assert_eq!(pool.reserved(), 20);
         r3.try_grow(80).unwrap();

Reply via email to