2010YOUY01 commented on code in PR #15654: URL: https://github.com/apache/datafusion/pull/15654#discussion_r2038994375
########## datafusion/physical-plan/src/spill/mod.rs: ########## @@ -23,28 +23,156 @@ pub(crate) mod spill_manager; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::ptr::NonNull; +use std::sync::Arc; +use std::task::{Context, Poll}; use arrow::array::ArrayData; use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; use arrow::record_batch::RecordBatch; -use tokio::sync::mpsc::Sender; - -use datafusion_common::{exec_datafusion_err, HashSet, Result}; - -fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(path)?); - // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications - // with validated schemas and buffers. Skip redundant validation during read - // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written. - let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; - for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; + +use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result}; +use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::RecordBatchStream; +use futures::{FutureExt as _, Stream}; + +/// Stream that reads spill files from disk where each batch is read in a spawned blocking task +/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`] +struct SpillReaderStream { Review Comment: ```suggestion /// A simpler solution would be spawning a long-running blocking task for each /// file read (instead of each batch). This approach does not work because when /// the number of concurrent reads exceeds the Tokio thread pool limit, /// deadlocks can occur and block progress. 
struct SpillReaderStream { ``` I recommend adding a 'why' comment here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org