2010YOUY01 commented on code in PR #18207: URL: https://github.com/apache/datafusion/pull/18207#discussion_r2498047017
########## datafusion/physical-plan/src/spill/spill_pool.rs: ########## @@ -0,0 +1,1264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spill pool for managing spill files with FIFO semantics. + +use futures::{Stream, StreamExt}; +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// Shared state between the writer and readers of a spill pool. +/// This contains the queue of files and coordination state. +struct SpillPoolShared { + /// Queue of ALL files (including the current write file if it exists). + /// Readers always read from the front of this queue (FIFO). + files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>, + /// SpillManager for creating files and tracking metrics + spill_manager: Arc<SpillManager>, + /// Pool-level wakers to notify when new files are available + wakers: Vec<Waker>, +} + +impl SpillPoolShared { + /// Creates a new shared pool state + fn new(spill_manager: Arc<SpillManager>) -> Self { + Self { + files: VecDeque::new(), + spill_manager, + wakers: Vec::new(), + } + } + + /// Registers a waker to be notified when new data is available (pool-level) + fn register_waker(&mut self, waker: Waker) { + // Only register if not already present (avoid duplicates) + if !self.wakers.iter().any(|w| w.will_wake(&waker)) { + self.wakers.push(waker); + } + } + + /// Wakes all pool-level readers + fn wake(&mut self) { + for waker in self.wakers.drain(..) { + waker.wake(); + } + } +} + +/// Writer for a spill pool. Provides exclusive write access with FIFO semantics. +/// +/// Created by [`channel`]. See that function for architecture diagrams and usage examples. +/// +/// The writer automatically manages file rotation based on the `max_file_size_bytes` +/// configured in [`channel`]. When dropped, it finalizes the current file so readers +/// can access all written data. +pub struct SpillPoolWriter { + /// Maximum size in bytes before rotating to a new file + max_file_size_bytes: usize, + /// Writer's reference to the current file (also in the shared files queue) + current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>, + /// Shared state with readers + shared: Arc<Mutex<SpillPoolShared>>, +} + +impl SpillPoolWriter { + /// Spills a batch to the pool, rotating files when necessary. + /// + /// If the current file would exceed `max_file_size_bytes` after adding + /// this batch, the file is finalized and a new one is started. + /// + /// See [`channel`] for overall architecture and examples. + /// + /// # File Rotation Logic + /// + /// ```text + /// push_batch() + /// │ + /// ▼ + /// Current file exists? + /// │ + /// ├─ No ──▶ Create new file ──▶ Add to shared queue + /// │ Wake readers + /// ▼ + /// Write batch to current file + /// │ + /// ▼ + /// estimated_size > max_file_size_bytes? + /// │ + /// ├─ No ──▶ Keep current file for next batch + /// │ + /// ▼ + /// Yes: finish() current file + /// Mark writer_finished = true + /// Wake readers + /// │ + /// ▼ + /// Next push_batch() creates new file + /// ``` + /// + /// # Errors + /// + /// Returns an error if disk I/O fails or disk quota is exceeded. + pub fn push_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + // Skip empty batches + return Ok(()); + } + + let batch_size = batch.get_array_memory_size(); + + // Create new file if we don't have one yet + if self.current_write_file.is_none() { + let spill_manager = { + let shared = self.shared.lock(); + Arc::clone(&shared.spill_manager) + }; + + let writer = spill_manager.create_in_progress_file("SpillPool")?; + // Clone the file so readers can access it immediately + let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone(); + + let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared { + writer: Some(writer), + file: Some(file), // Set immediately so readers can access it + batches_written: 0, + estimated_size: 0, + writer_finished: false, + wakers: Vec::new(), + })); + + // Push to shared queue and keep reference for writing + { + let mut shared = self.shared.lock(); + shared.files.push_back(Arc::clone(&file_shared)); + shared.wake(); // Wake readers waiting for new files + } + self.current_write_file = Some(file_shared); + } + + let current_write_file = self.current_write_file.take(); + + // Write batch to current file + if let Some(current_file) = current_write_file { + let mut file_shared = current_file.lock(); + + // Append the batch + if let Some(ref mut writer) = file_shared.writer { + writer.append_batch(batch)?; + file_shared.batches_written += 1; + file_shared.estimated_size += batch_size; + } + + // Wake readers waiting on this specific file + file_shared.wake_all(); + + // Check if we need to rotate + let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes; + + if needs_rotation { + // Finish the IPC writer + if let Some(mut writer) = file_shared.writer.take() { + writer.finish()?; + } + // Mark as finished so readers know not to wait for more data + file_shared.writer_finished = true; + // Wake readers waiting on this file (it's now finished) + file_shared.wake_all(); + } else { + // Release lock + drop(file_shared); + // Put back the current file for further writing + self.current_write_file = Some(current_file); + } + } + + Ok(()) + } +} + +impl Drop for SpillPoolWriter { + fn drop(&mut self) { + // Finalize the current file when writer is dropped + if let Some(current_file) = self.current_write_file.take() { + let mut file_shared = current_file.lock(); + + // Finish the current writer if it exists + if let Some(mut writer) = file_shared.writer.take() { + // Ignore errors on drop - we're in destructor + let _ = writer.finish(); + } + + // Mark as finished so readers know not to wait for more data + file_shared.writer_finished = true; + + // Wake readers waiting on this file (it's now finished) + file_shared.wake_all(); + } + } +} + +/// Creates a paired writer and reader for a spill pool with channel-like semantics. +/// +/// This is the recommended way to create a spill pool. The writer has exclusive +/// write access, and the reader can consume batches in FIFO order. The reader +/// can start reading immediately while the writer continues to write more data. +/// +/// # Architecture +/// +/// ```text +/// ┌─────────────────────────────────────────────────────────────────────────┐ +/// │ SpillPool │ +/// │ │ +/// │ Writer Side Shared State Reader Side │ +/// │ ─────────── ──────────── ─────────── │ +/// │ │ +/// │ SpillPoolWriter ┌────────────────────┐ SpillPoolReader │ +/// │ │ │ VecDeque<File> │ │ │ +/// │ │ │ ┌────┐┌────┐ │ │ │ +/// │ push_batch() │ │ F1 ││ F2 │ ... │ next().await │ +/// │ │ │ └────┘└────┘ │ │ │ +/// │ ▼ │ (FIFO order) │ ▼ │ +/// │ ┌─────────┐ │ │ ┌──────────┐ │ +/// │ │Current │───────▶│ Coordination: │◀───│ Current │ │ +/// │ │Write │ │ - Wakers │ │ Read │ │ +/// │ │File │ │ - Batch counts │ │ File │ │ +/// │ └─────────┘ │ - Writer status │ └──────────┘ │ +/// │ │ └────────────────────┘ │ │ +/// │ │ │ │ +/// │ Size > limit? Read all batches? │ +/// │ │ │ │ +/// │ ▼ ▼ │ +/// │ Rotate to new file Pop from queue │ +/// └─────────────────────────────────────────────────────────────────────────┘ +/// +/// Writer produces → Shared FIFO queue → Reader consumes +/// ``` +/// +/// # File State Machine +/// +/// Each file in the pool coordinates between writer and reader: +/// +/// ```text +/// Writer View Reader View +/// ─────────── ─────────── +/// +/// Created writer: Some(..) batches_read: 0 +/// batches_written: 0 (waiting for data) +/// │ +/// ▼ +/// Writing append_batch() Can read if: +/// batches_written++ batches_read < batches_written +/// wake readers +/// │ │ +/// │ ▼ +/// ┌──────┴──────┐ poll_next() → batch +/// │ │ batches_read++ +/// ▼ ▼ +/// Size > limit? More data? +/// │ │ +/// │ └─▶ Yes ──▶ Continue writing +/// ▼ +/// finish() Reader catches up: +/// writer_finished = true batches_read == batches_written +/// wake readers │ +/// │ ▼ +/// └─────────────────────▶ Returns Poll::Ready(None) +/// File complete, pop from queue +/// ``` +/// +/// # Arguments +/// +/// * `max_file_size_bytes` - Maximum size per file before rotation. When a file +/// exceeds this size, the writer automatically rotates to a new file. +/// * `spill_manager` - Manager for file creation and metrics tracking +/// +/// # Returns +/// +/// A tuple of `(SpillPoolWriter, SendableRecordBatchStream)` that share the same +/// underlying pool. The reader is returned as a stream for immediate use with +/// async stream combinators. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// use arrow::array::{ArrayRef, Int32Array}; +/// use arrow::datatypes::{DataType, Field, Schema}; +/// use arrow::record_batch::RecordBatch; +/// use datafusion_execution::runtime_env::RuntimeEnv; +/// use futures::StreamExt; +/// +/// # use datafusion_physical_plan::spill::spill_pool; +/// # use datafusion_physical_plan::spill::SpillManager; // Re-exported for doctests +/// # use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; +/// # +/// # #[tokio::main] +/// # async fn main() -> datafusion_common::Result<()> { +/// # // Setup for the example (typically comes from TaskContext in production) +/// # let env = Arc::new(RuntimeEnv::default()); +/// # let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); +/// # let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); +/// # let spill_manager = Arc::new(SpillManager::new(env, metrics, schema.clone())); +/// # +/// // Create channel with 1MB file size limit +/// let (mut writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager); +/// +/// // Spawn writer task to produce batches +/// let write_handle = tokio::spawn(async move { +/// for i in 0..5 { +/// let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100])); +/// let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap(); +/// writer.push_batch(&batch).unwrap(); +/// } +/// // Writer dropped here, finalizing current file +/// }); +/// +/// // Reader consumes batches in FIFO order (can run concurrently with writer) +/// let mut batches_read = 0; +/// while let Some(result) = reader.next().await { +/// let batch = result?; +/// batches_read += 1; +/// // Process batch... +/// if batches_read == 5 { +/// break; // Got all expected batches +/// } +/// } +/// +/// write_handle.await.unwrap(); +/// assert_eq!(batches_read, 5); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Use Cases +/// +/// - **Backpressure handling**: Writer can continue producing while reader is slow +/// - **Memory management**: Files automatically rotate based on size limits +/// - **Concurrent I/O**: Reader and writer operate independently with async coordination +/// - **FIFO semantics**: Batches are consumed in the exact order they were written +pub fn channel( Review Comment: Correct me if I'm not understand it correctly: this is a SPSC channel, and the writer and the reader can operator concurrently inside the same in progress file, like the following timeline: 1. writer write batch `B0` to F1 2. writer write batch `B1` to F1 3. reader read `B0` 4. reader read `B1`, no more batch to read -> wait on the waker 5. writer write batch `B2` and finish `F1` then continue writing to a new file, wake up the waiting reader, then write `B3` to `F2` 6. reader wake up and read `B2`, then drop `F1` and release the resources. ########## datafusion/physical-plan/src/spill/spill_pool.rs: ########## @@ -0,0 +1,1264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spill pool for managing spill files with FIFO semantics. + +use futures::{Stream, StreamExt}; +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// Shared state between the writer and readers of a spill pool. +/// This contains the queue of files and coordination state. +struct SpillPoolShared { + /// Queue of ALL files (including the current write file if it exists). + /// Readers always read from the front of this queue (FIFO). + files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>, + /// SpillManager for creating files and tracking metrics + spill_manager: Arc<SpillManager>, + /// Pool-level wakers to notify when new files are available + wakers: Vec<Waker>, +} + +impl SpillPoolShared { + /// Creates a new shared pool state + fn new(spill_manager: Arc<SpillManager>) -> Self { + Self { + files: VecDeque::new(), + spill_manager, + wakers: Vec::new(), + } + } + + /// Registers a waker to be notified when new data is available (pool-level) + fn register_waker(&mut self, waker: Waker) { + // Only register if not already present (avoid duplicates) + if !self.wakers.iter().any(|w| w.will_wake(&waker)) { + self.wakers.push(waker); + } + } + + /// Wakes all pool-level readers + fn wake(&mut self) { + for waker in self.wakers.drain(..) { + waker.wake(); + } + } +} + +/// Writer for a spill pool. Provides exclusive write access with FIFO semantics. +/// +/// Created by [`channel`]. See that function for architecture diagrams and usage examples. +/// +/// The writer automatically manages file rotation based on the `max_file_size_bytes` +/// configured in [`channel`]. When dropped, it finalizes the current file so readers +/// can access all written data. +pub struct SpillPoolWriter { + /// Maximum size in bytes before rotating to a new file + max_file_size_bytes: usize, + /// Writer's reference to the current file (also in the shared files queue) + current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>, + /// Shared state with readers + shared: Arc<Mutex<SpillPoolShared>>, +} + +impl SpillPoolWriter { + /// Spills a batch to the pool, rotating files when necessary. + /// + /// If the current file would exceed `max_file_size_bytes` after adding + /// this batch, the file is finalized and a new one is started. + /// + /// See [`channel`] for overall architecture and examples. + /// + /// # File Rotation Logic + /// + /// ```text + /// push_batch() + /// │ + /// ▼ + /// Current file exists? + /// │ + /// ├─ No ──▶ Create new file ──▶ Add to shared queue + /// │ Wake readers + /// ▼ + /// Write batch to current file + /// │ + /// ▼ + /// estimated_size > max_file_size_bytes? + /// │ + /// ├─ No ──▶ Keep current file for next batch + /// │ + /// ▼ + /// Yes: finish() current file + /// Mark writer_finished = true + /// Wake readers + /// │ + /// ▼ + /// Next push_batch() creates new file + /// ``` + /// + /// # Errors + /// + /// Returns an error if disk I/O fails or disk quota is exceeded. + pub fn push_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + // Skip empty batches + return Ok(()); + } + + let batch_size = batch.get_array_memory_size(); + + // Create new file if we don't have one yet + if self.current_write_file.is_none() { + let spill_manager = { + let shared = self.shared.lock(); + Arc::clone(&shared.spill_manager) + }; + + let writer = spill_manager.create_in_progress_file("SpillPool")?; + // Clone the file so readers can access it immediately + let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone(); + + let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared { + writer: Some(writer), + file: Some(file), // Set immediately so readers can access it + batches_written: 0, + estimated_size: 0, + writer_finished: false, + wakers: Vec::new(), + })); + + // Push to shared queue and keep reference for writing + { + let mut shared = self.shared.lock(); + shared.files.push_back(Arc::clone(&file_shared)); + shared.wake(); // Wake readers waiting for new files + } + self.current_write_file = Some(file_shared); + } + + let current_write_file = self.current_write_file.take(); + + // Write batch to current file + if let Some(current_file) = current_write_file { + let mut file_shared = current_file.lock(); + + // Append the batch + if let Some(ref mut writer) = file_shared.writer { + writer.append_batch(batch)?; + file_shared.batches_written += 1; + file_shared.estimated_size += batch_size; + } + + // Wake readers waiting on this specific file + file_shared.wake_all(); + + // Check if we need to rotate + let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes; + + if needs_rotation { + // Finish the IPC writer + if let Some(mut writer) = file_shared.writer.take() { + writer.finish()?; + } + // Mark as finished so readers know not to wait for more data + file_shared.writer_finished = true; + // Wake readers waiting on this file (it's now finished) + file_shared.wake_all(); + } else { + // Release lock + drop(file_shared); + // Put back the current file for further writing + self.current_write_file = Some(current_file); + } + } + + Ok(()) + } +} + +impl Drop for SpillPoolWriter { + fn drop(&mut self) { + // Finalize the current file when writer is dropped + if let Some(current_file) = self.current_write_file.take() { + let mut file_shared = current_file.lock(); + + // Finish the current writer if it exists + if let Some(mut writer) = file_shared.writer.take() { + // Ignore errors on drop - we're in destructor + let _ = writer.finish(); + } + + // Mark as finished so readers know not to wait for more data + file_shared.writer_finished = true; + + // Wake readers waiting on this file (it's now finished) + file_shared.wake_all(); + } + } +} + +/// Creates a paired writer and reader for a spill pool with channel-like semantics. +/// +/// This is the recommended way to create a spill pool. The writer has exclusive +/// write access, and the reader can consume batches in FIFO order. The reader +/// can start reading immediately while the writer continues to write more data. +/// +/// # Architecture +/// +/// ```text +/// ┌─────────────────────────────────────────────────────────────────────────┐ Review Comment: This figure looks 👍🏼 , are they AI-generated? I’ve tried generating ASCII arts with ChatGPT several times, but all attempts failed, so I still draw them manually. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
