alamb commented on code in PR #9015:
URL: https://github.com/apache/arrow-datafusion/pull/9015#discussion_r1473517339
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -63,38 +72,89 @@ impl GreedyMemoryPool {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
- used: AtomicUsize::new(0),
+ state: Mutex::new(GreedyMemoryPoolState {
+ pool_members: HashMap::new(),
+ }),
}
}
}
impl MemoryPool for GreedyMemoryPool {
- fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
- self.used.fetch_add(additional, Ordering::Relaxed);
+ fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+ let mut state = self.state.lock();
+ let used = state
+ .pool_members
+ .entry(reservation.consumer().name().into())
+ .or_insert(0);
+ *used = used.saturating_add(additional);
}
- fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
- self.used.fetch_sub(shrink, Ordering::Relaxed);
+ fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+ let mut state = self.state.lock();
+ let used = state
+ .pool_members
+ .entry(reservation.consumer().name().into())
+ .or_insert(0);
+ *used = used.saturating_sub(shrink);
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) ->
Result<()> {
- self.used
- .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
- let new_used = used + additional;
- (new_used <= self.pool_size).then_some(new_used)
- })
- .map_err(|used| {
- insufficient_capacity_err(
- reservation,
- additional,
- self.pool_size.saturating_sub(used),
- )
- })?;
+ let mut state = self.state.lock();
+ let used: usize = state.pool_members.values().sum();
+ if used.saturating_add(additional) > self.pool_size {
+ // dropping the mutex so that the display trait method does not
deadlock
+ drop(state);
+
+ debug!("Pool Exhausted while trying to allocate {additional} bytes
for {}:\n{self}", reservation.registration.consumer.name());
+ return Err(insufficient_capacity_err(
+ reservation,
+ additional,
+ self.pool_size.saturating_sub(used),
+ ));
+ }
+ let entry = state
+ .pool_members
+ .entry(reservation.consumer().name().into())
+ .or_insert(0);
+ *entry = entry.saturating_add(additional);
Ok(())
}
fn reserved(&self) -> usize {
- self.used.load(Ordering::Relaxed)
+ let state = self.state.lock();
+ state.pool_members.values().sum()
+ }
+
+ fn unregister(&self, consumer: &MemoryConsumer) {
+ let mut state = self.state.lock();
+ state.pool_members.remove(consumer.name());
+ }
+}
+
+impl Display for GreedyMemoryPool {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let state = self.state.lock();
+ let used: usize = state.pool_members.values().sum();
+ let free = self.pool_size.saturating_sub(used);
+
+ let mut allocations = state.pool_members.iter().collect::<Vec<_>>();
+ allocations.sort_by(|(_, a), (_, b)| a.cmp(b).reverse());
Review Comment:
💯
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -54,7 +58,12 @@ impl MemoryPool for UnboundedMemoryPool {
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
Review Comment:
Is `pool_size` used anymore? It looks like it is not.
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -63,38 +72,89 @@ impl GreedyMemoryPool {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
- used: AtomicUsize::new(0),
+ state: Mutex::new(GreedyMemoryPoolState {
+ pool_members: HashMap::new(),
+ }),
}
}
}
impl MemoryPool for GreedyMemoryPool {
- fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
- self.used.fetch_add(additional, Ordering::Relaxed);
+ fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+ let mut state = self.state.lock();
+ let used = state
+ .pool_members
+ .entry(reservation.consumer().name().into())
Review Comment:
I think this is going to allocate a new `String` on each call to `grow()`.
What would you think about using the hashbrown `HashMap` to avoid this
allocation (look up the entry by `&str` and only clone the name if it was missing)?
https://docs.rs/hashbrown/latest/hashbrown/hash_map/struct.HashMap.html
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -63,38 +72,89 @@ impl GreedyMemoryPool {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
- used: AtomicUsize::new(0),
+ state: Mutex::new(GreedyMemoryPoolState {
+ pool_members: HashMap::new(),
+ }),
}
}
}
impl MemoryPool for GreedyMemoryPool {
- fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
- self.used.fetch_add(additional, Ordering::Relaxed);
+ fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+ let mut state = self.state.lock();
+ let used = state
+ .pool_members
+ .entry(reservation.consumer().name().into())
+ .or_insert(0);
+ *used = used.saturating_add(additional);
}
- fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
- self.used.fetch_sub(shrink, Ordering::Relaxed);
+ fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+ let mut state = self.state.lock();
+ let used = state
+ .pool_members
+ .entry(reservation.consumer().name().into())
+ .or_insert(0);
+ *used = used.saturating_sub(shrink);
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) ->
Result<()> {
- self.used
- .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
- let new_used = used + additional;
- (new_used <= self.pool_size).then_some(new_used)
- })
- .map_err(|used| {
- insufficient_capacity_err(
- reservation,
- additional,
- self.pool_size.saturating_sub(used),
- )
- })?;
+ let mut state = self.state.lock();
+ let used: usize = state.pool_members.values().sum();
Review Comment:
If there are many consumers (a complicated plan), this call could potentially
take a long time. What do you think about keeping the running size in
`GreedyMemoryPoolState::total_size`?
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -129,15 +189,14 @@ pub struct FairSpillPool {
}
#[derive(Debug)]
-struct FairSpillPoolState {
- /// The number of consumers that can spill
- num_spill: usize,
-
- /// The total amount of memory reserved that can be spilled
- spillable: usize,
+struct FairSpillPoolMember {
+ used: usize,
+ can_spill: bool,
+}
- /// The total amount of memory reserved by consumers that cannot spill
- unspillable: usize,
+#[derive(Debug)]
+struct FairSpillPoolState {
+ pool_members: HashMap<String, FairSpillPoolMember>,
Review Comment:
I wonder if you can keep the spill information in
`FairSpillPoolState::num_spill`, etc. -- that would then allow you to use the
same structure to track members in both `GreedyMemoryPool` and `FairSpillPool`.
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -310,4 +466,34 @@ mod tests {
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 30
bytes for s4 with 0 bytes already allocated - maximum available is 20");
}
+
+ #[test]
+ fn test_greedy() {
+ let pool = Arc::new(GreedyMemoryPool::new(100)) as _;
+ let mut r1 = MemoryConsumer::new("r1").register(&pool);
+
+ // Can grow beyond capacity of pool
+ r1.grow(2000);
+ assert_eq!(pool.reserved(), 2000);
+
+ let mut r2 = MemoryConsumer::new("r2")
+ .with_can_spill(true)
+ .register(&pool);
+ // Can grow beyond capacity of pool
+ r2.grow(2000);
+
+ let err = r1.try_grow(1).unwrap_err().strip_backtrace();
+ assert_eq!(err, "Resources exhausted: Failed to allocate additional 1
bytes for r1 with 2000 bytes already allocated - maximum available is 0");
Review Comment:
Can you please add tests showing the report generated on error?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org