zhuqi-lucas commented on issue #16353: URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2965191039
> Figured it out. File access is done via object store and object store uses `std::fs::File`, not `tokio::fs::File`. Even if it would, from browsing the code it doesn't look to me like Tokio's file stuff consumes budget.
>
> I tried reworking YieldStream to use the Tokio budget, but I don't see a way to. I rephrased the issue I opened at [tokio-rs/tokio#7403](https://github.com/tokio-rs/tokio/issues/7403).

Interesting finding. I checked the spill file code, and it's true that we are using `std::fs::File` there. So until we have a solution that uses the Tokio budget, we should add YieldStream to SpillReaderStream.

```rust
use std::fs::File;

impl SpillReaderStream {
    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
        Self {
            schema,
            state: SpillReaderStreamState::Uninitialized(spill_file),
        }
    }

    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        match &mut self.state {
            SpillReaderStreamState::Uninitialized(_) => {
                // Temporarily replace with `Done` to be able to pass the file to the task.
                let SpillReaderStreamState::Uninitialized(spill_file) =
                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                else {
                    unreachable!()
                };

                let task = SpawnedTask::spawn_blocking(move || {
                    let file = BufReader::new(File::open(spill_file.path())?);
                    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
                    // with validated schemas and buffers. Skip redundant validation during read
                    // to speedup read operation. This is safe for DataFusion as input guaranteed
                    // to be correct when written.
                    let mut reader = unsafe {
                        StreamReader::try_new(file, None)?.with_skip_validation(true)
                    };

                    let next_batch = reader.next().transpose()?;

                    Ok((reader, next_batch))
                });

                self.state = SpillReaderStreamState::ReadInProgress(task);

                // Poll again immediately so the inner task is polled and the waker is
                // registered.
                self.poll_next_inner(cx)
            }

            SpillReaderStreamState::ReadInProgress(task) => {
                let result = futures::ready!(task.poll_unpin(cx))
                    .unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));

                match result {
                    Ok((reader, batch)) => {
                        match batch {
                            Some(batch) => {
                                self.state = SpillReaderStreamState::Waiting(reader);

                                Poll::Ready(Some(Ok(batch)))
                            }
                            None => {
                                // Stream is done
                                self.state = SpillReaderStreamState::Done;

                                Poll::Ready(None)
                            }
                        }
                    }
                    Err(err) => {
                        self.state = SpillReaderStreamState::Done;

                        Poll::Ready(Some(Err(err)))
                    }
                }
            }

            SpillReaderStreamState::Waiting(_) => {
                // Temporarily replace with `Done` to be able to pass the reader to the task.
                let SpillReaderStreamState::Waiting(mut reader) =
                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                else {
                    unreachable!()
                };

                let task = SpawnedTask::spawn_blocking(move || {
                    let next_batch = reader.next().transpose()?;

                    Ok((reader, next_batch))
                });

                self.state = SpillReaderStreamState::ReadInProgress(task);

                // Poll again immediately so the inner task is polled and the waker is
                // registered.
                self.poll_next_inner(cx)
            }

            SpillReaderStreamState::Done => Poll::Ready(None),
        }
    }
}
```
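
For illustration only, here is a rough sketch of the kind of wrapper the YieldStream suggestion above refers to: a stream adapter that returns `Poll::Pending` once after a fixed number of consecutive ready items, so the executor gets a chance to run other tasks. This is not DataFusion's actual `YieldStream`. The names `PollStream`, `YieldEvery`, `AlwaysReady`, and `drain_counting_yields` are hypothetical, and `PollStream` is a std-only stand-in for `futures::Stream` so the sketch has no external dependencies.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical std-only stand-in for `futures::Stream`.
trait PollStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical wrapper: yields to the executor once after `frequency`
/// consecutive ready items from the inner stream.
struct YieldEvery<S> {
    inner: S,
    frequency: usize,
    ready_streak: usize,
}

impl<S> YieldEvery<S> {
    fn new(inner: S, frequency: usize) -> Self {
        Self { inner, frequency, ready_streak: 0 }
    }
}

impl<S: PollStream + Unpin> PollStream for YieldEvery<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.ready_streak >= self.frequency {
            // Give other tasks a turn, but ask to be polled again right away.
            self.ready_streak = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = Pin::new(&mut self.inner).poll_next(cx);
        match &polled {
            Poll::Ready(Some(_)) => self.ready_streak += 1,
            // A pending or finished inner stream resets the streak.
            _ => self.ready_streak = 0,
        }
        polled
    }
}

/// Demo source that is always ready, like a reader that never awaits.
struct AlwaysReady(std::vec::IntoIter<i32>);

impl PollStream for AlwaysReady {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        Poll::Ready(self.0.next())
    }
}

/// Waker that does nothing; enough for polling by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a stream to completion and counts how often it returned `Pending`.
fn drain_counting_yields<S: PollStream + Unpin>(mut stream: S) -> (Vec<S::Item>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut items, mut yields) = (Vec::new(), 0);
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return (items, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Wrapping an always-ready source of five items with a frequency of 2 should hand back all five items while returning `Pending` twice along the way. A real version would wrap the `SpillReaderStream` itself and use the `futures::Stream` trait instead of the stand-in.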