ashdnazg commented on code in PR #15654: URL: https://github.com/apache/datafusion/pull/15654#discussion_r2039344018
########## datafusion/physical-plan/src/spill/mod.rs: ########## @@ -23,28 +23,156 @@ pub(crate) mod spill_manager; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::ptr::NonNull; +use std::sync::Arc; +use std::task::{Context, Poll}; use arrow::array::ArrayData; use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; use arrow::record_batch::RecordBatch; -use tokio::sync::mpsc::Sender; - -use datafusion_common::{exec_datafusion_err, HashSet, Result}; - -fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(path)?); - // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications - // with validated schemas and buffers. Skip redundant validation during read - // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written. - let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; - for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; + +use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result}; +use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::RecordBatchStream; +use futures::{FutureExt as _, Stream}; + +/// Stream that reads spill files from disk where each batch is read in a spawned blocking task +/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`] +struct SpillReaderStream { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org