This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch file-stream-pipeline
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 8b100dab718b5b9a3dbfaa73704c5485451b5a26
Author: Dan Harris <[email protected]>
AuthorDate: Thu Feb 2 16:21:13 2023 +0200

    FileStream: Open next file in parallel while decoding
---
 .../src/physical_plan/file_format/file_stream.rs   | 83 +++++++++++++++++-----
 1 file changed, 64 insertions(+), 19 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 89c2cecb4..4a306793d 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -22,6 +22,7 @@
 //! compliant with the `SendableRecordBatchStream` trait.
 
 use std::collections::VecDeque;
+use std::mem;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::time::Instant;
@@ -98,6 +99,8 @@ enum FileStreamState {
         partition_values: Vec<ScalarValue>,
         /// The reader instance
         reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
+        /// A [`FileOpenFuture`] for the next file to be processed
+        next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
     },
     /// Encountered an error
     Error,
@@ -202,30 +205,39 @@ impl<F: FileOpener> FileStream<F> {
         })
     }
 
+    fn next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
+        let part_file = match self.file_iter.pop_front() {
+            Some(file) => file,
+            None => return None,
+        };
+
+        let file_meta = FileMeta {
+            object_meta: part_file.object_meta,
+            range: part_file.range,
+            extensions: part_file.extensions,
+        };
+
+        Some(
+            self.file_reader
+                .open(file_meta)
+                .map(|future| (future, part_file.partition_values)),
+        )
+    }
+
     fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
         loop {
             match &mut self.state {
                 FileStreamState::Idle => {
-                    let part_file = match self.file_iter.pop_front() {
-                        Some(file) => file,
-                        None => return Poll::Ready(None),
-                    };
-
-                    let file_meta = FileMeta {
-                        object_meta: part_file.object_meta,
-                        range: part_file.range,
-                        extensions: part_file.extensions,
-                    };
-
                     self.file_stream_metrics.time_opening.start();
 
-                    match self.file_reader.open(file_meta) {
-                        Ok(future) => {
+                    match self.next_file().transpose() {
+                        Ok(Some((future, partition_values))) => {
                             self.state = FileStreamState::Open {
                                 future,
-                                partition_values: part_file.partition_values,
+                                partition_values,
                             }
                         }
+                        Ok(None) => return Poll::Ready(None),
                         Err(e) => {
                             self.state = FileStreamState::Error;
                             return Poll::Ready(Some(Err(e)));
@@ -237,13 +249,34 @@ impl<F: FileOpener> FileStream<F> {
                     partition_values,
                 } => match ready!(future.poll_unpin(cx)) {
                     Ok(reader) => {
+                        let partition_values = mem::take(partition_values);
+
+                        let next = self.next_file().transpose();
+
                         self.file_stream_metrics.time_opening.stop();
                         self.file_stream_metrics.time_scanning_until_data.start();
                         self.file_stream_metrics.time_scanning_total.start();
-                        self.state = FileStreamState::Scan {
-                            partition_values: std::mem::take(partition_values),
-                            reader,
-                        };
+
+                        match next {
+                            Ok(Some((next_future, next_partition_values))) => {
+                                self.state = FileStreamState::Scan {
+                                    partition_values,
+                                    reader,
+                                    next: Some((next_future, next_partition_values)),
+                                };
+                            }
+                            Ok(None) => {
+                                self.state = FileStreamState::Scan {
+                                    reader,
+                                    partition_values,
+                                    next: None,
+                                };
+                            }
+                            Err(e) => {
+                                self.state = FileStreamState::Error;
+                                return Poll::Ready(Some(Err(e)));
+                            }
+                        }
                     }
                     Err(e) => {
                         self.state = FileStreamState::Error;
@@ -253,6 +286,7 @@ impl<F: FileOpener> FileStream<F> {
                 FileStreamState::Scan {
                     reader,
                     partition_values,
+                    next,
                 } => match ready!(reader.poll_next_unpin(cx)) {
                     Some(result) => {
                         self.file_stream_metrics.time_scanning_until_data.stop();
@@ -287,7 +321,18 @@ impl<F: FileOpener> FileStream<F> {
                     None => {
                         self.file_stream_metrics.time_scanning_until_data.stop();
                         self.file_stream_metrics.time_scanning_total.stop();
-                        self.state = FileStreamState::Idle;
+
+                        match mem::take(next) {
+                            Some((future, partition_values)) => {
+                                self.file_stream_metrics.time_opening.start();
+
+                                self.state = FileStreamState::Open {
+                                    future,
+                                    partition_values,
+                                }
+                            }
+                            None => return Poll::Ready(None),
+                        }
                     }
                 },
                 FileStreamState::Error | FileStreamState::Limit => {

Reply via email to