wiedld commented on code in PR #11665:
URL: https://github.com/apache/datafusion/pull/11665#discussion_r1693521876


##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -311,4 +458,179 @@ mod tests {
         let err = r4.try_grow(30).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 
bytes for s4 with 0 bytes already allocated - maximum available is 20");
     }
+
+    #[test]
+    fn test_tracked_consumers_pool() {
+        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(100),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+
+        // Test: use all the different interfaces to change reservation size
+
+        // set r1=50, using grow and shrink
+        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        r1.grow(70);
+        r1.shrink(20);
+
+        // set r2=15 using try_grow
+        let mut r2 = MemoryConsumer::new("r2").register(&pool);
+        r2.try_grow(15)
+            .expect("should succeed in memory allotment for r2");
+
+        // set r3=20 using try_resize
+        let mut r3 = MemoryConsumer::new("r3").register(&pool);
+        r3.try_resize(25)
+            .expect("should succeed in memory allotment for r3");
+        r3.try_resize(20)
+            .expect("should succeed in memory allotment for r3");
+
+        // set r4=10
+        // this should not be reported in top 3
+        let mut r4 = MemoryConsumer::new("r4").register(&pool);
+        r4.grow(10);
+
+        // Test: reports if new reservation causes error
+        // using the previously set sizes for other consumers
+        let mut r5 = MemoryConsumer::new("r5").register(&pool);
+        let expected = "Failed to allocate additional 150 bytes for r5 with 0 
bytes already allocated - maximum available is 5. The top memory consumers 
(across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 
consumed 15 bytes";
+        assert!(
+            matches!(
+                r5.try_grow(150),
+                Err(DataFusionError::ResourcesExhausted(e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide list of top memory consumers"
+        );
+    }
+
+    #[test]
+    fn test_tracked_consumers_pool_register() {
+        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(100),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+
+        let same_name = "foo";
+
+        // Test: see error message when no consumers recorded yet
+        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
+        let expected = "Failed to allocate additional 150 bytes for foo with 0 
bytes already allocated - maximum available is 100. The top memory consumers 
(across reservations) are: foo consumed 0 bytes";
+        assert!(
+            matches!(
+                r0.try_grow(150),
+                Err(DataFusionError::ResourcesExhausted(e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error when no reservations have been made 
yet"
+        );
+
+        // API: multiple registrations using the same hashed consumer,
+        // will be recognized as the same in the TrackConsumersPool.
+
+        // Test: will be the same per Top Consumers reported.
+        r0.grow(10); // make r0=10, pool available=90
+        let new_consumer_same_name = MemoryConsumer::new(same_name);
+        let mut r1 = new_consumer_same_name.clone().register(&pool);
+        // TODO: the insufficient_capacity_err() message is per reservation, 
not per consumer.
+        // a followup PR will clarify this message "0 bytes already allocated 
for this reservation"
+        let expected = "Failed to allocate additional 150 bytes for foo with 0 
bytes already allocated - maximum available is 90. The top memory consumers 
(across reservations) are: foo consumed 10 bytes";
+        assert!(
+            matches!(
+                r1.try_grow(150),
+                Err(DataFusionError::ResourcesExhausted(e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error with same hashed consumer (a single 
foo=10 bytes, available=90)"
+        );
+
+        // Test: will accumulate size changes per consumer, not per reservation
+        r1.grow(20);
+        let expected = "Failed to allocate additional 150 bytes for foo with 
20 bytes already allocated - maximum available is 70. The top memory consumers 
(across reservations) are: foo consumed 30 bytes";
+        assert!(
+            matches!(
+                r1.try_grow(150),
+                Err(DataFusionError::ResourcesExhausted(e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error with same hashed consumer (a single 
foo=30 bytes, available=70)"
+        );
+
+        // Test: different hashed consumer, (even with the same name),
+        // will be recognized as different in the TrackConsumersPool
+        let consumer_with_same_name_but_different_hash =
+            MemoryConsumer::new(same_name).with_can_spill(true);
+        let mut r2 = 
consumer_with_same_name_but_different_hash.register(&pool);
+        let expected = "Failed to allocate additional 150 bytes for foo with 0 
bytes already allocated - maximum available is 70. The top memory consumers 
(across reservations) are: foo(can_spill=false) consumed 30 bytes, 
foo(can_spill=true) consumed 0 bytes";
+        assert!(
+            matches!(
+                r2.try_grow(150),
+                Err(DataFusionError::ResourcesExhausted(e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error with different hashed consumer 
(foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70)"
+        );
+    }
+
+    #[test]
+    fn test_tracked_consumers_pool_deregister() {

Review Comment:
   This test covers existing behavior. The current MemoryPool 
implementations register/deregister at the MemoryConsumer level, whereas 
bytes are incremented/decremented at the MemoryReservation level.
   
   This means that a consumer can be deregistered, even while the reservation 
still holds memory.
   
   The new TrackConsumersPool is unrelated to this existing behavior. 
However, this test is added to demonstrate how the error messages will read 
in that situation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to