This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 921c3b6b18 Add `TrackedMemoryPool` with better error messages on 
exhaustion (#11665)
921c3b6b18 is described below

commit 921c3b6b181b6175041c6927d38c7ce2c0735121
Author: wiedld <[email protected]>
AuthorDate: Thu Aug 1 08:15:58 2024 -0700

    Add `TrackedMemoryPool` with better error messages on exhaustion (#11665)
    
    * feat(11523): TrackConsumersPool impl which includes error messages with
the top K consumers
    
    * test(11523): unit tests for TrackConsumersPool
    
    * test(11523): integration test for tracked consumers oom message
    
    * chore(11523): use nonzero usize
    
    * chore(11523): document what the memory insufficient_capacity_err is
actually returning
    
    * chore(11523): improve test failure coverage for TrackConsumersPool
    
    * fix(11523): handle additive tracking of same hashed consumer, across 
different reservations
    
    * refactor(11523): update error message to delineate multiple consumers
with the same name, but different hashes
    
    * test(11523): demonstrate the underlying pool behavior on deregister
    
    * chore: make explicit what the insufficient_capacity_err() logs
    
    * fix(11523): remove to_root() for the error, since the immediate inner 
child should be returning an OOM
    
    * chore(11523): add result to logging of failed CI tests
    
    * fix(11523): splice error message to get consumers prior to error message
    
    * Revert "fix(11523): splice error message to get consumers prior to error 
message"
    
    This reverts commit 09b20d289f53d3b61b976313f8731e8a6711f370.
    
    * fix(11523): fix without splicing error messages, and instead handle the 
proper error bubbling (msg wrapping)
    
    * chore: update docs to explain purpose of TrackConsumersPool
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * refactor(11523): enable TrackConsumersPool to be used in runtime metrics
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/tests/memory_limit/mod.rs    |  55 +++-
 datafusion/execution/src/memory_pool/mod.rs  |   2 +-
 datafusion/execution/src/memory_pool/pool.rs | 377 ++++++++++++++++++++++++++-
 3 files changed, 431 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs
index a2bdbe64aa..5c712af801 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -26,10 +26,14 @@ use datafusion::assert_batches_eq;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion_execution::memory_pool::{
+    GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+};
 use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use futures::StreamExt;
 use std::any::Any;
+use std::num::NonZeroUsize;
 use std::sync::{Arc, OnceLock};
 use tokio::fs::File;
 
@@ -371,6 +375,39 @@ async fn oom_parquet_sink() {
         .await
 }
 
+#[tokio::test]
+async fn oom_with_tracked_consumer_pool() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.into_path().join("test.parquet");
+    let _ = File::create(path.clone()).await.unwrap();
+
+    TestCase::new()
+        .with_config(
+            SessionConfig::new()
+        )
+        .with_query(format!(
+            "
+            COPY (select * from t)
+            TO '{}'
+            STORED AS PARQUET OPTIONS (compression 'uncompressed');
+        ",
+            path.to_string_lossy()
+        ))
+        .with_expected_errors(vec![
+            "Failed to allocate additional",
+            "for ParquetSink(ArrowColumnWriter)",
+            "Resources exhausted with top memory consumers (across 
reservations) are: ParquetSink(ArrowColumnWriter)"
+        ])
+        .with_memory_pool(Arc::new(
+            TrackConsumersPool::new(
+                GreedyMemoryPool::new(200_000),
+                NonZeroUsize::new(1).unwrap()
+            )
+        ))
+        .run()
+        .await
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
@@ -378,6 +415,7 @@ struct TestCase {
     query: Option<String>,
     expected_errors: Vec<String>,
     memory_limit: usize,
+    memory_pool: Option<Arc<dyn MemoryPool>>,
     config: SessionConfig,
     scenario: Scenario,
     /// How should the disk manager (that allows spilling) be
@@ -396,6 +434,7 @@ impl TestCase {
             expected_errors: vec![],
             memory_limit: 0,
             config: SessionConfig::new(),
+            memory_pool: None,
             scenario: Scenario::AccessLog,
             disk_manager_config: DiskManagerConfig::Disabled,
             expected_plan: vec![],
@@ -425,6 +464,15 @@ impl TestCase {
         self
     }
 
+    /// Set the memory pool to be used
+    ///
+    /// This will override the memory_limit requested,
+    /// as the memory pool includes the limit.
+    fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
+        self.memory_pool = Some(memory_pool);
+        self
+    }
+
     /// Specify the configuration to use
     pub fn with_config(mut self, config: SessionConfig) -> Self {
         self.config = config;
@@ -465,6 +513,7 @@ impl TestCase {
             query,
             expected_errors,
             memory_limit,
+            memory_pool,
             config,
             scenario,
             disk_manager_config,
@@ -474,11 +523,15 @@ impl TestCase {
 
         let table = scenario.table();
 
-        let rt_config = RuntimeConfig::new()
+        let mut rt_config = RuntimeConfig::new()
             // disk manager setting controls the spilling
             .with_disk_manager(disk_manager_config)
             .with_memory_limit(memory_limit, MEMORY_FRACTION);
 
+        if let Some(pool) = memory_pool {
+            rt_config = rt_config.with_memory_pool(pool);
+        };
+
         let runtime = RuntimeEnv::new(rt_config).unwrap();
 
         // Configure execution
diff --git a/datafusion/execution/src/memory_pool/mod.rs 
b/datafusion/execution/src/memory_pool/mod.rs
index 3df212d466..dcd59acbd4 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
 /// For help with allocation accounting, see the [proxy] module.
 ///
 /// [proxy]: crate::memory_pool::proxy
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
 pub struct MemoryConsumer {
     name: String,
     can_spill: bool,
diff --git a/datafusion/execution/src/memory_pool/pool.rs 
b/datafusion/execution/src/memory_pool/pool.rs
index fd7724f307..9cb6f207e5 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -17,9 +17,13 @@
 
 use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
+use hashbrown::HashMap;
 use log::debug;
 use parking_lot::Mutex;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{
+    num::NonZeroUsize,
+    sync::atomic::{AtomicU64, AtomicUsize, Ordering},
+};
 
 /// A [`MemoryPool`] that enforces no limit
 #[derive(Debug, Default)]
@@ -231,6 +235,11 @@ impl MemoryPool for FairSpillPool {
     }
 }
 
+/// Constructs a resources error based upon the individual [`MemoryReservation`].
+///
+/// The error references the `bytes already allocated` for the reservation,
+/// and not the total within the collective [`MemoryPool`],
+/// nor the total across multiple reservations with the same [`MemoryConsumer`].
 #[inline(always)]
 fn insufficient_capacity_err(
     reservation: &MemoryReservation,
@@ -240,6 +249,152 @@ fn insufficient_capacity_err(
     resources_datafusion_err!("Failed to allocate additional {} bytes for {} 
with {} bytes already allocated - maximum available is {}", additional, 
reservation.registration.consumer.name, reservation.size, available)
 }
 
+/// A [`MemoryPool`] that tracks the consumers that have
+/// reserved memory within the inner memory pool.
+///
+/// By tracking memory reservations more carefully this pool
+/// can provide better error messages on the largest memory users
+///
+/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
+/// The same consumer can have multiple reservations.
+#[derive(Debug)]
+pub struct TrackConsumersPool<I> {
+    inner: I,
+    top: NonZeroUsize,
+    tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
+}
+
+impl<I: MemoryPool> TrackConsumersPool<I> {
+    /// Creates a new [`TrackConsumersPool`].
+    ///
+    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
+    /// in the reported [`DataFusionError::ResourcesExhausted`].
+    pub fn new(inner: I, top: NonZeroUsize) -> Self {
+        Self {
+            inner,
+            top,
+            tracked_consumers: Default::default(),
+        }
+    }
+
+    /// Determine if there are multiple [`MemoryConsumer`]s registered
+    /// which have the same name.
+    ///
+    /// This is very tied to the implementation of the memory consumer.
+    fn has_multiple_consumers(&self, name: &String) -> bool {
+        let consumer = MemoryConsumer::new(name);
+        let consumer_with_spill = consumer.clone().with_can_spill(true);
+        let guard = self.tracked_consumers.lock();
+        guard.contains_key(&consumer) && 
guard.contains_key(&consumer_with_spill)
+    }
+
+    /// The top consumers in a report string.
+    pub fn report_top(&self, top: usize) -> String {
+        let mut consumers = self
+            .tracked_consumers
+            .lock()
+            .iter()
+            .map(|(consumer, reserved)| {
+                (
+                    (consumer.name().to_owned(), consumer.can_spill()),
+                    reserved.load(Ordering::Acquire),
+                )
+            })
+            .collect::<Vec<_>>();
+        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
+
+        consumers[0..std::cmp::min(top, consumers.len())]
+            .iter()
+            .map(|((name, can_spill), size)| {
+                if self.has_multiple_consumers(name) {
+                    format!("{name}(can_spill={}) consumed {:?} bytes", 
can_spill, size)
+                } else {
+                    format!("{name} consumed {:?} bytes", size)
+                }
+            })
+            .collect::<Vec<_>>()
+            .join(", ")
+    }
+}
+
+impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
+    fn register(&self, consumer: &MemoryConsumer) {
+        self.inner.register(consumer);
+
+        let mut guard = self.tracked_consumers.lock();
+        if let Some(already_reserved) = guard.insert(consumer.clone(), 
Default::default())
+        {
+            guard.entry_ref(consumer).and_modify(|bytes| {
+                bytes.fetch_add(
+                    already_reserved.load(Ordering::Acquire),
+                    Ordering::AcqRel,
+                );
+            });
+        }
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        self.inner.unregister(consumer);
+        self.tracked_consumers.lock().remove(consumer);
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        self.inner.grow(reservation, additional);
+        self.tracked_consumers
+            .lock()
+            .entry_ref(reservation.consumer())
+            .and_modify(|bytes| {
+                bytes.fetch_add(additional as u64, Ordering::AcqRel);
+            });
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        self.inner.shrink(reservation, shrink);
+        self.tracked_consumers
+            .lock()
+            .entry_ref(reservation.consumer())
+            .and_modify(|bytes| {
+                bytes.fetch_sub(shrink as u64, Ordering::AcqRel);
+            });
+    }
+
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> 
Result<()> {
+        self.inner
+            .try_grow(reservation, additional)
+            .map_err(|e| match e {
+                DataFusionError::ResourcesExhausted(e) => {
+                    // wrap OOM message in top consumers
+                    DataFusionError::ResourcesExhausted(
+                        provide_top_memory_consumers_to_error_msg(
+                            e.to_owned(),
+                            self.report_top(self.top.into()),
+                        ),
+                    )
+                }
+                _ => e,
+            })?;
+
+        self.tracked_consumers
+            .lock()
+            .entry_ref(reservation.consumer())
+            .and_modify(|bytes| {
+                bytes.fetch_add(additional as u64, Ordering::AcqRel);
+            });
+        Ok(())
+    }
+
+    fn reserved(&self) -> usize {
+        self.inner.reserved()
+    }
+}
+
+fn provide_top_memory_consumers_to_error_msg(
+    error_msg: String,
+    top_consumers: String,
+) -> String {
+    format!("Resources exhausted with top memory consumers (across 
reservations) are: {}. Error: {}", top_consumers, error_msg)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -311,4 +466,224 @@ mod tests {
         let err = r4.try_grow(30).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 
bytes for s4 with 0 bytes already allocated - maximum available is 20");
     }
+
+    #[test]
+    fn test_tracked_consumers_pool() {
+        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(100),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+
+        // Test: use all the different interfaces to change reservation size
+
+        // set r1=50, using grow and shrink
+        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        r1.grow(70);
+        r1.shrink(20);
+
+        // set r2=15 using try_grow
+        let mut r2 = MemoryConsumer::new("r2").register(&pool);
+        r2.try_grow(15)
+            .expect("should succeed in memory allotment for r2");
+
+        // set r3=20 using try_resize
+        let mut r3 = MemoryConsumer::new("r3").register(&pool);
+        r3.try_resize(25)
+            .expect("should succeed in memory allotment for r3");
+        r3.try_resize(20)
+            .expect("should succeed in memory allotment for r3");
+
+        // set r4=10
+        // this should not be reported in top 3
+        let mut r4 = MemoryConsumer::new("r4").register(&pool);
+        r4.grow(10);
+
+        // Test: reports if new reservation causes error
+        // using the previously set sizes for other consumers
+        let mut r5 = MemoryConsumer::new("r5").register(&pool);
+        let expected = "Resources exhausted with top memory consumers (across 
reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 
bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes 
already allocated - maximum available is 5";
+        let res = r5.try_grow(150);
+        assert!(
+            matches!(
+                &res,
+                Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide list of top memory consumers, instead found {:?}",
+            res
+        );
+    }
+
+    #[test]
+    fn test_tracked_consumers_pool_register() {
+        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(100),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+
+        let same_name = "foo";
+
+        // Test: see error message when no consumers recorded yet
+        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
+        let expected = "Resources exhausted with top memory consumers (across 
reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 
150 bytes for foo with 0 bytes already allocated - maximum available is 100";
+        let res = r0.try_grow(150);
+        assert!(
+            matches!(
+                &res,
+                Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error when no reservations have been made 
yet, instead found {:?}", res
+        );
+
+        // API: multiple registrations using the same hashed consumer,
+        // will be recognized as the same in the TrackConsumersPool.
+
+        // Test: will be the same per Top Consumers reported.
+        r0.grow(10); // make r0=10, pool available=90
+        let new_consumer_same_name = MemoryConsumer::new(same_name);
+        let mut r1 = new_consumer_same_name.clone().register(&pool);
+        // TODO: the insufficient_capacity_err() message is per reservation, 
not per consumer.
+        // a followup PR will clarify this message "0 bytes already allocated 
for this reservation"
+        let expected = "Resources exhausted with top memory consumers (across 
reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 
150 bytes for foo with 0 bytes already allocated - maximum available is 90";
+        let res = r1.try_grow(150);
+        assert!(
+            matches!(
+                &res,
+                Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error with same hashed consumer (a single 
foo=10 bytes, available=90), instead found {:?}", res
+        );
+
+        // Test: will accumulate size changes per consumer, not per reservation
+        r1.grow(20);
+        let expected = "Resources exhausted with top memory consumers (across 
reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 
150 bytes for foo with 20 bytes already allocated - maximum available is 70";
+        let res = r1.try_grow(150);
+        assert!(
+            matches!(
+                &res,
+                Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error with same hashed consumer (a single 
foo=30 bytes, available=70), instead found {:?}", res
+        );
+
+        // Test: different hashed consumer, (even with the same name),
+        // will be recognized as different in the TrackConsumersPool
+        let consumer_with_same_name_but_different_hash =
+            MemoryConsumer::new(same_name).with_can_spill(true);
+        let mut r2 = 
consumer_with_same_name_but_different_hash.register(&pool);
+        let expected = "Resources exhausted with top memory consumers (across 
reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) 
consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 
bytes already allocated - maximum available is 70";
+        let res = r2.try_grow(150);
+        assert!(
+            matches!(
+                &res,
+                Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected)
+            ),
+            "should provide proper error with different hashed consumer 
(foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70), 
instead found {:?}", res
+        );
+    }
+
+    #[test]
+    fn test_tracked_consumers_pool_deregister() {
+        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
+            // Baseline: see the 2 memory consumers
+            let mut r0 = MemoryConsumer::new("r0").register(&pool);
+            r0.grow(10);
+            let r1_consumer = MemoryConsumer::new("r1");
+            let mut r1 = r1_consumer.clone().register(&pool);
+            r1.grow(20);
+            let expected = "Resources exhausted with top memory consumers 
(across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: 
Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated 
- maximum available is 70";
+            let res = r0.try_grow(150);
+            assert!(
+                matches!(
+                    &res,
+                    Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected)
+                ),
+                "should provide proper error with both consumers, instead 
found {:?}",
+                res
+            );
+
+            // Test: unregister one
+            // only the remaining one should be listed
+            pool.unregister(&r1_consumer);
+            let expected_consumers = "Resources exhausted with top memory 
consumers (across reservations) are: r0 consumed 10 bytes";
+            let res = r0.try_grow(150);
+            assert!(
+                matches!(
+                    &res,
+                    Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected_consumers)
+                ),
+                "should provide proper error with only 1 consumer left 
registered, instead found {:?}", res
+            );
+
+            // Test: actual message we see is the `available is 70`. When it 
should be `available is 90`.
+            // This is because the pool.shrink() does not automatically occur 
within the inner_pool.deregister().
+            let expected_70_available = "Failed to allocate additional 150 
bytes for r0 with 10 bytes already allocated - maximum available is 70";
+            let res = r0.try_grow(150);
+            assert!(
+                matches!(
+                    &res,
+                    Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected_70_available)
+                ),
+                "should find that the inner pool will still count all bytes 
for the deregistered consumer until the reservation is dropped, instead found 
{:?}", res
+            );
+
+            // Test: the registration needs to free itself (or be dropped),
+            // for the proper error message
+            r1.free();
+            let expected_90_available = "Failed to allocate additional 150 
bytes for r0 with 10 bytes already allocated - maximum available is 90";
+            let res = r0.try_grow(150);
+            assert!(
+                matches!(
+                    &res,
+                    Err(DataFusionError::ResourcesExhausted(ref e)) if 
e.to_string().contains(expected_90_available)
+                ),
+                "should correctly account the total bytes after reservation is 
free, instead found {:?}", res
+            );
+        }
+
+        let tracked_spill_pool: Arc<dyn MemoryPool> = 
Arc::new(TrackConsumersPool::new(
+            FairSpillPool::new(100),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+        test_per_pool_type(tracked_spill_pool);
+
+        let tracked_greedy_pool: Arc<dyn MemoryPool> = 
Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(100),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+        test_per_pool_type(tracked_greedy_pool);
+    }
+
+    #[test]
+    fn test_tracked_consumers_pool_use_beyond_errors() {
+        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
+            Arc::new(TrackConsumersPool::new(
+                GreedyMemoryPool::new(100),
+                NonZeroUsize::new(3).unwrap(),
+            ));
+        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
+            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
+            .unwrap();
+        // set r1=20
+        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        r1.grow(20);
+        // set r2=15
+        let mut r2 = MemoryConsumer::new("r2").register(&pool);
+        r2.grow(15);
+        // set r3=45
+        let mut r3 = MemoryConsumer::new("r3").register(&pool);
+        r3.grow(45);
+
+        let downcasted = upcasted
+            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
+            .unwrap();
+
+        // Test: can get runtime metrics, even without an error thrown
+        let expected = "r3 consumed 45 bytes, r1 consumed 20 bytes";
+        let res = downcasted.report_top(2);
+        assert_eq!(
+            res, expected,
+            "should provide list of top memory consumers, instead found {:?}",
+            res
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to