This is an automated email from the ASF dual-hosted git repository. thinkharderdev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 4d0c9fe4f1 Poll next open file future while scanning current file (#5800) 4d0c9fe4f1 is described below commit 4d0c9fe4f18507a719bc597c570b3be41fc19049 Author: Ronen Cohen <ron...@gmail.com> AuthorDate: Tue Apr 4 16:32:45 2023 +0300 Poll next open file future while scanning current file (#5800) * Poll next open file future while scanning current file * Update datafusion/core/src/physical_plan/file_format/file_stream.rs Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com> * Update datafusion/core/src/physical_plan/file_format/file_stream.rs Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com> * Update file_stream.rs --------- Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com> --- .../src/physical_plan/file_format/file_stream.rs | 119 +++++++++++++-------- 1 file changed, 76 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index a7fae47852..bb00170535 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -83,6 +83,14 @@ pub struct FileStream<F: FileOpener> { baseline_metrics: BaselineMetrics, } +/// Represents the state of the next `FileOpenFuture`. Since we need to poll +/// this future while scanning the current file, we need to store the result if it +/// is ready +enum NextOpen { + Pending(FileOpenFuture), + Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>), +} + enum FileStreamState { /// The idle state, no file is currently being read Idle, @@ -105,7 +113,7 @@ enum FileStreamState { /// and its corresponding partition column values, if any. /// This allows the next file to be opened in parallel while the /// current file is read. - next: Option<(FileOpenFuture, Vec<ScalarValue>)>, + next: Option<(NextOpen, Vec<ScalarValue>)>, }, /// Encountered an error Error, @@ -267,7 +275,10 @@ impl<F: FileOpener> FileStream<F> { self.state = FileStreamState::Scan { partition_values, reader, - next: Some((next_future, next_partition_values)), + next: Some(( + NextOpen::Pending(next_future), + next_partition_values, + )), }; } Ok(None) => { @@ -292,54 +303,76 @@ impl<F: FileOpener> FileStream<F> { reader, partition_values, next, - } => match ready!(reader.poll_next_unpin(cx)) { - Some(result) => { - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - let result = result - .and_then(|b| { - self.pc_projector - .project(b, partition_values) - .map_err(|e| ArrowError::ExternalError(e.into())) - }) - .map(|batch| match &mut self.remain { - Some(remain) => { - if *remain > batch.num_rows() { - *remain -= batch.num_rows(); - batch - } else { - let batch = batch.slice(0, *remain); - self.state = FileStreamState::Limit; - *remain = 0; - batch - } - } - None => batch, - }); - - if result.is_err() { - self.state = FileStreamState::Error + } => { + // We need to poll the next `FileOpenFuture` here to drive it forward + if let Some((next_open_future, _)) = next { + if let NextOpen::Pending(f) = next_open_future { + if let Poll::Ready(reader) = f.as_mut().poll(cx) { + *next_open_future = NextOpen::Ready(reader); + } } - self.file_stream_metrics.time_scanning_total.start(); - return Poll::Ready(Some(result.map_err(Into::into))); } - None => { - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - - match mem::take(next) { - Some((future, partition_values)) => { - self.file_stream_metrics.time_opening.start(); + match ready!(reader.poll_next_unpin(cx)) { + Some(result) => { + self.file_stream_metrics.time_scanning_until_data.stop(); + self.file_stream_metrics.time_scanning_total.stop(); + let result = result + .and_then(|b| { + self.pc_projector + .project(b, partition_values) + .map_err(|e| ArrowError::ExternalError(e.into())) + }) + .map(|batch| match &mut self.remain { + Some(remain) => { + if *remain > batch.num_rows() { + *remain -= batch.num_rows(); + batch + } else { + let batch = batch.slice(0, *remain); + self.state = FileStreamState::Limit; + *remain = 0; + batch + } + } + None => batch, + }); - self.state = FileStreamState::Open { - future, - partition_values, + if result.is_err() { + self.state = FileStreamState::Error + } + self.file_stream_metrics.time_scanning_total.start(); + return Poll::Ready(Some(result.map_err(Into::into))); + } + None => { + self.file_stream_metrics.time_scanning_until_data.stop(); + self.file_stream_metrics.time_scanning_total.stop(); + + match mem::take(next) { + Some((future, partition_values)) => { + self.file_stream_metrics.time_opening.start(); + + match future { + NextOpen::Pending(future) => { + self.state = FileStreamState::Open { + future, + partition_values, + } + } + NextOpen::Ready(reader) => { + self.state = FileStreamState::Open { + future: Box::pin(std::future::ready( + reader, + )), + partition_values, + } + } + } } + None => return Poll::Ready(None), } - None => return Poll::Ready(None), } } - }, + } FileStreamState::Error | FileStreamState::Limit => { return Poll::Ready(None) }