This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch file-stream-pipeline
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 8b100dab718b5b9a3dbfaa73704c5485451b5a26
Author: Dan Harris <[email protected]>
AuthorDate: Thu Feb 2 16:21:13 2023 +0200

    FileStream: Open next file in parallel while decoding
---
 .../src/physical_plan/file_format/file_stream.rs | 83 +++++++++++++++++-----
 1 file changed, 64 insertions(+), 19 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 89c2cecb4..4a306793d 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -22,6 +22,7 @@
 //! compliant with the `SendableRecordBatchStream` trait.
 
 use std::collections::VecDeque;
+use std::mem;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::time::Instant;
@@ -98,6 +99,8 @@ enum FileStreamState {
         partition_values: Vec<ScalarValue>,
         /// The reader instance
         reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
+        /// A [`FileOpenFuture`] for the next file to be processed
+        next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
     },
     /// Encountered an error
     Error,
@@ -202,30 +205,39 @@ impl<F: FileOpener> FileStream<F> {
         })
     }
 
+    fn next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
+        let part_file = match self.file_iter.pop_front() {
+            Some(file) => file,
+            None => return None,
+        };
+
+        let file_meta = FileMeta {
+            object_meta: part_file.object_meta,
+            range: part_file.range,
+            extensions: part_file.extensions,
+        };
+
+        Some(
+            self.file_reader
+                .open(file_meta)
+                .map(|future| (future, part_file.partition_values)),
+        )
+    }
+
     fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
         loop {
             match &mut self.state {
                 FileStreamState::Idle => {
-                    let part_file = match self.file_iter.pop_front() {
-                        Some(file) => file,
-                        None => return Poll::Ready(None),
-                    };
-
-                    let file_meta = FileMeta {
-                        object_meta: part_file.object_meta,
-                        range: part_file.range,
-                        extensions: part_file.extensions,
-                    };
-
                     self.file_stream_metrics.time_opening.start();
 
-                    match self.file_reader.open(file_meta) {
-                        Ok(future) => {
+                    match self.next_file().transpose() {
+                        Ok(Some((future, partition_values))) => {
                             self.state = FileStreamState::Open {
                                 future,
-                                partition_values: part_file.partition_values,
+                                partition_values,
                             }
                         }
+                        Ok(None) => return Poll::Ready(None),
                         Err(e) => {
                             self.state = FileStreamState::Error;
                             return Poll::Ready(Some(Err(e)));
@@ -237,13 +249,34 @@ impl<F: FileOpener> FileStream<F> {
                     partition_values,
                 } => match ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
+                        let partition_values = mem::take(partition_values);
+
+                        let next = self.next_file().transpose();
+
                         self.file_stream_metrics.time_opening.stop();
                         self.file_stream_metrics.time_scanning_until_data.start();
                         self.file_stream_metrics.time_scanning_total.start();
-                        self.state = FileStreamState::Scan {
-                            partition_values: std::mem::take(partition_values),
-                            reader,
-                        };
+
+                        match next {
+                            Ok(Some((next_future, next_partition_values))) => {
+                                self.state = FileStreamState::Scan {
+                                    partition_values,
+                                    reader,
+                                    next: Some((next_future, next_partition_values)),
+                                };
+                            }
+                            Ok(None) => {
+                                self.state = FileStreamState::Scan {
+                                    reader,
+                                    partition_values,
+                                    next: None,
+                                };
+                            }
+                            Err(e) => {
+                                self.state = FileStreamState::Error;
+                                return Poll::Ready(Some(Err(e)));
+                            }
+                        }
                     }
                     Err(e) => {
                         self.state = FileStreamState::Error;
@@ -253,6 +286,7 @@ impl<F: FileOpener> FileStream<F> {
                 FileStreamState::Scan {
                     reader,
                     partition_values,
+                    next,
                 } => match ready!(reader.poll_next_unpin(cx)) {
                     Some(result) => {
                         self.file_stream_metrics.time_scanning_until_data.stop();
@@ -287,7 +321,18 @@ impl<F: FileOpener> FileStream<F> {
                     None => {
                         self.file_stream_metrics.time_scanning_until_data.stop();
                         self.file_stream_metrics.time_scanning_total.stop();
-                        self.state = FileStreamState::Idle;
+
+                        match mem::take(next) {
+                            Some((future, partition_values)) => {
+                                self.file_stream_metrics.time_opening.start();
+
+                                self.state = FileStreamState::Open {
+                                    future,
+                                    partition_values,
+                                }
+                            }
+                            None => return Poll::Ready(None),
+                        }
                     }
                 },
                 FileStreamState::Error | FileStreamState::Limit => {
