pantShrey commented on code in PR #21882:
URL: https://github.com/apache/datafusion/pull/21882#discussion_r3443798942
##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -274,19 +258,73 @@ pub fn spill_record_batch_by_size(
Ok(())
}
-/// Write in Arrow IPC Stream format to a file.
-///
-/// Stream format is used for spill because it supports dictionary replacement, and the random
-/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
+/// An adapter that implements `std::io::Write` to bridge Arrow's synchronous
Review Comment:
Agreed, happy to make this change! The adapter was unfortunate. It ended up
there because the issue discussion moved away from using `std::io::Write` on
the main trait, which led me toward `Bytes` to give backends owned data. But
as you point out, that just pushed the intermediate copy one level up into
the adapter itself. Switching to `&[u8]` directly lets us delete the adapter
entirely, which is a much cleaner outcome.
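
To make the shape of that change concrete, here is a minimal sketch of a backend write method that borrows a slice. `SpillWriter`, `MemorySpillWriter`, and `write_all_chunks` are hypothetical names for illustration only, not the trait or types in this PR. The point is just that a caller holding encoded bytes can pass them by reference, and the backend decides whether to copy.

```rust
use std::io;

/// Hypothetical backend trait (not the PR's actual trait): the payload is
/// borrowed, so the caller never has to build an owned buffer just to call it.
trait SpillWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<()>;
}

/// Illustrative in-memory backend that appends every slice it receives.
#[derive(Default)]
struct MemorySpillWriter {
    data: Vec<u8>,
}

impl SpillWriter for MemorySpillWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<()> {
        // The only copy happens here, inside the backend that needs it.
        self.data.extend_from_slice(buf);
        Ok(())
    }
}

/// Forwards already-encoded chunks to the backend and returns the byte count.
fn write_all_chunks<W: SpillWriter>(w: &mut W, chunks: &[&[u8]]) -> io::Result<usize> {
    let mut total = 0;
    for chunk in chunks {
        w.write(chunk)?;
        total += chunk.len();
    }
    Ok(total)
}
```

A file-backed implementation could pass the same slice straight to the OS write call, so no intermediate owned buffer is needed on either side.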
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -402,59 +452,7 @@ impl RefCountedTempFile {
self.tempfile.as_ref()
}
- /// Updates the global disk usage counter after modifications to the underlying file.
- ///
- /// # Errors
- /// - Returns an error if the global disk usage exceeds the configured limit.
- pub fn update_disk_usage(&mut self) -> Result<()> {
- // Get new file size from OS
- let metadata = self.tempfile.as_file().metadata()?;
- let new_disk_usage = metadata.len();
-
- // Get the old disk usage
Review Comment:
I moved the tracking into `FileSpillWriter::write()`, which updates the
counter incrementally on each write, so callers no longer need to call
`update_disk_usage()` after each write.
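
As a rough illustration of per-write tracking, the sketch below reserves bytes against a shared counter before writing and fails if the limit would be exceeded. `DiskUsage` and `TrackedWriter` are hypothetical stand-ins written for this example; the real `FileSpillWriter` and disk manager types may differ in structure and error handling.

```rust
use std::io::{self, Write};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical shared usage counter with a fixed limit in bytes.
struct DiskUsage {
    used: AtomicU64,
    limit: u64,
}

impl DiskUsage {
    fn new(limit: u64) -> Self {
        Self { used: AtomicU64::new(0), limit }
    }

    /// Reserves `n` more bytes, or fails and leaves the counter unchanged.
    fn try_grow(&self, n: u64) -> io::Result<()> {
        let prev = self.used.fetch_add(n, Ordering::Relaxed);
        if prev.saturating_add(n) > self.limit {
            // Roll back the reservation so later writes see the old value.
            self.used.fetch_sub(n, Ordering::Relaxed);
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "disk usage limit exceeded",
            ));
        }
        Ok(())
    }
}

/// Hypothetical writer that accounts for every write as it happens.
struct TrackedWriter<W: Write> {
    inner: W,
    usage: Arc<DiskUsage>,
}

impl<W: Write> TrackedWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<()> {
        let n = buf.len() as u64;
        self.usage.try_grow(n)?;
        if let Err(e) = self.inner.write_all(buf) {
            // The bytes were not durably written, so release the reservation.
            self.usage.used.fetch_sub(n, Ordering::Relaxed);
            return Err(e);
        }
        Ok(())
    }
}
```

Compared with re-reading the file size from OS metadata after each write, this keeps the accounting in one place and never needs a separate call from the caller.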
##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -39,212 +39,187 @@ use arrow::array::{
Array, ArrayRef, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray,
layout, make_array,
};
+use arrow::buffer::Buffer;
use arrow::datatypes::DataType;
use arrow::datatypes::{ByteViewType, Schema, SchemaRef};
use arrow::ipc::{
MetadataVersion,
- reader::StreamReader,
+ reader::StreamDecoder,
writer::{IpcWriteOptions, StreamWriter},
};
use arrow::record_batch::RecordBatch;
use arrow_data::ArrayDataBuilder;
use arrow_ipc::CompressionType;
use datafusion_common::config::SpillCompression;
-use datafusion_common::{DataFusionError, Result, exec_datafusion_err, exec_err};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result, exec_datafusion_err};
use datafusion_execution::RecordBatchStream;
-use datafusion_execution::disk_manager::RefCountedTempFile;
-use futures::{FutureExt as _, Stream};
+use datafusion_execution::spill_file::SpillFile;
+use futures::Stream;
use log::debug;
-/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
-/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
-///
-/// A simpler solution would be spawning a long-running blocking task for each
-/// file read (instead of each batch). This approach does not work because when
-/// the number of concurrent reads exceeds the Tokio thread pool limit,
-/// deadlocks can occur and block progress.
+/// Stream that reads spill files from a [`SpillFile`] backend as a stream of [`RecordBatch`]es.
+/// Uses [`StreamDecoder`] to decode IPC bytes received from the backend's async byte stream.
+/// Backends handle their own threading concerns internally - OS files use
+/// `tokio::fs::File` which performs blocking IO per-syscall without holding a thread
+/// for the file's lifetime, avoiding deadlocks when concurrent reads exceed thread pool limits.
struct SpillReaderStream {
schema: SchemaRef,
- state: SpillReaderStreamState,
+ decoder: StreamDecoder,
+ byte_stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send>>,
Review Comment:
You're right that `StreamReader` reads directly into its parse buffer
without an intermediate `Bytes` object, so it genuinely avoids that step. The
tradeoff is that driving it requires `spawn_blocking`, which reintroduces the
thread pool exhaustion problem the old state machine existed to solve. The
`StreamDecoder` path accepts a small amount of extra buffering in exchange
for staying fully async.
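
For readers less familiar with the push-style approach, here is a toy illustration of where the extra buffering comes from. This is not Arrow's `StreamDecoder` API. `FrameDecoder` is a hypothetical decoder for a made-up format (a 4-byte little-endian length followed by the payload), fed with whatever chunks an async byte stream happens to deliver.

```rust
/// Toy push-based decoder for a made-up length-prefixed format.
/// It never blocks on IO: the caller pushes bytes in as they arrive.
#[derive(Default)]
struct FrameDecoder {
    /// Bytes received so far that do not yet form a complete frame.
    pending: Vec<u8>,
}

impl FrameDecoder {
    /// Accepts one chunk and returns every frame completed by it.
    fn feed(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        // This copy is the "extra buffering": partial frames must be kept
        // somewhere until the rest of their bytes show up.
        self.pending.extend_from_slice(chunk);

        let mut frames = Vec::new();
        let mut pos = 0;
        loop {
            let rest = &self.pending[pos..];
            if rest.len() < 4 {
                break; // length prefix not complete yet
            }
            let len = u32::from_le_bytes([rest[0], rest[1], rest[2], rest[3]]) as usize;
            if rest.len() < 4 + len {
                break; // payload not complete yet
            }
            frames.push(rest[4..4 + len].to_vec());
            pos += 4 + len;
        }
        // Drop the bytes that were consumed, keep the incomplete tail.
        self.pending.drain(..pos);
        frames
    }

    /// True when no partial frame is waiting for more input.
    fn is_idle(&self) -> bool {
        self.pending.is_empty()
    }
}
```

Because `feed` returns immediately whether or not a frame is complete, it can be called from an async `poll_next` without parking a thread, which is the property the `StreamDecoder` path relies on.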
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]