pantShrey commented on issue #21215:
URL: https://github.com/apache/datafusion/issues/21215#issuecomment-4209889902

   Thank you for your time and suggestions, @alamb. I have one more architecture question before I open the draft PR.
   Following up on your suggestion of avoiding `Read`/`Write` and using our own 
abstraction, here is the direction I've landed on:
   **Write** - 
   `IPCStreamWriter` serializes each batch into a `Vec<u8>`, converts it to `Bytes` (zero-copy via `Bytes::from(vec)`), and then calls `write()` through this trait:
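   ```rust
   use bytes::Bytes;
   use datafusion_common::Result;

   /// Destination for the serialized Arrow IPC bytes produced while spilling.
   pub trait SpillWriter: Send {
       /// Append one serialized chunk (e.g. one IPC-encoded batch).
       fn write(&mut self, data: Bytes) -> Result<()>;
       /// Push any buffered data through to the underlying storage.
       fn flush(&mut self) -> Result<()>;
       /// Complete the spill; no further writes are expected afterwards.
       fn finish(&mut self) -> Result<()>;
   }
   ```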
   This keeps Arrow's synchronous IPC serialization isolated, while the actual I/O goes through the trait, so backends such as S3 or Postgres can implement `write()` however they need to.
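As a minimal, std-only sketch of the write path, the block below implements the proposed trait for a hypothetical in-memory backend. To compile without dependencies, it assumes `Vec<u8>` as a stand-in for `bytes::Bytes` and a local `Result` alias in place of `datafusion_common::Result`; `InMemorySpillWriter` is an illustrative name, not an existing DataFusion type.

```rust
// Stand-ins so the sketch compiles with std only (assumptions, not DataFusion types):
// `Bytes` ~ `bytes::Bytes`, `Result` ~ `datafusion_common::Result`.
pub type Bytes = Vec<u8>;
pub type Result<T> = std::result::Result<T, String>;

pub trait SpillWriter: Send {
    fn write(&mut self, data: Bytes) -> Result<()>;
    fn flush(&mut self) -> Result<()>;
    fn finish(&mut self) -> Result<()>;
}

/// Hypothetical backend that keeps each serialized batch as a separate chunk.
#[derive(Default)]
pub struct InMemorySpillWriter {
    chunks: Vec<Bytes>,
    finished: bool,
}

impl SpillWriter for InMemorySpillWriter {
    fn write(&mut self, data: Bytes) -> Result<()> {
        if self.finished {
            return Err("write after finish".to_string());
        }
        // Taking the chunk by value lets the backend keep it without copying.
        self.chunks.push(data);
        Ok(())
    }

    fn flush(&mut self) -> Result<()> {
        // Nothing is buffered outside `chunks`, so flushing is a no-op here.
        Ok(())
    }

    fn finish(&mut self) -> Result<()> {
        self.finished = true;
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut writer = InMemorySpillWriter::default();
    // In the proposal, each `write` call would carry one IPC-encoded batch.
    writer.write(vec![1, 2, 3])?;
    writer.write(vec![4, 5])?;
    writer.flush()?;
    writer.finish()?;
    // Writing after `finish` is rejected by this backend.
    assert!(writer.write(vec![6]).is_err());
    let total: usize = writer.chunks.iter().map(Vec::len).sum();
    println!("{} chunks, {} bytes", writer.chunks.len(), total);
    Ok(())
}
```

A remote backend would presumably do its buffering and uploading inside `write()`, `flush()`, and `finish()` instead; the trait itself stays the same.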
   
   **Read** -
   `SpillFile::read_stream()` would return a `Stream<Item = Result<Bytes>>`. For OS files we can use `tokio::fs::File` with `tokio_util::io::ReaderStream`; `tokio::fs::File` handles blocking internally by offloading each read operation to Tokio's blocking pool. Object store backends such as S3 can return their native async stream directly, with no bridging needed.
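`ReaderStream` turns an `AsyncRead` into a stream of byte chunks. As a rough synchronous analogue (std only, so it runs without Tokio), the sketch below wraps any `std::io::Read` in an iterator that yields `io::Result<Vec<u8>>` chunks of at most `chunk_size` bytes. `ChunkedReader` and `chunk_size` are hypothetical names; the real read path would be async.

```rust
use std::io::{self, Read};

/// Hypothetical sync analogue of `tokio_util::io::ReaderStream`:
/// yields the reader's contents as a sequence of byte chunks.
pub struct ChunkedReader<R> {
    reader: R,
    chunk_size: usize,
    done: bool,
}

impl<R: Read> ChunkedReader<R> {
    pub fn new(reader: R, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self { reader, chunk_size, done: false }
    }
}

impl<R: Read> Iterator for ChunkedReader<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut buf = vec![0u8; self.chunk_size];
        loop {
            match self.reader.read(&mut buf) {
                // EOF: end the sequence.
                Ok(0) => {
                    self.done = true;
                    return None;
                }
                // Short reads are fine; a downstream decoder buffers partial data.
                Ok(n) => {
                    buf.truncate(n);
                    return Some(Ok(buf));
                }
                // Retry reads interrupted by a signal.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
    }
}

fn main() -> io::Result<()> {
    // `Cursor` stands in for a spill file opened with `std::fs::File::open`.
    let data: Vec<u8> = (0u8..10).collect();
    let chunks: Vec<Vec<u8>> =
        ChunkedReader::new(io::Cursor::new(data), 4).collect::<io::Result<_>>()?;
    println!("{:?}", chunks);
    Ok(())
}
```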
   I think this will let me replace the existing `SpillReaderStream` state machine with a simple `StreamDecoder`-based poll loop. The state machine exists specifically to avoid a deadlock; as mentioned, "spawning one long-running blocking task per file exhausts Tokio's blocking thread pool when concurrent reads exceed the limit". With `tokio::fs::File`, each read is a short-lived blocking task rather than one task held for the lifetime of the file, so that concern is handled at a lower level and the state machine's reason for existence goes away. Arrow's `Buffer::from(Bytes)` conversion is zero-copy, so the pipeline doesn't add any extra copies either.
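The property the `StreamDecoder`-based loop relies on is that bytes may arrive in arbitrary chunk sizes while the decoder buffers partial messages until a complete one is available. The std-only sketch below shows that pattern with a simplified, hypothetical framing (a 4-byte little-endian length prefix followed by the payload), which is not Arrow's IPC encoding. `FrameDecoder` is an illustrative name; its `decode` loosely mirrors the shape of `StreamDecoder::decode`, which consumes bytes from a `&mut Buffer`. In the real code the chunks would come from the `Stream<Item = Result<Bytes>>` and each completed message would be a `RecordBatch`.

```rust
/// Hypothetical incremental decoder for frames of the form
/// [u32 little-endian length][payload]. Partial input is buffered
/// internally until a whole frame has arrived.
#[derive(Default)]
pub struct FrameDecoder {
    pending: Vec<u8>,
}

impl FrameDecoder {
    /// Consume bytes from the front of `input`, returning a payload once a frame
    /// is complete. Bytes after a completed frame are left in `input`.
    pub fn decode(&mut self, input: &mut &[u8]) -> Option<Vec<u8>> {
        loop {
            // The total frame size is known only once the 4-byte header is buffered.
            let target = if self.pending.len() < 4 {
                4
            } else {
                let p = &self.pending;
                4 + u32::from_le_bytes([p[0], p[1], p[2], p[3]]) as usize
            };
            if self.pending.len() == target {
                // Whole frame buffered: strip the header and return the payload.
                let payload = self.pending.split_off(4);
                self.pending.clear();
                return Some(payload);
            }
            if input.is_empty() {
                return None; // need more input
            }
            // Take only as many bytes as the current target still needs.
            let take = (target - self.pending.len()).min(input.len());
            let (head, rest) = input.split_at(take);
            self.pending.extend_from_slice(head);
            *input = rest;
        }
    }
}

fn main() {
    // Encode two frames back to back, then feed them in awkward 3-byte chunks.
    let mut encoded = Vec::new();
    for payload in [&b"hello"[..], &b"spill"[..]] {
        encoded.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        encoded.extend_from_slice(payload);
    }

    let mut decoder = FrameDecoder::default();
    let mut frames = Vec::new();
    for chunk in encoded.chunks(3) {
        let mut input = chunk;
        // Poll loop: keep decoding until this chunk is fully consumed.
        while !input.is_empty() {
            if let Some(frame) = decoder.decode(&mut input) {
                frames.push(frame);
            }
        }
    }
    assert_eq!(frames, vec![b"hello".to_vec(), b"spill".to_vec()]);
}
```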
   
   There is one catch with this approach: unlike `StreamReader`, `StreamDecoder` doesn't have a public `with_skip_validation` method, and the existing code relies on skipping validation because DataFusion controls what it writes. Would you prefer that I proceed without it for now and track it as a follow-up (potentially a small Arrow PR to add parity with `StreamReader`), or is there a different approach I may have missed?
   
   Thanks again for your time!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.