alamb commented on code in PR #17213:
URL: https://github.com/apache/datafusion/pull/17213#discussion_r2294534396


##########
datafusion-examples/examples/memory_pool_tracking.rs:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates how to use TrackConsumersPool for memory tracking and debugging.
+//!
+//! The TrackConsumersPool provides enhanced error messages that show the top memory consumers
+//! when memory allocation fails, making it easier to debug memory issues in DataFusion queries.
+//!
+//! # Examples
+//!
+//! * [`automatic_usage_example`]: Shows how to use RuntimeEnvBuilder to automatically enable memory tracking
+//! * [`manual_tracking_example`]: Shows how to manually create and configure a TrackConsumersPool
+
+use datafusion::execution::memory_pool::{
+    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
+};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::*;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    println!("=== DataFusion Memory Pool Tracking Example ===\n");
+
+    // Example 1: Automatic Usage with RuntimeEnvBuilder
+    automatic_usage_example().await?;
+
+    println!("\n{}\n", "=".repeat(60));
+
+    // Example 2: Manual tracking with custom consumers
+    manual_tracking_example()?;
+
+    println!("\n{}\n", "=".repeat(60));
+
+    Ok(())
+}
+
+/// Example 1: Automatic Usage with RuntimeEnvBuilder
+///
+/// This shows the recommended way to use TrackConsumersPool through RuntimeEnvBuilder,
+/// which automatically creates a TrackConsumersPool with sensible defaults.
+async fn automatic_usage_example() -> datafusion::error::Result<()> {
+    println!("Example 1: Automatic Usage with RuntimeEnvBuilder");
+    println!("------------------------------------------------");
+
+    // Success case: Create a runtime with reasonable memory limit
+    println!("Success case: Normal operation with sufficient memory");
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_limit(5_000_000, 1.0) // 5MB, 100% utilization
+        .build_arc()?;
+
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+    // Create a simple table for demonstration
+    ctx.sql("CREATE TABLE test AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+        .await?
+        .collect()
+        .await?;
+
+    println!("✓ Created table with memory tracking enabled");
+
+    // Run a simple query to show it works
+    let results = ctx.sql("SELECT * FROM test").await?.collect().await?;
+    println!(
+        "✓ Query executed successfully. Found {} rows",
+        results.len()
+    );
+
+    println!("\n{}", "-".repeat(50));
+
+    // Error case: Create a runtime with low memory limit to trigger errors
+    println!("Error case: Triggering memory limit error with detailed error 
messages");
+
+    // Use a WITH query that generates data and then processes it to trigger memory usage
+    match ctx.sql("
+        WITH large_dataset AS (
+            SELECT
+                column1 as id,
+                column1 * 2 as doubled,
+                repeat('data_', 20) || column1 as text_field,
+                column1 * column1 as squared
+            FROM generate_series(1, 2000) as t(column1)
+        ),
+        aggregated AS (
+            SELECT
+                id,
+                doubled,
+                text_field,
+                squared,
+                sum(doubled) OVER (ORDER BY id ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) as running_sum
+            FROM large_dataset
+        )
+        SELECT
+            a1.id,
+            a1.text_field,
+            a2.text_field as text_field2,
+            a1.running_sum + a2.running_sum as combined_sum
+        FROM aggregated a1
+        JOIN aggregated a2 ON a1.id = a2.id - 1
+        ORDER BY a1.id
+    ").await?.collect().await {
+        Ok(results) => panic!("Should not succeed! Yet got {} batches", results.len()),
+        Err(e) => {
+            println!("✓ Expected memory limit error during data processing:");
+            println!("Error: {e}");
+            /* Example error message:
+                Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
+                caused by
+                    Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
+                    ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB,
+                    ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB,
+                    ExternalSorter[1]#93(can spill: true) consumed 69.0 KB,
+                    ExternalSorter[13]#155(can spill: true) consumed 67.6 KB,
+                    ExternalSorter[8]#140(can spill: true) consumed 67.2 KB.
+                Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool
+             */
+        }
+    }
+
+    println!("\nNote: The error message above shows which memory consumers");
+    println!("were using the most memory when the limit was exceeded.");
+
+    Ok(())
+}
+
+/// Example 2: Manual tracking with custom consumers
+///
+/// This shows how to manually create and use memory consumers with tracking.

Review Comment:
   I am not sure this example adds much value as I don't expect people to create their own consumers outside the context of writing an operator (which is covered by the other example)
   
   Let's remove it from this PR for now
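   For context, the operator-side pattern referred to here (registering a consumer against the pool obtained from the `TaskContext`) looks roughly like the sketch below; the consumer name and `batch_size_in_bytes` are illustrative and not taken from this PR:

   ```rust
   use datafusion::execution::memory_pool::MemoryConsumer;

   // Rough sketch of the registration step inside ExecutionPlan::execute();
   // `context` is the Arc<TaskContext> handed to execute(), and the consumer
   // name is illustrative.
   let mut reservation = MemoryConsumer::new("MyOperator[partition 0]")
       .with_can_spill(true)              // advertise that this operator can spill
       .register(context.memory_pool());  // yields a MemoryReservation on the pool

   // Grow the reservation before buffering data; shrink or free it when done.
   reservation.try_grow(batch_size_in_bytes)?;
   ```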



##########
datafusion-examples/examples/memory_pool_execution_plan.rs:
##########
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates how to implement custom ExecutionPlans that properly
+//! use memory tracking through TrackConsumersPool.
+//!
+//! This shows the pattern for implementing memory-aware operators that:
+//! - Register memory consumers with the pool
+//! - Reserve memory before allocating
+//! - Handle memory pressure by spilling to disk
+//! - Release memory when done
+
+use arrow::array::{Int32Array, StringArray};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion::datasource::{memory::MemTable, DefaultTableSource};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::logical_expr::LogicalPlanBuilder;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, Statistics,
+};
+use datafusion::prelude::*;
+use futures::stream::{StreamExt, TryStreamExt};
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    println!("=== DataFusion ExecutionPlan Memory Tracking Example ===\n");
+
+    // Set up a runtime with memory tracking
+    // Set a low memory limit to trigger spilling on the second batch
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_limit(15000, 1.0) // Allow only enough for 1 batch at once
+        .build_arc()?;
+
+    let config = SessionConfig::new().with_coalesce_batches(false);
+    let ctx = SessionContext::new_with_config_rt(config, runtime.clone());
+
+    // Create some test data
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+    ]));
+
+    // Create smaller batches to ensure we get multiple RecordBatches from the scan
+    // Make each batch smaller than the memory limit to force multiple batches
+    let batch1 = RecordBatch::try_new(

Review Comment:
   maybe this would be a good use for the https://docs.rs/arrow/latest/arrow/array/macro.record_batch.html macro
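   For context, a macro-based construction of one of the batches above would look roughly like this (column values are shortened for illustration; the real example repeats each value 800 times):

   ```rust
   use arrow::array::record_batch;

   // Sketch using the record_batch! macro, which infers the schema from the
   // (name, type, values) tuples; values are shortened for illustration.
   let batch1 = record_batch!(
       ("id", Int32, [1, 1, 1]),
       ("name", Utf8, ["Alice", "Alice", "Alice"])
   )?;
   ```

   Note the macro builds its own schema, so the explicit `Schema::new` above would only remain where a `SchemaRef` is needed (e.g. the `MemTable`), and `batch1.schema()` could cover that as well.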



##########
datafusion-examples/examples/memory_pool_execution_plan.rs:
##########
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates how to implement custom ExecutionPlans that properly
+//! use memory tracking through TrackConsumersPool.
+//!
+//! This shows the pattern for implementing memory-aware operators that:
+//! - Register memory consumers with the pool
+//! - Reserve memory before allocating
+//! - Handle memory pressure by spilling to disk
+//! - Release memory when done
+
+use arrow::array::{Int32Array, StringArray};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion::datasource::{memory::MemTable, DefaultTableSource};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::logical_expr::LogicalPlanBuilder;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, Statistics,
+};
+use datafusion::prelude::*;
+use futures::stream::{StreamExt, TryStreamExt};
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    println!("=== DataFusion ExecutionPlan Memory Tracking Example ===\n");
+
+    // Set up a runtime with memory tracking
+    // Set a low memory limit to trigger spilling on the second batch
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_limit(15000, 1.0) // Allow only enough for 1 batch at once
+        .build_arc()?;
+
+    let config = SessionConfig::new().with_coalesce_batches(false);
+    let ctx = SessionContext::new_with_config_rt(config, runtime.clone());
+
+    // Create some test data
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+    ]));
+
+    // Create smaller batches to ensure we get multiple RecordBatches from the scan
+    // Make each batch smaller than the memory limit to force multiple batches
+    let batch1 = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from([1; 800].to_vec())), // Much smaller
+            Arc::new(StringArray::from(["Alice"; 800].to_vec())),
+        ],
+    )?;
+    let batch2 = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from([2; 800].to_vec())),
+            Arc::new(StringArray::from(["Bob"; 800].to_vec())),
+        ],
+    )?;
+    let batch3 = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from([3; 800].to_vec())),
+            Arc::new(StringArray::from(["Charlie"; 800].to_vec())),
+        ],
+    )?;
+    let batch4 = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from([4; 800].to_vec())),
+            Arc::new(StringArray::from(["David"; 800].to_vec())),
+        ],
+    )?;
+
+    // Create a single MemTable with all batches in one partition to preserve order but ensure streaming
+    let mem_table = Arc::new(MemTable::try_new(
+        schema.clone(),
+        vec![vec![batch1, batch2, batch3, batch4]], // Single partition with multiple batches
+    )?);
+
+    // Build logical plan with a single scan that will yield multiple batches
+    let table_source = Arc::new(DefaultTableSource::new(mem_table));
+    let logical_plan =
+        LogicalPlanBuilder::scan("multi_batch_table", table_source, None)?.build()?;
+
+    // Convert to physical plan
+    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;
+
+    println!("Example: Custom Memory-Aware BufferingExecutionPlan");
+    println!("---------------------------------------------------");
+
+    // Wrap our input plan with our custom BufferingExecutionPlan
+    let buffering_plan =
+        Arc::new(BufferingExecutionPlan::new(schema.clone(), physical_plan));
+
+    // Create a task context from our runtime
+    let task_ctx = Arc::new(TaskContext::default().with_runtime(runtime.clone()));
+
+    // Execute the plan directly to demonstrate memory tracking
+    println!("Executing BufferingExecutionPlan with memory tracking...");
+    println!("Memory limit: 15000 bytes - should trigger spill on later 
batches\n");
+
+    let stream = buffering_plan.execute(0, task_ctx.clone())?;
+    let _results: Vec<RecordBatch> = stream.try_collect().await?;
+
+    println!("\nSuccessfully executed BufferingExecutionPlan!");
+
+    println!("\nThe BufferingExecutionPlan processed 4 input batches and");
+    println!("demonstrated memory tracking with spilling behavior when the");
+    println!("memory limit was exceeded by later batches.");
+    println!("Check the console output above to see the spill messages.");
+
+    Ok(())
+}
+
+/// Example of an external batch bufferer that uses memory reservation.
+///
+/// It's a simple example which spills all existing data to disk
+/// whenever the memory limit is reached.
+struct ExternalBatchBufferer {
+    buffer: Vec<u8>,
+    reservation: MemoryReservation,
+    spill_count: usize,
+}
+
+impl ExternalBatchBufferer {
+    fn new(reservation: MemoryReservation) -> Self {
+        Self {
+            buffer: Vec::new(),
+            reservation,
+            spill_count: 0,
+        }
+    }
+
+    fn add_batch(&mut self, batch_data: Vec<u8>) -> Result<()> {
+        let additional_memory = batch_data.len();
+
+        // Try to reserve memory before allocating
+        if self.reservation.try_grow(additional_memory).is_err() {
+            // Memory limit reached - handle by spilling
+            println!(
+                "Memory limit reached, spilling {} bytes to disk",
+                self.buffer.len()
+            );
+            self.spill_to_disk()?;
+
+            // Try again after spilling
+            self.reservation.try_grow(additional_memory)?;
+        }
+
+        self.buffer.extend_from_slice(&batch_data);
+        println!(
+            "Added batch of {} bytes, total buffered: {} bytes",
+            additional_memory,
+            self.buffer.len()
+        );
+        Ok(())
+    }
+
+    fn spill_to_disk(&mut self) -> Result<()> {
+        // Simulate writing buffer to disk

Review Comment:
   "simulate" lol 



##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -295,9 +295,72 @@ impl TrackedConsumer {
 ///
 /// By tracking memory reservations more carefully this pool
 /// can provide better error messages on the largest memory users
+/// when memory allocation fails.
 ///
 /// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
 /// The same consumer can have multiple reservations.
+///
+/// # Automatic Usage with RuntimeEnvBuilder
+///
+/// The easiest way to use `TrackConsumersPool` is through [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit),

Review Comment:
   this example seems redundant with the example you added, so I'll remove it for now
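   For readers following along, `with_memory_limit` is approximately equivalent to installing the tracking pool explicitly, along these lines:

   ```rust
   use std::num::NonZeroUsize;
   use std::sync::Arc;
   use datafusion::execution::memory_pool::{GreedyMemoryPool, TrackConsumersPool};
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;

   // Approximate sketch of what with_memory_limit(5_000_000, 1.0) sets up:
   // a GreedyMemoryPool capped at the limit, wrapped in a TrackConsumersPool
   // that reports the top consumers when an allocation fails.
   let pool = TrackConsumersPool::new(
       GreedyMemoryPool::new(5_000_000),
       NonZeroUsize::new(5).expect("non-zero"),
   );
   let runtime = RuntimeEnvBuilder::new()
       .with_memory_pool(Arc::new(pool))
       .build_arc()?;
   ```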



##########
datafusion-examples/examples/memory_pool_tracking.rs:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example demonstrates how to use TrackConsumersPool for memory tracking and debugging.
+//!
+//! The TrackConsumersPool provides enhanced error messages that show the top memory consumers
+//! when memory allocation fails, making it easier to debug memory issues in DataFusion queries.
+//!
+//! # Examples
+//!
+//! * [`automatic_usage_example`]: Shows how to use RuntimeEnvBuilder to automatically enable memory tracking
+//! * [`manual_tracking_example`]: Shows how to manually create and configure a TrackConsumersPool
+
+use datafusion::execution::memory_pool::{
+    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
+};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::*;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    println!("=== DataFusion Memory Pool Tracking Example ===\n");
+
+    // Example 1: Automatic Usage with RuntimeEnvBuilder
+    automatic_usage_example().await?;
+
+    println!("\n{}\n", "=".repeat(60));
+
+    // Example 2: Manual tracking with custom consumers
+    manual_tracking_example()?;
+
+    println!("\n{}\n", "=".repeat(60));
+
+    Ok(())
+}
+
+/// Example 1: Automatic Usage with RuntimeEnvBuilder
+///
+/// This shows the recommended way to use TrackConsumersPool through RuntimeEnvBuilder,
+/// which automatically creates a TrackConsumersPool with sensible defaults.
+async fn automatic_usage_example() -> datafusion::error::Result<()> {
+    println!("Example 1: Automatic Usage with RuntimeEnvBuilder");
+    println!("------------------------------------------------");
+
+    // Success case: Create a runtime with reasonable memory limit
+    println!("Success case: Normal operation with sufficient memory");
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_limit(5_000_000, 1.0) // 5MB, 100% utilization
+        .build_arc()?;
+
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+    // Create a simple table for demonstration
+    ctx.sql("CREATE TABLE test AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+        .await?
+        .collect()
+        .await?;
+
+    println!("✓ Created table with memory tracking enabled");
+
+    // Run a simple query to show it works
+    let results = ctx.sql("SELECT * FROM test").await?.collect().await?;
+    println!(
+        "✓ Query executed successfully. Found {} rows",
+        results.len()
+    );
+
+    println!("\n{}", "-".repeat(50));
+
+    // Error case: Create a runtime with low memory limit to trigger errors
+    println!("Error case: Triggering memory limit error with detailed error 
messages");
+
+    // Use a WITH query that generates data and then processes it to trigger memory usage

Review Comment:
   I think this will be a nice base to start from when implementing more sophisticated memory reporting / tracking
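   As a small starting point for such reporting, the pool already exposes its aggregate usage, so something like the following (helper name is illustrative) could be polled while a query runs:

   ```rust
   use datafusion::execution::runtime_env::RuntimeEnv;

   // Illustrative helper: report the pool's aggregate usage. Per-consumer detail
   // is what TrackConsumersPool tracks internally for its error messages.
   fn report_pool_usage(runtime: &RuntimeEnv) {
       println!(
           "memory pool currently has {} bytes reserved",
           runtime.memory_pool.reserved()
       );
   }
   ```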
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

