This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c473c1852f feat(memory_pool): add `TrackConsumersPool::metrics()` to
expose cons… (#21147)
c473c1852f is described below
commit c473c1852fad41541a2adfb252535321fd26a1b7
Author: Bert Vermeiren <[email protected]>
AuthorDate: Tue Mar 31 20:18:37 2026 +0200
feat(memory_pool): add `TrackConsumersPool::metrics()` to expose cons…
(#21147)
## Which issue does this PR close?
- Closes #21146
## Rationale for this change
There is currently no way to programmatically inspect the memory
consumption of individual consumers tracked by `TrackConsumersPool`. The
only existing way to inspect them, `report_top()`, returns a formatted
string intended for human-readable output, making it unsuitable for
programmatic use (e.g., metrics collection, monitoring, or custom reporting).
## What changes are included in this PR?
Added a `metrics()` method to `TrackConsumersPool` that returns a
`Vec<MemoryConsumerMetrics>` — a snapshot of all currently tracked
consumers. Each `MemoryConsumerMetrics` entry exposes:
- `name` — the consumer's name
- `can_spill` — whether the consumer supports spilling to disk
- `reserved` — current bytes reserved
- `peak` — peak bytes reserved
This allows callers to inspect memory usage programmatically without
parsing formatted strings.
## Are these changes tested?
Yes. A dedicated unit test, `test_track_consumers_pool_metrics`, was added
to pool.rs; it verifies:
- An empty pool returns no metrics
- name, can_spill, reserved, and peak are correctly reported for each
consumer
- Peak is tracked independently from current reservation (grow then
shrink scenario)
- Dropped consumers are removed from metrics
## Are there any user-facing changes?
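No breaking changes; this PR only adds the public `MemoryConsumerMetrics`
type and the `TrackConsumersPool::metrics()` method.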
Co-authored-by: Bert Vermeiren <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/execution/src/memory_pool/pool.rs | 83 ++++++++++++++++++++++++++++
1 file changed, 83 insertions(+)
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index b10270851c..19aaa0371a 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -302,6 +302,32 @@ impl TrackedConsumer {
}
}
+/// A point-in-time snapshot of one consumer tracked by a [`TrackConsumersPool`].
+///
+/// Returned by [`TrackConsumersPool::metrics()`]; values do not update afterwards.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MemoryConsumerMetrics {
+    /// The name of the memory consumer
+    pub name: String,
+    /// Whether this consumer can spill to disk
+    pub can_spill: bool,
+    /// The number of bytes reserved by this consumer when the snapshot was taken
+    pub reserved: usize,
+    /// The peak number of bytes reserved by this consumer (may exceed `reserved`)
+    pub peak: usize,
+}
+
+impl From<&TrackedConsumer> for MemoryConsumerMetrics {
+    fn from(tracked: &TrackedConsumer) -> Self {
+        Self {
+            name: tracked.name.clone(),
+            can_spill: tracked.can_spill,
+            reserved: tracked.reserved(),
+            peak: tracked.peak(),
+        }
+    }
+}
+
/// A [`MemoryPool`] that tracks the consumers that have
/// reserved memory within the inner memory pool.
///
@@ -381,6 +407,15 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
}
}
+    /// Returns a point-in-time snapshot of every consumer this pool is
+    /// currently tracking, one [`MemoryConsumerMetrics`] per consumer.
+    ///
+    /// No ordering of the entries is promised; sort the result if needed.
+    pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
+        let consumers = self.tracked_consumers.lock();
+        consumers.values().map(MemoryConsumerMetrics::from).collect()
+    }
+
/// Returns a formatted string with the top memory consumers.
pub fn report_top(&self, top: usize) -> String {
let mut consumers = self
@@ -778,6 +813,54 @@ mod tests {
test_per_pool_type(tracked_greedy_pool);
}
+    #[test]
+    fn test_track_consumers_pool_metrics() {
+        let track_consumers_pool = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(1000),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+        let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;
+
+        // A pool with no registered consumers reports no metrics.
+        assert!(track_consumers_pool.metrics().is_empty());
+
+        // Register one spillable and one non-spillable consumer.
+        let r1 = MemoryConsumer::new("spilling")
+            .with_can_spill(true)
+            .register(&memory_pool);
+        let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
+
+        // Grow r1 past its final size, then shrink, so that peak != reserved.
+        r1.grow(100);
+        r1.grow(50);
+        r1.shrink(50); // reserved=100, peak=150
+        r2.grow(200); // reserved=200, peak=200
+
+        // `metrics()` makes no ordering promise, so sort by name first;
+        // comparing borrowed names avoids cloning a String per comparison.
+        let mut metrics = track_consumers_pool.metrics();
+        metrics.sort_by(|a, b| a.name.cmp(&b.name));
+        assert_eq!(metrics.len(), 2);
+
+        let m_non = &metrics[0];
+        assert_eq!(m_non.name, "non-spilling");
+        assert!(!m_non.can_spill);
+        assert_eq!((m_non.reserved, m_non.peak), (200, 200));
+
+        let m_spill = &metrics[1];
+        assert_eq!(m_spill.name, "spilling");
+        assert!(m_spill.can_spill);
+        assert_eq!((m_spill.reserved, m_spill.peak), (100, 150));
+
+        // Dropping a reservation unregisters its consumer; the surviving
+        // consumer's figures must be unaffected.
+        drop(r2);
+        let metrics = track_consumers_pool.metrics();
+        assert_eq!(metrics.len(), 1);
+        assert_eq!(metrics[0].name, "spilling");
+        assert_eq!((metrics[0].reserved, metrics[0].peak), (100, 150));
+    }
+
#[test]
fn test_tracked_consumers_pool_use_beyond_errors() {
let setting = make_settings();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]