pantShrey commented on issue #21215:
URL: https://github.com/apache/datafusion/issues/21215#issuecomment-4209889902
Thank you for your time and suggestion, @alamb. I have one more architecture
question before I open the draft PR.
Following up on your suggestion of avoiding `Read`/`Write` and using our own
abstraction, here is the direction I've landed on:
**Write** -
`IPCStreamWriter` serializes each batch into a `Vec<u8>`, converts it to
`Bytes` (zero-copy via `Bytes::from(vec)`), and then passes it to `write()` on
this trait:
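```rust
use bytes::Bytes;
use datafusion_common::Result;

/// Sink for serialized IPC bytes; each backend (local file, S3, Postgres, ...) does its own IO.
pub trait SpillWriter: Send {
    /// Write one chunk of serialized IPC data.
    fn write(&mut self, data: Bytes) -> Result<()>;
    /// Push any buffered data to the backend.
    fn flush(&mut self) -> Result<()>;
    /// Finalize the spill (e.g. close the file or complete an upload).
    fn finish(&mut self) -> Result<()>;
}
```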
This keeps Arrow's synchronous IPC serialization isolated, while the actual IO
goes through the trait, so S3 or Postgres backends can implement `write()`
however they need to.
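To illustrate how a non-file backend could implement `write()` "however it needs to", here is a minimal std-only sketch. So that it compiles without extra crates, it assumes `Vec<u8>` in place of `bytes::Bytes` and `std::io::Result` in place of DataFusion's `Result`. `PartBufferingWriter` and its `part_size` are hypothetical names, and the "uploaded" parts are only recorded in memory rather than sent to S3.

```rust
// Stand-ins (assumptions, not DataFusion's types): `Vec<u8>` plays the role of
// `bytes::Bytes`, and `std::io::Result` plays the role of DataFusion's `Result`.
use std::io::Result;

pub trait SpillWriter: Send {
    fn write(&mut self, data: Vec<u8>) -> Result<()>;
    fn flush(&mut self) -> Result<()>;
    fn finish(&mut self) -> Result<()>;
}

/// Hypothetical backend that buffers writes and "uploads" fixed-size parts,
/// the way an object-store multipart upload might. Parts are only recorded in `parts`.
pub struct PartBufferingWriter {
    part_size: usize,
    pending: Vec<u8>,
    pub parts: Vec<Vec<u8>>,
    finished: bool,
}

impl PartBufferingWriter {
    pub fn new(part_size: usize) -> Self {
        assert!(part_size > 0, "part_size must be non-zero");
        Self { part_size, pending: Vec::new(), parts: Vec::new(), finished: false }
    }

    /// Move every complete part out of `pending` and into `parts`.
    fn upload_full_parts(&mut self) {
        while self.pending.len() >= self.part_size {
            // `split_off` leaves the first `part_size` bytes in `pending`.
            let rest = self.pending.split_off(self.part_size);
            let part = std::mem::replace(&mut self.pending, rest);
            self.parts.push(part);
        }
    }
}

impl SpillWriter for PartBufferingWriter {
    fn write(&mut self, data: Vec<u8>) -> Result<()> {
        if self.finished {
            return Err(std::io::Error::new(std::io::ErrorKind::Other, "write after finish"));
        }
        self.pending.extend_from_slice(&data);
        self.upload_full_parts();
        Ok(())
    }

    fn flush(&mut self) -> Result<()> {
        // Full parts are uploaded eagerly in `write`; a partial part is held back
        // because multipart uploads such as S3's require a minimum size for all
        // but the last part.
        Ok(())
    }

    fn finish(&mut self) -> Result<()> {
        // The final part may be smaller than `part_size`.
        if !self.pending.is_empty() {
            let last = std::mem::take(&mut self.pending);
            self.parts.push(last);
        }
        self.finished = true;
        Ok(())
    }
}
```

Holding the partial tail until `finish()` is just one option for object stores. A local-file backend could instead write straight through and make `flush()` meaningful.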
**Read** -
`SpillFile::read_stream()` would return a `Stream<Item = Result<Bytes>>`. For
OS files we can use `tokio::fs::File` + `tokio_util::io::ReaderStream`;
`tokio::fs::File` runs each read as a short task on Tokio's blocking pool, so
no blocking thread is held between reads. S3/object store backends can return
their native async stream directly, with no bridging needed.
I think this will let me replace the existing `SpillReaderStream` state
machine with a simple `StreamDecoder`-based poll loop. The state machine
currently exists specifically to avoid a deadlock. As mentioned, "spawning one
long-running blocking task per file exhausts Tokio's blocking thread pool when
concurrent reads exceed the limit". With `tokio::fs::File` that concern is
handled at a lower level, because each read occupies a blocking thread only for
the duration of that read, so the state machine's reason for existing goes
away. The `Buffer::from(Bytes)` conversion Arrow provides is also zero-copy, so
the pipeline adds no extra copies.
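The `StreamDecoder` poll loop relies on the decoder accepting chunks of any size and emitting a message only once all of its bytes have arrived. That property is what lets `ReaderStream` and object-store streams feed it directly. The sketch below is a std-only analogue, not Arrow's code. `FrameDecoder` and `decode_all` are hypothetical, messages use a simplified 4-byte little-endian length prefix instead of the real IPC framing, and a plain iterator of `Vec<u8>` stands in for `Stream<Item = Result<Bytes>>`.

```rust
/// Hypothetical incremental decoder for a simplified framing: each message is a
/// 4-byte little-endian length followed by that many payload bytes. Like an IPC
/// stream decoder, it buffers partial input across chunk boundaries.
#[derive(Default)]
pub struct FrameDecoder {
    buf: Vec<u8>,
}

impl FrameDecoder {
    /// Consume `chunk` and return every message it completes (possibly none).
    pub fn decode(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.buf.extend_from_slice(chunk);
        let mut out = Vec::new();
        loop {
            // Wait for a full length header.
            if self.buf.len() < 4 {
                break;
            }
            let len = u32::from_le_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]) as usize;
            // Wait for the full payload.
            if self.buf.len() < 4 + len {
                break;
            }
            out.push(self.buf[4..4 + len].to_vec());
            self.buf.drain(..4 + len);
        }
        out
    }

    /// Report an error if the input ended in the middle of a message.
    pub fn finish(&self) -> Result<(), String> {
        if self.buf.is_empty() {
            Ok(())
        } else {
            Err(format!("{} trailing bytes", self.buf.len()))
        }
    }
}

/// Drive the decoder over a sequence of chunks, as the poll loop would over the
/// byte stream returned by `read_stream()`.
pub fn decode_all<I: IntoIterator<Item = Vec<u8>>>(chunks: I) -> Result<Vec<Vec<u8>>, String> {
    let mut decoder = FrameDecoder::default();
    let mut messages = Vec::new();
    for chunk in chunks {
        messages.extend(decoder.decode(&chunk));
    }
    decoder.finish()?;
    Ok(messages)
}
```

Arrow's `StreamDecoder::decode` differs in shape. It advances a `Buffer` in place and returns at most one batch per call, so it is called in a loop until the chunk is consumed. The idea of buffering across chunk boundaries is the same.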
There is one issue with this approach, though. The existing code relies on
`StreamReader`'s `with_skip_validation` (safe here because DataFusion controls
what it writes), but `StreamDecoder` doesn't have a public
`with_skip_validation` method. Would you prefer that I proceed without it for
now and track it as a follow-up (potentially a small Arrow PR to add parity
with `StreamReader`), or is there a different approach you'd suggest that I may
have missed?
Again, thank you for your time.
--
This is an automated message from the Apache Git Service.