wiedld commented on code in PR #17213:
URL: https://github.com/apache/datafusion/pull/17213#discussion_r2280252038


##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -295,9 +295,228 @@ impl TrackedConsumer {
 ///
 /// By tracking memory reservations more carefully this pool
 /// can provide better error messages on the largest memory users
+/// when memory allocation fails.
 ///
 /// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
 /// The same consumer can have multiple reservations.
+///
+/// # Automatic Usage with RuntimeEnvBuilder
+///
+/// The easiest way to use `TrackConsumersPool` is through 
[`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit),
+/// which automatically creates a `TrackConsumersPool` wrapping a 
[`GreedyMemoryPool`] with tracking
+/// for the top 5 memory consumers:
+///
+/// ```no_run
+/// # use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+/// # use datafusion_execution::config::SessionConfig;
+/// # use std::sync::Arc;
+/// # async fn example() -> datafusion_common::Result<()> {
+/// // Create a runtime with 20MB memory limit and consumer tracking
+/// let runtime = RuntimeEnvBuilder::new()
+///     .with_memory_limit(20_000_000, 1.0)  // 20MB, 100% utilization
+///     .build_arc()?;
+///
+/// let config = SessionConfig::new();
+///
+/// // Note: In real usage, you would use datafusion::prelude::SessionContext
+/// // let ctx = SessionContext::new_with_config_rt(config, runtime);
+///
+/// // Register your table
+/// // ctx.register_table("t", table)?;
+///
+/// // Run a memory-intensive query
+/// let query = "
+///     COPY (select * from t)
+///     TO '/tmp/output.parquet'
+///     STORED AS PARQUET OPTIONS (compression 'uncompressed')";
+///
+/// // If memory is exhausted, you'll get detailed error messages like:
+/// // "Additional allocation failed with top memory consumers (across 
reservations) as:
+/// //   ParquetSink(ArrowColumnWriter)#123(can spill: false) consumed 15.2 MB,
+/// //   HashJoin#456(can spill: false) consumed 3.1 MB,
+/// //   Sort#789(can spill: true) consumed 1.8 MB,
+/// //   Aggregation#101(can spill: false) consumed 892.0 KB,
+/// //   Filter#202(can spill: false) consumed 156.0 KB.
+/// // Error: Failed to allocate additional 2.5 MB..."
+///
+/// // let result = ctx.sql(query).await?.collect().await;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # How to use in ExecutionPlan implementations
+///
+/// When implementing custom ExecutionPlans that need to use significant 
memory, you should
+/// use the memory pool to track and limit memory usage:
+///
+/// ```rust
+/// use std::sync::Arc;
+/// use datafusion_execution::memory_pool::{MemoryPool, MemoryConsumer, 
MemoryReservation};
+/// use datafusion_common::Result;
+///
+/// /// Example of an external batch bufferer that uses memory reservation.
+/// ///
+/// /// It's a simple example which spills all existing data to disk
+/// /// whenever the memory limit is reached.
+/// struct MyExternalBatchBufferer {
+///     buffer: Vec<u8>,
+///     reservation: MemoryReservation,
+/// }
+///
+/// impl MyExternalBatchBufferer {
+///     fn new(reservation: MemoryReservation) -> Self {
+///         Self {
+///             buffer: Vec::new(),
+///             reservation,
+///         }
+///     }
+///
+///     fn add_batch(&mut self, batch_data: Vec<u8>) -> Result<()> {
+///         let additional_memory = batch_data.len();
+///
+///         // Try to reserve memory before allocating
+///         if let Err(_) = self.reservation.try_grow(additional_memory) {
+///             // Memory limit reached - handle by spilling or other strategy
+///             self.spill_to_disk()?;
+///         }
+///
+///         // Now safe to allocate and add the data
+///         self.reservation.try_grow(additional_memory)?;
+///         self.buffer.extend_from_slice(&batch_data);
+///         Ok(())
+///     }
+///
+///     fn spill_to_disk(&mut self) -> Result<()> {
+///         // Write buffer to disk
+///         // ... spilling logic ...
+///
+///         // Free the memory after spilling
+///         let freed_bytes = self.buffer.len();
+///         self.buffer.clear();
+///         self.reservation.shrink(freed_bytes);
+///
+///         Ok(())
+///     }
+///
+///     fn finish(&mut self) -> Vec<u8> {
+///         let result = std::mem::take(&mut self.buffer);
+///         // Free the memory when done
+///         self.reservation.free();
+///         result
+///     }
+/// }
+///
+/// # #[cfg(feature = "example")]
+/// # {
+/// use futures::StreamExt;
+/// use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+/// use datafusion_physical_plan::{ExecutionPlan, DisplayAs, 
DisplayFormatType};
+/// use datafusion_common::{internal_err, DataFusionError};
+/// use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+/// use arrow_schema::{Schema, SchemaRef};
+/// use futures::stream::{self, StreamExt};
+/// use std::any::Any;
+/// use std::fmt;
+///
+/// /// Example of an ExecutionPlan that uses the MyExternalBatchBufferer.
+/// #[derive(Debug)]
+/// struct MyBufferingExecutionPlan {
+///     schema: SchemaRef,
+///     input: Arc<dyn ExecutionPlan>,
+/// }
+///
+/// impl MyBufferingExecutionPlan {
+///     fn new(schema: SchemaRef, input: Arc<dyn ExecutionPlan>) -> Self {
+///         Self { schema, input }
+///     }
+/// }
+///
+/// impl DisplayAs for MyBufferingExecutionPlan {
+///     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+///         write!(f, "MyBufferingExecutionPlan")
+///     }
+/// }
+///
+/// impl ExecutionPlan for MyBufferingExecutionPlan {
+///     fn name(&self) -> &'static str {
+///         "MyBufferingExecutionPlan"
+///     }
+///
+///     fn as_any(&self) -> &dyn Any {
+///         self
+///     }
+///
+///     fn schema(&self) -> SchemaRef {
+///         self.schema.clone()
+///     }
+///
+///     fn properties(&self) -> &datafusion_physical_expr::PlanProperties {
+///         todo!()
+///     }
+///
+///     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+///         vec![&self.input]
+///     }
+///
+///     fn with_new_children(
+///         self: Arc<Self>,
+///         children: Vec<Arc<dyn ExecutionPlan>>,
+///     ) -> Result<Arc<dyn ExecutionPlan>> {
+///         todo!()
+///     }
+///
+///     fn execute(
+///         &self,
+///         partition: usize,
+///         context: Arc<TaskContext>,
+///     ) -> Result<SendableRecordBatchStream> {
+///         // Register memory consumer with the context's memory pool
+///         let reservation = MemoryConsumer::new("MyExternalBatchBufferer")
+///             .with_can_spill(true)
+///             .register(context.memory_pool());
+///
+///         let mut operator = MyExternalBatchBufferer::new(reservation);
+///
+///         // Process incoming stream of batches
+///         let stream = self.input.execute(partition, context)?;
+///
+///         // Process the stream and collect all batches
+///         let processed_stream = stream
+///             .map(|batch_result| {
+///                 batch_result.map(|batch| {
+///                     // Convert RecordBatch to bytes for this example
+///                     vec![1u8; batch.get_array_memory_size()]
+///                 })
+///             })
+///             .try_for_each(move |batch_data| async move {
+///                 operator.add_batch(batch_data)?;
+///                 Ok(())
+///             })
+///             .map(move |result| {
+///                 match result {
+///                     Ok(_) => {
+///                         // Finish processing and get results
+///                         let _final_result = operator.finish();
+///                         // In a real implementation, you would convert 
final_result back to RecordBatches
+///                     }
+///                     Err(_) => {
+///                         // Handle error case
+///                     }
+///                 }
+///             });
+///
+///         // Since this is a simplified example, return an empty stream
+///         // In a real implementation, you would create a stream from the 
processed results
+///         let result_stream = stream::empty();
+///         Ok(Box::pin(result_stream))
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Runtime Monitoring
+///
+/// TODO(kosiew/wiedld):  Docs to be added after 
<https://github.com/apache/datafusion/pull/17021/>.

Review Comment:
   Should I remove this for now? It's explaining where we can reference @kosiew 
new/incoming functionality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to