adriangb commented on code in PR #18207:
URL: https://github.com/apache/datafusion/pull/18207#discussion_r2461395234


##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -0,0 +1,1099 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SpillPool: A reusable abstraction for managing spill files with FIFO semantics.
+//!
+//! # Overview
+//!
+//! The `SpillPool` provides a centralized mechanism for spilling record batches to disk
+//! when memory is constrained. It manages a collection of spill files, each containing
+//! multiple batches, with configurable maximum file sizes.
+//!
+//! # Design
+//!
+//! - **FIFO (Queue) semantics**: Batches are read in the order they were spilled
+//! - **File handle reuse**: Multiple batches are written to the same file to minimize syscalls
+//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, rotate to a new file
+//! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern
+//! - **Automatic cleanup**: Files are deleted once fully consumed
+//!
+//! # Usage Example
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use parking_lot::Mutex;
+//!
+//! let pool = SpillPool::new(
+//!     100 * 1024 * 1024,  // 100MB max per file
+//!     spill_manager,
+//!     schema,
+//! );
+//! let pool = Arc::new(Mutex::new(pool));
+//!
+//! // Spill batches - automatically rotates files when size limit reached
+//! {
+//!     let mut pool = pool.lock();
+//!     pool.push_batch(batch1)?;
+//!     pool.push_batch(batch2)?;
+//!     pool.flush()?;  // Finalize current file
+//!     pool.finalize(); // Signal no more writes
+//! }
+//!
+//! // Read back in FIFO order using a stream
+//! let mut stream = SpillPool::reader(pool, spill_manager);
+//! let batch1 = stream.next().await.unwrap()?;  // Returns batch1
+//! let batch2 = stream.next().await.unwrap()?;  // Returns batch2
+//! // stream.next() returns None after finalize
+//! ```
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+use std::task::Waker;
+
+use parking_lot::Mutex;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
+use super::in_progress_spill_file::InProgressSpillFile;
+use super::spill_manager::SpillManager;
+
+/// A pool of spill files that manages batch-level spilling with FIFO semantics.
+///
+/// Batches are written sequentially to files, with automatic rotation when the
+/// configured size limit is reached. Reading is done via an infinite stream
+/// that can read concurrently while writes continue.
+///
+/// # Thread Safety
+///
+/// `SpillPool` is not thread-safe and should be used from a single thread or
+/// protected with appropriate synchronization (e.g., `Arc<Mutex<SpillPool>>`).
+pub struct SpillPool {
+    /// Maximum size in bytes before rotating to a new file
+    max_file_size_bytes: usize,
+    /// Queue of spill files (front = oldest, back = newest)
+    files: VecDeque<RefCountedTempFile>,
+    /// Current file being written to (if any)
+    current_write_file: Option<InProgressSpillFile>,
+    /// Size of current write file in bytes (estimated)
+    current_write_size: usize,
+    /// Number of batches written to current file
+    current_batch_count: usize,

Review Comment:
   done



##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -0,0 +1,1099 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SpillPool: A reusable abstraction for managing spill files with FIFO semantics.
+//!
+//! # Overview
+//!
+//! The `SpillPool` provides a centralized mechanism for spilling record batches to disk
+//! when memory is constrained. It manages a collection of spill files, each containing
+//! multiple batches, with configurable maximum file sizes.
+//!
+//! # Design
+//!
+//! - **FIFO (Queue) semantics**: Batches are read in the order they were spilled
+//! - **File handle reuse**: Multiple batches are written to the same file to minimize syscalls
+//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, rotate to a new file
+//! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern
+//! - **Automatic cleanup**: Files are deleted once fully consumed
+//!
+//! # Usage Example
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use parking_lot::Mutex;
+//!
+//! let pool = SpillPool::new(
+//!     100 * 1024 * 1024,  // 100MB max per file
+//!     spill_manager,
+//!     schema,
+//! );
+//! let pool = Arc::new(Mutex::new(pool));
+//!
+//! // Spill batches - automatically rotates files when size limit reached
+//! {
+//!     let mut pool = pool.lock();
+//!     pool.push_batch(batch1)?;
+//!     pool.push_batch(batch2)?;
+//!     pool.flush()?;  // Finalize current file
+//!     pool.finalize(); // Signal no more writes
+//! }
+//!
+//! // Read back in FIFO order using a stream
+//! let mut stream = SpillPool::reader(pool, spill_manager);
+//! let batch1 = stream.next().await.unwrap()?;  // Returns batch1
+//! let batch2 = stream.next().await.unwrap()?;  // Returns batch2
+//! // stream.next() returns None after finalize
+//! ```
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+use std::task::Waker;
+
+use parking_lot::Mutex;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
+use super::in_progress_spill_file::InProgressSpillFile;
+use super::spill_manager::SpillManager;
+
+/// A pool of spill files that manages batch-level spilling with FIFO semantics.
+///
+/// Batches are written sequentially to files, with automatic rotation when the
+/// configured size limit is reached. Reading is done via an infinite stream
+/// that can read concurrently while writes continue.
+///
+/// # Thread Safety
+///
+/// `SpillPool` is not thread-safe and should be used from a single thread or
+/// protected with appropriate synchronization (e.g., `Arc<Mutex<SpillPool>>`).
+pub struct SpillPool {
+    /// Maximum size in bytes before rotating to a new file
+    max_file_size_bytes: usize,
+    /// Queue of spill files (front = oldest, back = newest)
+    files: VecDeque<RefCountedTempFile>,
+    /// Current file being written to (if any)
+    current_write_file: Option<InProgressSpillFile>,
+    /// Size of current write file in bytes (estimated)
+    current_write_size: usize,
+    /// Number of batches written to current file
+    current_batch_count: usize,
+    /// SpillManager for creating files and tracking metrics
+    spill_manager: Arc<SpillManager>,
+    /// Schema for batches (used by SpillPoolStream to implement RecordBatchStream)
+    schema: SchemaRef,

Review Comment:
   👍🏻 done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to