This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c473c1852f feat(memory_pool): add `TrackConsumersPool::metrics()` to 
expose cons… (#21147)
c473c1852f is described below

commit c473c1852fad41541a2adfb252535321fd26a1b7
Author: Bert Vermeiren <[email protected]>
AuthorDate: Tue Mar 31 20:18:37 2026 +0200

    feat(memory_pool): add `TrackConsumersPool::metrics()` to expose cons… 
(#21147)
    
    ## Which issue does this PR close?
    
    - Closes #21146
    
    ## Rationale for this change
    
    There is currently no way to programmatically inspect the memory
    consumption of individual consumers tracked by TrackConsumersPool. The
    only available method, report_top(), returns a formatted string intended
    for human-readable output, making it unsuitable for programmatic use
    (e.g., metrics collection, monitoring, or custom reporting).
    
    ## What changes are included in this PR?
    
    Added a `metrics()` method to `TrackConsumersPool` that returns a
    `Vec<MemoryConsumerMetrics>` — a snapshot of all currently tracked
    consumers. Each `MemoryConsumerMetrics` entry exposes:
    
    - name — the consumer's name
    - can_spill — whether the consumer supports spilling to disk
    - reserved — current bytes reserved
    - peak — peak bytes reserved
    
    This allows callers to inspect memory usage programmatically without
    parsing formatted strings.
    
    ## Are these changes tested?
    
    Yes. A dedicated unit test test_track_consumers_pool_metrics was added
    in pool.rs that verifies:
    - An empty pool returns no metrics
    - name, can_spill, reserved, and peak are correctly reported for each
      consumer
    - Peak is tracked independently from current reservation (grow then
      shrink scenario)
    - Dropped consumers are removed from metrics
    
    ## Are there any user-facing changes?
    
    No
    
    Co-authored-by: Bert Vermeiren <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/execution/src/memory_pool/pool.rs | 83 ++++++++++++++++++++++++++++
 1 file changed, 83 insertions(+)

diff --git a/datafusion/execution/src/memory_pool/pool.rs 
b/datafusion/execution/src/memory_pool/pool.rs
index b10270851c..19aaa0371a 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -302,6 +302,32 @@ impl TrackedConsumer {
     }
 }
 
+/// A point-in-time snapshot of a tracked memory consumer's state.
+///
+/// Returned by [`TrackConsumersPool::metrics()`].
+#[derive(Debug, Clone)]
+pub struct MemoryConsumerMetrics {
+    /// The name of the memory consumer
+    pub name: String,
+    /// Whether this consumer can spill to disk
+    pub can_spill: bool,
+    /// The number of bytes currently reserved by this consumer
+    pub reserved: usize,
+    /// The peak number of bytes reserved by this consumer
+    pub peak: usize,
+}
+
+impl From<&TrackedConsumer> for MemoryConsumerMetrics {
+    fn from(tracked: &TrackedConsumer) -> Self {
+        Self {
+            name: tracked.name.clone(),
+            can_spill: tracked.can_spill,
+            reserved: tracked.reserved(),
+            peak: tracked.peak(),
+        }
+    }
+}
+
 /// A [`MemoryPool`] that tracks the consumers that have
 /// reserved memory within the inner memory pool.
 ///
@@ -381,6 +407,15 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
         }
     }
 
+    /// Returns a snapshot of all currently tracked consumers.
+    pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
+        self.tracked_consumers
+            .lock()
+            .values()
+            .map(Into::into)
+            .collect()
+    }
+
     /// Returns a formatted string with the top memory consumers.
     pub fn report_top(&self, top: usize) -> String {
         let mut consumers = self
@@ -778,6 +813,54 @@ mod tests {
         test_per_pool_type(tracked_greedy_pool);
     }
 
+    #[test]
+    fn test_track_consumers_pool_metrics() {
+        let track_consumers_pool = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(1000),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+        let memory_pool: Arc<dyn MemoryPool> = 
Arc::clone(&track_consumers_pool) as _;
+
+        // Empty pool has no metrics
+        assert!(track_consumers_pool.metrics().is_empty());
+
+        // Register consumers with different spill settings
+        let r1 = MemoryConsumer::new("spilling")
+            .with_can_spill(true)
+            .register(&memory_pool);
+        let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
+
+        // Grow r1 in two steps to verify peak tracking
+        r1.grow(100);
+        r1.grow(50);
+        r1.shrink(50); // reserved=100, peak=150
+
+        r2.grow(200); // reserved=200, peak=200
+
+        let mut metrics = track_consumers_pool.metrics();
+        metrics.sort_by_key(|m| m.name.clone());
+
+        assert_eq!(metrics.len(), 2);
+
+        let m_non = &metrics[0];
+        assert_eq!(m_non.name, "non-spilling");
+        assert!(!m_non.can_spill);
+        assert_eq!(m_non.reserved, 200);
+        assert_eq!(m_non.peak, 200);
+
+        let m_spill = &metrics[1];
+        assert_eq!(m_spill.name, "spilling");
+        assert!(m_spill.can_spill);
+        assert_eq!(m_spill.reserved, 100);
+        assert_eq!(m_spill.peak, 150);
+
+        // Unregistered consumers are removed from metrics
+        drop(r2);
+        let metrics = track_consumers_pool.metrics();
+        assert_eq!(metrics.len(), 1);
+        assert_eq!(metrics[0].name, "spilling");
+    }
+
     #[test]
     fn test_tracked_consumers_pool_use_beyond_errors() {
         let setting = make_settings();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to