ashdnazg commented on code in PR #15654:
URL: https://github.com/apache/datafusion/pull/15654#discussion_r2039344018


##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -23,28 +23,156 @@ pub(crate) mod spill_manager;
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::ptr::NonNull;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use arrow::array::ArrayData;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
 use arrow::record_batch::RecordBatch;
-use tokio::sync::mpsc::Sender;
-
-use datafusion_common::{exec_datafusion_err, HashSet, Result};
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
-    // with validated schemas and buffers. Skip redundant validation during read
-    // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
-    let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) };
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| exec_datafusion_err!("{e}"))?;
+
+use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::RecordBatchStream;
+use futures::{FutureExt as _, Stream};
+
+/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
+/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
+struct SpillReaderStream {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to