This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new be3842b55d chore: add example for how to use TrackConsumersPool (#17213) be3842b55d is described below commit be3842b55d61095d77aae54426726ddd1eed5f2c Author: wiedld <wie...@users.noreply.github.com> AuthorDate: Fri Aug 22 14:13:29 2025 -0700 chore: add example for how to use TrackConsumersPool (#17213) * chore: add docs for how to use TrackConsumersPool * fix: error in example * chore: reference the default pool, not a specific pool * refactor: move code examples to datafusion-examples and reference in docs * chore: add more docs with links to examples * chore: examples cleanup * Make description more concise * Remove confusing example * Simplify example --------- Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion-examples/README.md | 2 + .../examples/memory_pool_execution_plan.rs | 303 +++++++++++++++++++++ .../examples/memory_pool_tracking.rs | 127 +++++++++ datafusion/execution/src/memory_pool/pool.rs | 48 +++- datafusion/execution/src/runtime_env.rs | 3 +- datafusion/physical-plan/src/execution_plan.rs | 9 + 6 files changed, 490 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 75a53bc568..8dadd0b9f8 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -65,6 +65,8 @@ cargo run --example dataframe - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks. 
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros +- [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages +- [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement a memory-aware ExecutionPlan with memory reservation and spilling - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates - [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries - [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion diff --git a/datafusion-examples/examples/memory_pool_execution_plan.rs b/datafusion-examples/examples/memory_pool_execution_plan.rs new file mode 100644 index 0000000000..7a77e99691 --- /dev/null +++ b/datafusion-examples/examples/memory_pool_execution_plan.rs @@ -0,0 +1,303 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example demonstrates how to implement custom ExecutionPlans that properly +//! use memory tracking through TrackConsumersPool. +//! +//! This shows the pattern for implementing memory-aware operators that: +//! - Register memory consumers with the pool +//! - Reserve memory before allocating +//! - Handle memory pressure by spilling to disk +//! - Release memory when done + +use arrow::record_batch::RecordBatch; +use arrow_schema::SchemaRef; +use datafusion::common::record_batch; +use datafusion::datasource::{memory::MemTable, DefaultTableSource}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::logical_expr::LogicalPlanBuilder; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, Statistics, +}; +use datafusion::prelude::*; +use futures::stream::{StreamExt, TryStreamExt}; +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + println!("=== DataFusion ExecutionPlan Memory Tracking Example ===\n"); + + // Set up a runtime with memory tracking + // Set a low memory limit to trigger spilling on the second batch + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(15000, 1.0) // Allow only enough for 1 batch at once + .build_arc()?; + + let config = SessionConfig::new().with_coalesce_batches(false); + let ctx = SessionContext::new_with_config_rt(config, runtime.clone()); + + // Create smaller batches to ensure we get multiple RecordBatches from the scan + // Make each batch smaller than the memory limit to force multiple batches + 
let batch1 = record_batch!( + ("id", Int32, vec![1; 800]), + ("name", Utf8, vec!["Alice"; 800]) + )?; + + let batch2 = record_batch!( + ("id", Int32, vec![2; 800]), + ("name", Utf8, vec!["Bob"; 800]) + )?; + + let batch3 = record_batch!( + ("id", Int32, vec![3; 800]), + ("name", Utf8, vec!["Charlie"; 800]) + )?; + + let batch4 = record_batch!( + ("id", Int32, vec![4; 800]), + ("name", Utf8, vec!["David"; 800]) + )?; + + let schema = batch1.schema(); + + // Create a single MemTable with all batches in one partition to preserve order but ensure streaming + let mem_table = Arc::new(MemTable::try_new( + Arc::clone(&schema), + vec![vec![batch1, batch2, batch3, batch4]], // Single partition with multiple batches + )?); + + // Build logical plan with a single scan that will yield multiple batches + let table_source = Arc::new(DefaultTableSource::new(mem_table)); + let logical_plan = + LogicalPlanBuilder::scan("multi_batch_table", table_source, None)?.build()?; + + // Convert to physical plan + let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?; + + println!("Example: Custom Memory-Aware BufferingExecutionPlan"); + println!("---------------------------------------------------"); + + // Wrap our input plan with our custom BufferingExecutionPlan + let buffering_plan = Arc::new(BufferingExecutionPlan::new(schema, physical_plan)); + + // Create a task context from our runtime + let task_ctx = Arc::new(TaskContext::default().with_runtime(runtime)); + + // Execute the plan directly to demonstrate memory tracking + println!("Executing BufferingExecutionPlan with memory tracking..."); + println!("Memory limit: 15000 bytes - should trigger spill on later batches\n"); + + let stream = buffering_plan.execute(0, task_ctx.clone())?; + let _results: Vec<RecordBatch> = stream.try_collect().await?; + + println!("\nSuccessfully executed BufferingExecutionPlan!"); + + println!("\nThe BufferingExecutionPlan processed 4 input batches and"); + println!("demonstrated 
memory tracking with spilling behavior when the"); + println!("memory limit was exceeded by later batches."); + println!("Check the console output above to see the spill messages."); + + Ok(()) +} + +/// Example of an external batch bufferer that uses memory reservation. +/// +/// It's a simple example which spills all existing data to disk +/// whenever the memory limit is reached. +struct ExternalBatchBufferer { + buffer: Vec<u8>, + reservation: MemoryReservation, + spill_count: usize, +} + +impl ExternalBatchBufferer { + fn new(reservation: MemoryReservation) -> Self { + Self { + buffer: Vec::new(), + reservation, + spill_count: 0, + } + } + + fn add_batch(&mut self, batch_data: Vec<u8>) -> Result<()> { + let additional_memory = batch_data.len(); + + // Try to reserve memory before allocating + if self.reservation.try_grow(additional_memory).is_err() { + // Memory limit reached - handle by spilling + println!( + "Memory limit reached, spilling {} bytes to disk", + self.buffer.len() + ); + self.spill_to_disk()?; + + // Try again after spilling + self.reservation.try_grow(additional_memory)?; + } + + self.buffer.extend_from_slice(&batch_data); + println!( + "Added batch of {} bytes, total buffered: {} bytes", + additional_memory, + self.buffer.len() + ); + Ok(()) + } + + fn spill_to_disk(&mut self) -> Result<()> { + // Simulate writing buffer to disk + self.spill_count += 1; + println!( + "Spill #{}: Writing {} bytes to disk", + self.spill_count, + self.buffer.len() + ); + + // Free the memory after spilling + let freed_bytes = self.buffer.len(); + self.buffer.clear(); + self.reservation.shrink(freed_bytes); + + Ok(()) + } + + fn finish(&mut self) -> Vec<u8> { + let result = std::mem::take(&mut self.buffer); + // Free the memory when done + self.reservation.free(); + println!("Finished processing, released {} bytes", result.len()); + result + } +} + +/// Example of an ExecutionPlan that uses the ExternalBatchBufferer. 
+#[derive(Debug)] +struct BufferingExecutionPlan { + schema: SchemaRef, + input: Arc<dyn ExecutionPlan>, + properties: PlanProperties, +} + +impl BufferingExecutionPlan { + fn new(schema: SchemaRef, input: Arc<dyn ExecutionPlan>) -> Self { + let properties = input.properties().clone(); + + Self { + schema, + input, + properties, + } + } +} + +impl DisplayAs for BufferingExecutionPlan { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "BufferingExecutionPlan") + } +} + +impl ExecutionPlan for BufferingExecutionPlan { + fn name(&self) -> &'static str { + "BufferingExecutionPlan" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + if children.len() == 1 { + Ok(Arc::new(BufferingExecutionPlan::new( + self.schema.clone(), + children[0].clone(), + ))) + } else { + Err(DataFusionError::Internal( + "BufferingExecutionPlan must have exactly one child".to_string(), + )) + } + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + // Register memory consumer with the context's memory pool + let reservation = MemoryConsumer::new("MyExternalBatchBufferer") + .with_can_spill(true) + .register(context.memory_pool()); + + // Incoming stream of batches + let mut input_stream = self.input.execute(partition, context)?; + + // Process the stream and collect all batches + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(async move { + let mut operator = ExternalBatchBufferer::new(reservation); + + while let Some(batch) = input_stream.next().await { + let batch = batch?; + + // Convert RecordBatch to bytes for 
this example + let batch_data = vec![1u8; batch.get_array_memory_size()]; + + operator.add_batch(batch_data)?; + } + + // Finish processing and get results + let _final_result = operator.finish(); + // In a real implementation, you would convert final_result back to RecordBatches + + // Since this is a simplified example, return a placeholder single-row batch + // In a real implementation, you would create a batch stream from the processed results + record_batch!(("id", Int32, vec![5]), ("name", Utf8, vec!["Eve"])) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create final RecordBatch: {e}", + )) + }) + }), + ))) + } + + fn statistics(&self) -> Result<Statistics> { + Ok(Statistics::new_unknown(&self.schema)) + } +} diff --git a/datafusion-examples/examples/memory_pool_tracking.rs b/datafusion-examples/examples/memory_pool_tracking.rs new file mode 100644 index 0000000000..287a1d25c8 --- /dev/null +++ b/datafusion-examples/examples/memory_pool_tracking.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example demonstrates how to use TrackConsumersPool for memory tracking and debugging. +//! +//!
The TrackConsumersPool provides enhanced error messages that show the top memory consumers +//! when memory allocation fails, making it easier to debug memory issues in DataFusion queries. +//! +//! # Examples +//! +//! * [`automatic_usage_example`]: Shows how to use RuntimeEnvBuilder to automatically enable memory tracking + +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::prelude::*; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + println!("=== DataFusion Memory Pool Tracking Example ===\n"); + + // Example 1: Automatic Usage with RuntimeEnvBuilder + automatic_usage_example().await?; + + Ok(()) +} + +/// Example 1: Automatic Usage with RuntimeEnvBuilder +/// +/// This shows the recommended way to use TrackConsumersPool through RuntimeEnvBuilder, +/// which automatically creates a TrackConsumersPool with sensible defaults. +async fn automatic_usage_example() -> datafusion::error::Result<()> { + println!("Example 1: Automatic Usage with RuntimeEnvBuilder"); + println!("------------------------------------------------"); + + // Success case: Create a runtime with reasonable memory limit + println!("Success case: Normal operation with sufficient memory"); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(5_000_000, 1.0) // 5MB, 100% utilization + .build_arc()?; + + let config = SessionConfig::new(); + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // Create a simple table for demonstration + ctx.sql("CREATE TABLE test AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") + .await? + .collect() + .await?; + + println!("✓ Created table with memory tracking enabled"); + + // Run a simple query to show it works + let results = ctx.sql("SELECT * FROM test").await?.collect().await?; + println!( + "✓ Query executed successfully. 
Found {} rows", + results.len() + ); + + println!("\n{}", "-".repeat(50)); + + // Error case: Run a memory-intensive query against the same 5MB runtime to trigger errors + println!("Error case: Triggering memory limit error with detailed error messages"); + + // Use a WITH query that generates data and then processes it to trigger memory usage + match ctx.sql(" + WITH large_dataset AS ( + SELECT + column1 as id, + column1 * 2 as doubled, + repeat('data_', 20) || column1 as text_field, + column1 * column1 as squared + FROM generate_series(1, 2000) as t(column1) + ), + aggregated AS ( + SELECT + id, + doubled, + text_field, + squared, + sum(doubled) OVER (ORDER BY id ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) as running_sum + FROM large_dataset + ) + SELECT + a1.id, + a1.text_field, + a2.text_field as text_field2, + a1.running_sum + a2.running_sum as combined_sum + FROM aggregated a1 + JOIN aggregated a2 ON a1.id = a2.id - 1 + ORDER BY a1.id + ").await?.collect().await { + Ok(results) => panic!("Should not succeed! Yet got {} batches", results.len()), + Err(e) => { + println!("✓ Expected memory limit error during data processing:"); + println!("Error: {e}"); + /* Example error message: + Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes + caused by + Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB, + ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB, + ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, + ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, + ExternalSorter[8]#140(can spill: true) consumed 67.2 KB.
+ Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool + */ + } + } + + println!("\nNote: The error message above shows which memory consumers"); + println!("were using the most memory when the limit was exceeded."); + + Ok(()) +} diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 11467f69be..8ad61ba9f8 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -295,9 +295,25 @@ impl TrackedConsumer { /// /// By tracking memory reservations more carefully this pool /// can provide better error messages on the largest memory users +/// when memory allocation fails. /// /// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`]. /// The same consumer can have multiple reservations. +/// +/// # Automatic Usage via [`RuntimeEnvBuilder`] +/// +/// The easiest way to use `TrackConsumersPool` is via +/// [`RuntimeEnvBuilder::with_memory_limit()`]. +/// +/// [`RuntimeEnvBuilder`]: crate::runtime_env::RuntimeEnvBuilder +/// [`RuntimeEnvBuilder::with_memory_limit()`]: crate::runtime_env::RuntimeEnvBuilder::with_memory_limit +/// +/// # Usage Examples +/// +/// For more examples of using `TrackConsumersPool`, see the [memory_pool_tracking.rs] example, and the [memory_pool_execution_plan.rs] example for a memory-aware `ExecutionPlan`. +/// +/// [memory_pool_tracking.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_tracking.rs +/// [memory_pool_execution_plan.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_execution_plan.rs #[derive(Debug)] pub struct TrackConsumersPool<I> { /// The wrapped memory pool that actually handles reservation logic @@ -311,6 +327,36 @@ impl<I: MemoryPool> TrackConsumersPool<I> { /// Creates a new [`TrackConsumersPool`].
/// + /// # Arguments + /// * `inner` - The underlying memory pool that handles actual memory allocation + /// * `top` - The number of top memory consumers to include in error messages + /// + /// # Note + /// In most cases, you should use [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit) + /// instead of creating this pool manually, as it automatically sets up tracking with + /// sensible defaults (top 5 consumers). + /// + /// # Example + /// + /// ```rust + /// use std::num::NonZeroUsize; + /// use datafusion_execution::memory_pool::{TrackConsumersPool, GreedyMemoryPool, FairSpillPool}; + /// + /// // Create with a greedy pool backend, reporting top 3 consumers in error messages + /// let tracked_greedy = TrackConsumersPool::new( + /// GreedyMemoryPool::new(1024 * 1024), // 1MB limit + /// NonZeroUsize::new(3).unwrap(), + /// ); + /// + /// // Create with a fair spill pool backend, reporting top 5 consumers in error messages + /// let tracked_fair = TrackConsumersPool::new( + /// FairSpillPool::new(2 * 1024 * 1024), // 2MB limit + /// NonZeroUsize::new(5).unwrap(), + /// ); + /// ``` + /// + /// # Impact on Error Messages + /// /// The `top` determines how many Top K [`MemoryConsumer`]s to include /// in the reported [`DataFusionError::ResourcesExhausted`]. pub fn new(inner: I, top: NonZeroUsize) -> Self { @@ -321,7 +367,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> { } } - /// The top consumers in a report string. + /// Returns a formatted string with the top memory consumers. 
pub fn report_top(&self, top: usize) -> String { let mut consumers = self .tracked_consumers diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 9c5de42bcd..db045a8b7e 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -258,7 +258,8 @@ impl RuntimeEnvBuilder { /// Specify the total memory to use while running the DataFusion /// plan to `max_memory * memory_fraction` in bytes. /// - /// This defaults to using [`GreedyMemoryPool`] + /// This defaults to using [`GreedyMemoryPool`] wrapped in the + /// [`TrackConsumersPool`], which reports the top 5 memory consumers in error messages. /// /// Note DataFusion does not yet respect this limit in all cases. pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d4e0fe82bd..e708bbd3ec 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -74,6 +74,15 @@ use futures::stream::{StreamExt, TryStreamExt}; /// [`execute`]: ExecutionPlan::execute /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering +/// +/// # Examples +/// +/// See [`datafusion-examples`] for examples, including +/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom +/// `ExecutionPlan` with memory tracking and spilling support. +/// +/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples +/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_execution_plan.rs pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
/// --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org