This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 921c3b6b18 Add `TrackedMemoryPool` with better error messages on
exhaustion (#11665)
921c3b6b18 is described below
commit 921c3b6b181b6175041c6927d38c7ce2c0735121
Author: wiedld <[email protected]>
AuthorDate: Thu Aug 1 08:15:58 2024 -0700
Add `TrackedMemoryPool` with better error messages on exhaustion (#11665)
* feat(11523): TrackConsumersPool impl which includes error messages with
the top K consumers
* test(11523): unit tests for TrackConsumersPool
* test(11523): integration test for tracked consumers oom message
* chore(11523): use nonzero usize
* chore(11523): document what the memory insufficient_capacity_err is
actually returning
* chore(11523): improve test failure coverage for TrackConsumersPool
* fix(11523): handle additive tracking of same hashed consumer, across
different reservations
* refactor(11523): update error message to delineate multiple consumers
with the same name, but different hash
* test(11523): demonstrate the underlying pool behavior on deregister
* chore: make explicit what the insufficient_capacity_err() logs
* fix(11523): remove to_root() for the error, since the immediate inner
child should be returning an OOM
* chore(11523): add result to logging of failed CI tests
* fix(11523): splice error message to get consumers prior to error message
* Revert "fix(11523): splice error message to get consumers prior to error
message"
This reverts commit 09b20d289f53d3b61b976313f8731e8a6711f370.
* fix(11523): fix without splicing error messages, and instead handle the
proper error bubbling (msg wrapping)
* chore: update docs to explain purpose of TrackConsumersPool
Co-authored-by: Andrew Lamb <[email protected]>
* refactor(11523): enable TrackConsumersPool to be used in runtime metrics
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/tests/memory_limit/mod.rs | 55 +++-
datafusion/execution/src/memory_pool/mod.rs | 2 +-
datafusion/execution/src/memory_pool/pool.rs | 377 ++++++++++++++++++++++++++-
3 files changed, 431 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/tests/memory_limit/mod.rs
b/datafusion/core/tests/memory_limit/mod.rs
index a2bdbe64aa..5c712af801 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -26,10 +26,14 @@ use datafusion::assert_batches_eq;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion_execution::memory_pool::{
+ GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use futures::StreamExt;
use std::any::Any;
+use std::num::NonZeroUsize;
use std::sync::{Arc, OnceLock};
use tokio::fs::File;
@@ -371,6 +375,39 @@ async fn oom_parquet_sink() {
.await
}
+/// Runs a `COPY ... TO parquet` query against a [`TrackConsumersPool`] that wraps a
+/// 200_000 byte [`GreedyMemoryPool`], and checks that the resulting resources-exhausted
+/// error names the top memory consumer (`ParquetSink(ArrowColumnWriter)`).
+#[tokio::test]
+async fn oom_with_tracked_consumer_pool() {
+    // Borrow the path instead of calling `into_path()`: `into_path()` persists the
+    // directory on disk, whereas keeping `dir` alive until the end of the test lets
+    // the temporary directory be deleted when it is dropped.
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("test.parquet");
+    let _ = File::create(path.clone()).await.unwrap();
+
+    TestCase::new()
+        .with_query(format!(
+            "
+            COPY (select * from t)
+            TO '{}'
+            STORED AS PARQUET OPTIONS (compression 'uncompressed');
+            ",
+            path.to_string_lossy()
+        ))
+        .with_expected_errors(vec![
+            "Failed to allocate additional",
+            "for ParquetSink(ArrowColumnWriter)",
+            "Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)",
+        ])
+        // Only the single largest consumer (top = 1) is listed in the error.
+        .with_memory_pool(Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(200_000),
+            NonZeroUsize::new(1).unwrap(),
+        )))
+        .run()
+        .await
+}
+
/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
@@ -378,6 +415,7 @@ struct TestCase {
query: Option<String>,
expected_errors: Vec<String>,
memory_limit: usize,
+ memory_pool: Option<Arc<dyn MemoryPool>>,
config: SessionConfig,
scenario: Scenario,
/// How should the disk manager (that allows spilling) be
@@ -396,6 +434,7 @@ impl TestCase {
expected_errors: vec![],
memory_limit: 0,
config: SessionConfig::new(),
+ memory_pool: None,
scenario: Scenario::AccessLog,
disk_manager_config: DiskManagerConfig::Disabled,
expected_plan: vec![],
@@ -425,6 +464,15 @@ impl TestCase {
self
}
+    /// Use `memory_pool` as the runtime memory pool for this test case.
+    ///
+    /// The pool carries its own limit, so when one is supplied it
+    /// overrides whatever `memory_limit` was requested.
+    fn with_memory_pool(self, memory_pool: Arc<dyn MemoryPool>) -> Self {
+        Self {
+            memory_pool: Some(memory_pool),
+            ..self
+        }
+    }
+
/// Specify the configuration to use
pub fn with_config(mut self, config: SessionConfig) -> Self {
self.config = config;
@@ -465,6 +513,7 @@ impl TestCase {
query,
expected_errors,
memory_limit,
+ memory_pool,
config,
scenario,
disk_manager_config,
@@ -474,11 +523,15 @@ impl TestCase {
let table = scenario.table();
- let rt_config = RuntimeConfig::new()
+ let mut rt_config = RuntimeConfig::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);
+ if let Some(pool) = memory_pool {
+ rt_config = rt_config.with_memory_pool(pool);
+ };
+
let runtime = RuntimeEnv::new(rt_config).unwrap();
// Configure execution
diff --git a/datafusion/execution/src/memory_pool/mod.rs
b/datafusion/execution/src/memory_pool/mod.rs
index 3df212d466..dcd59acbd4 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// For help with allocation accounting, see the [proxy] module.
///
/// [proxy]: crate::memory_pool::proxy
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct MemoryConsumer {
name: String,
can_spill: bool,
diff --git a/datafusion/execution/src/memory_pool/pool.rs
b/datafusion/execution/src/memory_pool/pool.rs
index fd7724f307..9cb6f207e5 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -17,9 +17,13 @@
use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
+use hashbrown::HashMap;
use log::debug;
use parking_lot::Mutex;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{
+ num::NonZeroUsize,
+ sync::atomic::{AtomicU64, AtomicUsize, Ordering},
+};
/// A [`MemoryPool`] that enforces no limit
#[derive(Debug, Default)]
@@ -231,6 +235,11 @@ impl MemoryPool for FairSpillPool {
}
}
+/// Constructs a resources error based upon the individual
[`MemoryReservation`].
+///
+/// The error references the `bytes already allocated` for the reservation,
+/// and not the total within the collective [`MemoryPool`],
+/// nor the total across multiple reservations with the same
[`MemoryConsumer`].
#[inline(always)]
fn insufficient_capacity_err(
reservation: &MemoryReservation,
@@ -240,6 +249,152 @@ fn insufficient_capacity_err(
resources_datafusion_err!("Failed to allocate additional {} bytes for {}
with {} bytes already allocated - maximum available is {}", additional,
reservation.registration.consumer.name, reservation.size, available)
}
+/// A [`MemoryPool`] that tracks the consumers that have
+/// reserved memory within the inner memory pool.
+///
+/// By tracking memory reservations more carefully this pool
+/// can provide better error messages on the largest memory users:
+/// when the inner pool rejects a `try_grow` with a resources-exhausted
+/// error, the message is prefixed with the top consumers by reserved bytes.
+///
+/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
+/// The same consumer can have multiple reservations, and their sizes are
+/// summed into one entry. Consumers that share a name but differ in
+/// another field (such as `can_spill`) are tracked as separate entries.
+#[derive(Debug)]
+pub struct TrackConsumersPool<I> {
+ // The wrapped pool; every allocation request is delegated to it.
+ inner: I,
+ // How many of the largest consumers to list in a resources-exhausted error.
+ top: NonZeroUsize,
+ // Bytes currently tracked for each distinct (hashed) consumer.
+ tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
+}
+
+impl<I: MemoryPool> TrackConsumersPool<I> {
+    /// Creates a new [`TrackConsumersPool`].
+    ///
+    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
+    /// in the reported [`DataFusionError::ResourcesExhausted`].
+    pub fn new(inner: I, top: NonZeroUsize) -> Self {
+        Self {
+            inner,
+            top,
+            tracked_consumers: Default::default(),
+        }
+    }
+
+    /// Returns a report of the `top` largest tracked consumers.
+    ///
+    /// Entries are ordered by tracked bytes (largest first) and joined with
+    /// `", "`, each written as `{name} consumed {bytes} bytes`. When more than
+    /// one tracked consumer has the same name (e.g. the same name registered
+    /// both with and without `can_spill`), the entry is written as
+    /// `{name}(can_spill={can_spill}) consumed {bytes} bytes` to tell them apart.
+    ///
+    /// Returns an empty string when `top` is 0 or nothing is tracked.
+    pub fn report_top(&self, top: usize) -> String {
+        // Take a single snapshot under one lock acquisition, so the ranking and
+        // the name-collision check below are both computed from the same state
+        // (and the lock is not re-acquired for every reported consumer).
+        let mut consumers = self
+            .tracked_consumers
+            .lock()
+            .iter()
+            .map(|(consumer, reserved)| {
+                (
+                    (consumer.name().to_owned(), consumer.can_spill()),
+                    reserved.load(Ordering::Acquire),
+                )
+            })
+            .collect::<Vec<_>>();
+        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // descending by bytes
+
+        // True when several tracked consumers share `name`, i.e. they differ
+        // only in their other fields (such as `can_spill`).
+        let shares_name = |name: &str| {
+            consumers
+                .iter()
+                .filter(|((other, _), _)| other == name)
+                .count()
+                > 1
+        };
+
+        consumers
+            .iter()
+            .take(top)
+            .map(|((name, can_spill), size)| {
+                if shares_name(name.as_str()) {
+                    format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size)
+                } else {
+                    format!("{name} consumed {:?} bytes", size)
+                }
+            })
+            .collect::<Vec<_>>()
+            .join(", ")
+    }
+}
+
+impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
+    /// Registers `consumer` with the inner pool and starts tracking it.
+    ///
+    /// If an equal [`MemoryConsumer`] is already tracked (for example, a second
+    /// registration of the same consumer), its accumulated byte count is kept,
+    /// so all reservations of that consumer are summed into one entry.
+    fn register(&self, consumer: &MemoryConsumer) {
+        self.inner.register(consumer);
+        self.tracked_consumers
+            .lock()
+            .entry(consumer.clone())
+            .or_default();
+    }
+
+    /// Unregisters `consumer` from the inner pool and stops tracking it.
+    ///
+    /// The tracked entry is removed even if other registrations of an equal
+    /// consumer still exist; size changes made through those registrations
+    /// are then ignored until the consumer is registered again.
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        self.inner.unregister(consumer);
+        self.tracked_consumers.lock().remove(consumer);
+    }
+
+    /// Grows the reservation in the inner pool and adds `additional` bytes
+    /// to the consumer's tracked total (if the consumer is tracked).
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        self.inner.grow(reservation, additional);
+        self.tracked_consumers
+            .lock()
+            .entry_ref(reservation.consumer())
+            .and_modify(|bytes| {
+                bytes.fetch_add(additional as u64, Ordering::AcqRel);
+            });
+    }
+
+    /// Shrinks the reservation in the inner pool and subtracts `shrink` bytes
+    /// from the consumer's tracked total, saturating at zero.
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        self.inner.shrink(reservation, shrink);
+        self.tracked_consumers
+            .lock()
+            .entry_ref(reservation.consumer())
+            .and_modify(|bytes| {
+                // Saturate rather than wrap: after `unregister` followed by a
+                // fresh `register` of an equal consumer, the entry can hold fewer
+                // bytes than an older reservation is now releasing, and a
+                // wrapping subtraction would report a huge bogus total.
+                let bytes = bytes.get_mut();
+                *bytes = bytes.saturating_sub(shrink as u64);
+            });
+    }
+
+    /// Tries to grow the reservation in the inner pool.
+    ///
+    /// If the inner pool fails with [`DataFusionError::ResourcesExhausted`], the
+    /// message is prefixed with a report of the top `self.top` consumers; any
+    /// other error is returned unchanged. Tracked bytes only increase when the
+    /// inner pool accepts the allocation.
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
+        self.inner
+            .try_grow(reservation, additional)
+            .map_err(|e| match e {
+                DataFusionError::ResourcesExhausted(msg) => {
+                    // wrap OOM message in top consumers
+                    DataFusionError::ResourcesExhausted(
+                        provide_top_memory_consumers_to_error_msg(
+                            msg,
+                            self.report_top(self.top.get()),
+                        ),
+                    )
+                }
+                other => other,
+            })?;
+
+        self.tracked_consumers
+            .lock()
+            .entry_ref(reservation.consumer())
+            .and_modify(|bytes| {
+                bytes.fetch_add(additional as u64, Ordering::AcqRel);
+            });
+        Ok(())
+    }
+
+    /// Total bytes reserved, as reported by the inner pool.
+    fn reserved(&self) -> usize {
+        self.inner.reserved()
+    }
+}
+
+/// Builds the resources-exhausted message used by [`TrackConsumersPool`]:
+/// the report of top memory consumers comes first, followed by the inner
+/// pool's original error message.
+fn provide_top_memory_consumers_to_error_msg(
+    error_msg: String,
+    top_consumers: String,
+) -> String {
+    format!("Resources exhausted with top memory consumers (across reservations) are: {top_consumers}. Error: {error_msg}")
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -311,4 +466,224 @@ mod tests {
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 30
bytes for s4 with 0 bytes already allocated - maximum available is 20");
}
+
+ /// Exercises every reservation-resizing entry point (`grow`/`shrink`,
+ /// `try_grow`, `try_resize`) and checks that an OOM error lists the top 3
+ /// consumers by tracked bytes, leaving out the smaller `r4` and `r5`.
+ #[test]
+ fn test_tracked_consumers_pool() {
+ let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+ GreedyMemoryPool::new(100),
+ NonZeroUsize::new(3).unwrap(),
+ ));
+
+ // Test: use all the different interfaces to change reservation size
+
+ // set r1=50, using grow and shrink
+ let mut r1 = MemoryConsumer::new("r1").register(&pool);
+ r1.grow(70);
+ r1.shrink(20);
+
+ // set r2=15 using try_grow
+ let mut r2 = MemoryConsumer::new("r2").register(&pool);
+ r2.try_grow(15)
+ .expect("should succeed in memory allotment for r2");
+
+ // set r3=20 using try_resize
+ let mut r3 = MemoryConsumer::new("r3").register(&pool);
+ r3.try_resize(25)
+ .expect("should succeed in memory allotment for r3");
+ r3.try_resize(20)
+ .expect("should succeed in memory allotment for r3");
+
+ // set r4=10
+ // this should not be reported in top 3
+ let mut r4 = MemoryConsumer::new("r4").register(&pool);
+ r4.grow(10);
+
+ // Test: reports if new reservation causes error
+ // using the previously set sizes for other consumers.
+ // Pool usage is now 50 + 15 + 20 + 10 = 95 of 100 bytes, so only 5 bytes
+ // are available for r5's 150 byte request.
+ let mut r5 = MemoryConsumer::new("r5").register(&pool);
+ let expected = "Resources exhausted with top memory consumers (across
reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15
bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes
already allocated - maximum available is 5";
+ let res = r5.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected)
+ ),
+ "should provide list of top memory consumers, instead found {:?}",
+ res
+ );
+ }
+
+ /// Checks how [`TrackConsumersPool`] groups tracking by hashed
+ /// [`MemoryConsumer`]: reservations of equal consumers are summed into one
+ /// entry, while consumers that share a name but differ in `can_spill` are
+ /// reported as separate entries with a `(can_spill=...)` suffix.
+ #[test]
+ fn test_tracked_consumers_pool_register() {
+ let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+ GreedyMemoryPool::new(100),
+ NonZeroUsize::new(3).unwrap(),
+ ));
+
+ let same_name = "foo";
+
+ // Test: see error message when no consumers recorded yet
+ let mut r0 = MemoryConsumer::new(same_name).register(&pool);
+ let expected = "Resources exhausted with top memory consumers (across
reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional
150 bytes for foo with 0 bytes already allocated - maximum available is 100";
+ let res = r0.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected)
+ ),
+ "should provide proper error when no reservations have been made
yet, instead found {:?}", res
+ );
+
+ // API: multiple registrations using the same hashed consumer
+ // are recognized as the same consumer by the TrackConsumersPool.
+
+ // Test: they are reported as a single entry in the top consumers.
+ r0.grow(10); // make r0=10, pool available=90
+ let new_consumer_same_name = MemoryConsumer::new(same_name);
+ let mut r1 = new_consumer_same_name.clone().register(&pool);
+ // TODO: the insufficient_capacity_err() message is per reservation,
+ // not per consumer. A followup PR will clarify this message as
+ // "0 bytes already allocated for this reservation".
+ let expected = "Resources exhausted with top memory consumers (across
reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional
150 bytes for foo with 0 bytes already allocated - maximum available is 90";
+ let res = r1.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected)
+ ),
+ "should provide proper error with same hashed consumer (a single
foo=10 bytes, available=90), instead found {:?}", res
+ );
+
+ // Test: size changes accumulate per consumer, not per reservation.
+ // r1 now holds 20 bytes and r0 holds 10, so the shared `foo` entry is 30.
+ r1.grow(20);
+ let expected = "Resources exhausted with top memory consumers (across
reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional
150 bytes for foo with 20 bytes already allocated - maximum available is 70";
+ let res = r1.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected)
+ ),
+ "should provide proper error with same hashed consumer (a single
foo=30 bytes, available=70), instead found {:?}", res
+ );
+
+ // Test: a consumer with a different hash (same name, but can_spill=true)
+ // is tracked as a separate entry by the TrackConsumersPool, and both
+ // entries are disambiguated with a `(can_spill=...)` suffix.
+ let consumer_with_same_name_but_different_hash =
+ MemoryConsumer::new(same_name).with_can_spill(true);
+ let mut r2 =
consumer_with_same_name_but_different_hash.register(&pool);
+ let expected = "Resources exhausted with top memory consumers (across
reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true)
consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0
bytes already allocated - maximum available is 70";
+ let res = r2.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected)
+ ),
+ "should provide proper error with different hashed consumer
(foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70),
instead found {:?}", res
+ );
+ }
+
+ /// Checks that unregistering a consumer removes it from the top-consumer
+ /// report, while the inner pool keeps counting its bytes until the
+ /// reservation is freed. Runs the same scenario with a `FairSpillPool`
+ /// and with a `GreedyMemoryPool` as the inner pool.
+ #[test]
+ fn test_tracked_consumers_pool_deregister() {
+ // Runs the unregister scenario against `pool`; the expected messages
+ // assume a 100 byte limit, as used by both callers below.
+ fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
+ // Baseline: see the 2 memory consumers
+ let mut r0 = MemoryConsumer::new("r0").register(&pool);
+ r0.grow(10);
+ let r1_consumer = MemoryConsumer::new("r1");
+ let mut r1 = r1_consumer.clone().register(&pool);
+ r1.grow(20);
+ let expected = "Resources exhausted with top memory consumers
(across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error:
Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated
- maximum available is 70";
+ let res = r0.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected)
+ ),
+ "should provide proper error with both consumers, instead
found {:?}",
+ res
+ );
+
+ // Test: unregister one
+ // only the remaining one should be listed
+ pool.unregister(&r1_consumer);
+ let expected_consumers = "Resources exhausted with top memory
consumers (across reservations) are: r0 consumed 10 bytes";
+ let res = r0.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected_consumers)
+ ),
+ "should provide proper error with only 1 consumer left
registered, instead found {:?}", res
+ );
+
+ // Test: the message still reports `available is 70`, not the
+ // `available is 90` one might expect. Unregistering the consumer does
+ // not shrink the inner pool: r1's 20 bytes stay counted there until the
+ // reservation itself is freed or dropped.
+ let expected_70_available = "Failed to allocate additional 150
bytes for r0 with 10 bytes already allocated - maximum available is 70";
+ let res = r0.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected_70_available)
+ ),
+ "should find that the inner pool will still count all bytes
for the deregistered consumer until the reservation is dropped, instead found
{:?}", res
+ );
+
+ // Test: the reservation needs to free itself (or be dropped)
+ // for the inner pool to report the correct available bytes
+ r1.free();
+ let expected_90_available = "Failed to allocate additional 150
bytes for r0 with 10 bytes already allocated - maximum available is 90";
+ let res = r0.try_grow(150);
+ assert!(
+ matches!(
+ &res,
+ Err(DataFusionError::ResourcesExhausted(ref e)) if
e.to_string().contains(expected_90_available)
+ ),
+ "should correctly account the total bytes after reservation is
free, instead found {:?}", res
+ );
+ }
+
+ let tracked_spill_pool: Arc<dyn MemoryPool> =
Arc::new(TrackConsumersPool::new(
+ FairSpillPool::new(100),
+ NonZeroUsize::new(3).unwrap(),
+ ));
+ test_per_pool_type(tracked_spill_pool);
+
+ let tracked_greedy_pool: Arc<dyn MemoryPool> =
Arc::new(TrackConsumersPool::new(
+ GreedyMemoryPool::new(100),
+ NonZeroUsize::new(3).unwrap(),
+ ));
+ test_per_pool_type(tracked_greedy_pool);
+ }
+
+ #[test]
+ fn test_tracked_consumers_pool_use_beyond_errors() {
+ let upcasted: Arc<dyn std::any::Any + Send + Sync> =
+ Arc::new(TrackConsumersPool::new(
+ GreedyMemoryPool::new(100),
+ NonZeroUsize::new(3).unwrap(),
+ ));
+ let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
+ .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
+ .unwrap();
+ // set r1=20
+ let mut r1 = MemoryConsumer::new("r1").register(&pool);
+ r1.grow(20);
+ // set r2=15
+ let mut r2 = MemoryConsumer::new("r2").register(&pool);
+ r2.grow(15);
+ // set r3=45
+ let mut r3 = MemoryConsumer::new("r3").register(&pool);
+ r3.grow(45);
+
+ let downcasted = upcasted
+ .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
+ .unwrap();
+
+ // Test: can get runtime metrics, even without an error thrown
+ let expected = "r3 consumed 45 bytes, r1 consumed 20 bytes";
+ let res = downcasted.report_top(2);
+ assert_eq!(
+ res, expected,
+ "should provide list of top memory consumers, instead found {:?}",
+ res
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]