alamb commented on code in PR #11665: URL: https://github.com/apache/datafusion/pull/11665#discussion_r1697242963
########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -240,6 +249,149 @@ fn insufficient_capacity_err( resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) } +/// A [`MemoryPool`] that tracks the consumers that have +/// reserved memory within the inner memory pool. +/// +/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`]. +/// The same consumer can have multiple reservations. +#[derive(Debug)] +pub struct TrackConsumersPool<I> { + inner: I, + top: NonZeroUsize, + tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>, +} + +impl<I: MemoryPool> TrackConsumersPool<I> { + /// Creates a new [`TrackConsumersPool`]. + /// + /// The `top` determines how many Top K [`MemoryConsumer`]s to include + /// in the reported [`DataFusionError::ResourcesExhausted`]. + pub fn new(inner: I, top: NonZeroUsize) -> Self { + Self { + inner, + top, + tracked_consumers: Default::default(), + } + } + + /// Determine if there are multiple [`MemoryConsumer`]s registered + /// which have the same name. + /// + /// This is very tied to the implementation of the memory consumer. + fn has_multiple_consumers(&self, name: &String) -> bool { + let consumer = MemoryConsumer::new(name); + let consumer_with_spill = consumer.clone().with_can_spill(true); + let guard = self.tracked_consumers.lock(); + guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill) + } + + /// The top consumers in a report string. 
+ fn report_top(&self) -> String { Review Comment: Since `top` is only used for this report, maybe it should be a parameter to `report_top` rather than a field on the pool 🤔 It seems like someone could want both the "top 10" consumers and "all consumers" from the same tracked pool, but with the current implementation they could only have one or the other. Is this the message you are proposing to fix in a follow-on PR? ########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -240,6 +249,149 @@ fn insufficient_capacity_err( resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) } +/// A [`MemoryPool`] that tracks the consumers that have +/// reserved memory within the inner memory pool. Review Comment: Maybe we can also add some context on the use case: ```suggestion /// reserved memory within the inner memory pool. /// /// By tracking memory reservations more carefully this pool /// can provide better error messages on the largest memory users ``` ########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -231,6 +235,11 @@ impl MemoryPool for FairSpillPool { } } +/// Constructs a resources error based upon the individual [`MemoryReservation`]. +/// +/// The error references the `bytes already allocated` for the reservation, +/// and not the total within the collective [`MemoryPool`], +/// nor the total across multiple reservations with the same [`MemoryConsumer`].
#[inline(always)] fn insufficient_capacity_err( Review Comment: I like the idea of updating the error message in a follow on PR ########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -311,4 +463,190 @@ mod tests { let err = r4.try_grow(30).unwrap_err().strip_backtrace(); assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); } + + #[test] + fn test_tracked_consumers_pool() { + let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + + // Test: use all the different interfaces to change reservation size + + // set r1=50, using grow and shrink + let mut r1 = MemoryConsumer::new("r1").register(&pool); + r1.grow(70); + r1.shrink(20); + + // set r2=15 using try_grow + let mut r2 = MemoryConsumer::new("r2").register(&pool); + r2.try_grow(15) + .expect("should succeed in memory allotment for r2"); + + // set r3=20 using try_resize + let mut r3 = MemoryConsumer::new("r3").register(&pool); + r3.try_resize(25) + .expect("should succeed in memory allotment for r3"); + r3.try_resize(20) + .expect("should succeed in memory allotment for r3"); + + // set r4=10 + // this should not be reported in top 3 + let mut r4 = MemoryConsumer::new("r4").register(&pool); + r4.grow(10); + + // Test: reports if new reservation causes error + // using the previously set sizes for other consumers + let mut r5 = MemoryConsumer::new("r5").register(&pool); + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. 
Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5"; + let res = r5.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide list of top memory consumers, instead found {:?}", + res + ); + } + + #[test] + fn test_tracked_consumers_pool_register() { + let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + + let same_name = "foo"; + + // Test: see error message when no consumers recorded yet + let mut r0 = MemoryConsumer::new(same_name).register(&pool); + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100"; + let res = r0.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error when no reservations have been made yet, instead found {:?}", res + ); + + // API: multiple registrations using the same hashed consumer, + // will be recognized as the same in the TrackConsumersPool. + + // Test: will be the same per Top Consumers reported. + r0.grow(10); // make r0=10, pool available=90 + let new_consumer_same_name = MemoryConsumer::new(same_name); + let mut r1 = new_consumer_same_name.clone().register(&pool); + // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. + // a followup PR will clarify this message "0 bytes already allocated for this reservation" + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. 
Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90"; + let res = r1.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90), instead found {:?}", res + ); + + // Test: will accumulate size changes per consumer, not per reservation + r1.grow(20); + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70"; Review Comment: It is somewhat confusing that the same message reports two different allocations for foo: 1. `foo consumed 30 bytes` 2. `for foo with 20 bytes already allocated` (the first is the total across all of the consumer's reservations, the second counts only the current reservation). Is it possible to rationalize the errors somehow to make this less confusing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org