zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2965191039

> Figured it out. File access is done via object store and object store uses `std::fs::File`, not `tokio::fs::File`. Even if it would, from browsing the code it doesn't look to me like Tokio's file stuff consumes budget.
>
> I tried reworking YieldStream to use the Tokio budget, but I don't see a way to. I rephrased the issue I opened at [tokio-rs/tokio#7403](https://github.com/tokio-rs/tokio/issues/7403).

Interesting finding. I checked the spill file reader, and it's true that we are using `std::fs::File` there. So, until we have a solution based on the Tokio budget, we should add `YieldStream` to `SpillReaderStream`. Here is the relevant `SpillReaderStream` code:

```rust
use std::fs::File;
use std::io::BufReader;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::StreamReader;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::disk_manager::RefCountedTempFile;
use futures::FutureExt;

// Supporting types, reconstructed from how they are used below.
type NextRecordBatchResult =
    Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;

enum SpillReaderStreamState {
    /// Spill file has not been opened yet.
    Uninitialized(RefCountedTempFile),
    /// A blocking read is running on Tokio's blocking thread pool.
    ReadInProgress(SpawnedTask<NextRecordBatchResult>),
    /// Reader is idle between batches.
    Waiting(StreamReader<BufReader<File>>),
    /// All batches have been read, or an error occurred.
    Done,
}

struct SpillReaderStream {
    schema: SchemaRef,
    state: SpillReaderStreamState,
}

impl SpillReaderStream {
    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
        Self {
            schema,
            state: SpillReaderStreamState::Uninitialized(spill_file),
        }
    }

    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        match &mut self.state {
            SpillReaderStreamState::Uninitialized(_) => {
                // Temporarily replace with `Done` to be able to pass the file to the task.
                let SpillReaderStreamState::Uninitialized(spill_file) =
                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                else {
                    unreachable!()
                };

                let task = SpawnedTask::spawn_blocking(move || {
                    // Plain blocking `std::fs::File`, not `tokio::fs::File`.
                    let file = BufReader::new(File::open(spill_file.path())?);
                    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
                    // with validated schemas and buffers. Skip redundant validation during read
                    // to speed up the read operation. This is safe for DataFusion because the
                    // input is guaranteed to be correct when written.
                    let mut reader = unsafe {
                        StreamReader::try_new(file, None)?.with_skip_validation(true)
                    };

                    let next_batch = reader.next().transpose()?;

                    Ok((reader, next_batch))
                });

                self.state = SpillReaderStreamState::ReadInProgress(task);

                // Poll again immediately so the inner task is polled and the waker is
                // registered.
                self.poll_next_inner(cx)
            }

            SpillReaderStreamState::ReadInProgress(task) => {
                let result = futures::ready!(task.poll_unpin(cx))
                    .unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));

                match result {
                    Ok((reader, batch)) => {
                        match batch {
                            Some(batch) => {
                                self.state = SpillReaderStreamState::Waiting(reader);

                                Poll::Ready(Some(Ok(batch)))
                            }
                            None => {
                                // Stream is done
                                self.state = SpillReaderStreamState::Done;

                                Poll::Ready(None)
                            }
                        }
                    }
                    Err(err) => {
                        self.state = SpillReaderStreamState::Done;

                        Poll::Ready(Some(Err(err)))
                    }
                }
            }

            SpillReaderStreamState::Waiting(_) => {
                // Temporarily replace with `Done` to be able to pass the reader to the task.
                let SpillReaderStreamState::Waiting(mut reader) =
                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                else {
                    unreachable!()
                };

                let task = SpawnedTask::spawn_blocking(move || {
                    let next_batch = reader.next().transpose()?;

                    Ok((reader, next_batch))
                });

                self.state = SpillReaderStreamState::ReadInProgress(task);

                // Poll again immediately so the inner task is polled and the waker is
                // registered.
                self.poll_next_inner(cx)
            }

            SpillReaderStreamState::Done => Poll::Ready(None),
        }
    }
}
```
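To illustrate the kind of yielding wrapper proposed above, here is a minimal std-only sketch. It is not DataFusion's actual `YieldStream`: the `PollNext` trait, the `YieldEvery` adapter, its `frequency` parameter, and the `AlwaysReady`/`poll_trace` helpers are all hypothetical stand-ins (a real version would implement `futures::Stream` over `Result<RecordBatch>` and wrap `SpillReaderStream`). After `frequency` consecutive `Ready` items, the adapter wakes its own task and returns `Poll::Pending` once, so the executor regains control periodically even when the inner stream keeps returning `Poll::Ready`.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical std-only stand-in for `futures::Stream::poll_next`.
pub trait PollNext {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical yielding adapter (not DataFusion's real `YieldStream`):
/// after `frequency` consecutive `Ready` items it wakes its own task and
/// returns `Pending` once, handing control back to the executor.
pub struct YieldEvery<S> {
    inner: S,
    frequency: usize,
    ready_since_yield: usize,
}

impl<S> YieldEvery<S> {
    pub fn new(inner: S, frequency: usize) -> Self {
        assert!(frequency > 0, "frequency must be at least 1");
        Self { inner, frequency, ready_since_yield: 0 }
    }
}

impl<S: PollNext> PollNext for YieldEvery<S> {
    type Item = S::Item;

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.ready_since_yield >= self.frequency {
            // Budget used up: ask to be polled again right away, then yield.
            self.ready_since_yield = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.poll_next(cx) {
            Poll::Ready(Some(item)) => {
                self.ready_since_yield += 1;
                Poll::Ready(Some(item))
            }
            // The inner stream yielded on its own or finished: reset the counter.
            other => {
                self.ready_since_yield = 0;
                other
            }
        }
    }
}

/// Hypothetical inner stream that is always ready (e.g. data already buffered).
pub struct AlwaysReady(std::vec::IntoIter<i32>);

impl PollNext for AlwaysReady {
    type Item = i32;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        Poll::Ready(self.0.next())
    }
}

/// Waker that does nothing; enough to drive `poll_next` by hand here.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the wrapped stream until it ends, recording `Some(item)` for each
/// `Ready` item and `None` for each `Pending` (i.e. each yield).
pub fn poll_trace(items: Vec<i32>, frequency: usize) -> Vec<Option<i32>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = YieldEvery::new(AlwaysReady(items.into_iter()), frequency);
    let mut trace = Vec::new();
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(item)) => trace.push(Some(item)),
            Poll::Ready(None) => return trace,
            Poll::Pending => trace.push(None),
        }
    }
}
```

For example, `poll_trace(vec![0, 1, 2, 3, 4], 2)` produces a yield (`None`) after every two items. The `frequency` value is an arbitrary illustration parameter, not a default taken from DataFusion.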


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

