This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d0c9fe4f1 Poll next open file future while scanning current file (#5800)
4d0c9fe4f1 is described below

commit 4d0c9fe4f18507a719bc597c570b3be41fc19049
Author: Ronen Cohen <ron...@gmail.com>
AuthorDate: Tue Apr 4 16:32:45 2023 +0300

    Poll next open file future while scanning current file (#5800)
    
    * Poll next open file future while scanning current file
    
    * Update datafusion/core/src/physical_plan/file_format/file_stream.rs
    
    Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com>
    
    * Update datafusion/core/src/physical_plan/file_format/file_stream.rs
    
    Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com>
    
    * Update file_stream.rs
    
    ---------
    
    Co-authored-by: Dan Harris <1327726+thinkharder...@users.noreply.github.com>
---
 .../src/physical_plan/file_format/file_stream.rs   | 119 +++++++++++++--------
 1 file changed, 76 insertions(+), 43 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index a7fae47852..bb00170535 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -83,6 +83,14 @@ pub struct FileStream<F: FileOpener> {
     baseline_metrics: BaselineMetrics,
 }
 
+/// Represents the state of the next `FileOpenFuture`. Since we need to poll
+/// this future while scanning the current file, we need to store the result if it
+/// is ready
+enum NextOpen {
+    Pending(FileOpenFuture),
+    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
+}
+
 enum FileStreamState {
     /// The idle state, no file is currently being read
     Idle,
@@ -105,7 +113,7 @@ enum FileStreamState {
         /// and its corresponding partition column values, if any.
         /// This allows the next file to be opened in parallel while the
         /// current file is read.
-        next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
+        next: Option<(NextOpen, Vec<ScalarValue>)>,
     },
     /// Encountered an error
     Error,
@@ -267,7 +275,10 @@ impl<F: FileOpener> FileStream<F> {
                                 self.state = FileStreamState::Scan {
                                     partition_values,
                                     reader,
-                                    next: Some((next_future, next_partition_values)),
+                                    next: Some((
+                                        NextOpen::Pending(next_future),
+                                        next_partition_values,
+                                    )),
                                 };
                             }
                             Ok(None) => {
@@ -292,54 +303,76 @@ impl<F: FileOpener> FileStream<F> {
                     reader,
                     partition_values,
                     next,
-                } => match ready!(reader.poll_next_unpin(cx)) {
-                    Some(result) => {
-                        self.file_stream_metrics.time_scanning_until_data.stop();
-                        self.file_stream_metrics.time_scanning_total.stop();
-                        let result = result
-                            .and_then(|b| {
-                                self.pc_projector
-                                    .project(b, partition_values)
-                                    .map_err(|e| ArrowError::ExternalError(e.into()))
-                            })
-                            .map(|batch| match &mut self.remain {
-                                Some(remain) => {
-                                    if *remain > batch.num_rows() {
-                                        *remain -= batch.num_rows();
-                                        batch
-                                    } else {
-                                        let batch = batch.slice(0, *remain);
-                                        self.state = FileStreamState::Limit;
-                                        *remain = 0;
-                                        batch
-                                    }
-                                }
-                                None => batch,
-                            });
-
-                        if result.is_err() {
-                            self.state = FileStreamState::Error
+                } => {
+                    // We need to poll the next `FileOpenFuture` here to drive it forward
+                    if let Some((next_open_future, _)) = next {
+                        if let NextOpen::Pending(f) = next_open_future {
+                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
+                                *next_open_future = NextOpen::Ready(reader);
+                            }
                         }
-                        self.file_stream_metrics.time_scanning_total.start();
-                        return Poll::Ready(Some(result.map_err(Into::into)));
                     }
-                    None => {
-                        self.file_stream_metrics.time_scanning_until_data.stop();
-                        self.file_stream_metrics.time_scanning_total.stop();
-
-                        match mem::take(next) {
-                            Some((future, partition_values)) => {
-                                self.file_stream_metrics.time_opening.start();
+                    match ready!(reader.poll_next_unpin(cx)) {
+                        Some(result) => {
+                            self.file_stream_metrics.time_scanning_until_data.stop();
+                            self.file_stream_metrics.time_scanning_total.stop();
+                            let result = result
+                                .and_then(|b| {
+                                    self.pc_projector
+                                        .project(b, partition_values)
+                                        .map_err(|e| ArrowError::ExternalError(e.into()))
+                                })
+                                .map(|batch| match &mut self.remain {
+                                    Some(remain) => {
+                                        if *remain > batch.num_rows() {
+                                            *remain -= batch.num_rows();
+                                            batch
+                                        } else {
+                                            let batch = batch.slice(0, *remain);
+                                            self.state = FileStreamState::Limit;
+                                            *remain = 0;
+                                            batch
+                                        }
+                                    }
+                                    None => batch,
+                                });
 
-                                self.state = FileStreamState::Open {
-                                    future,
-                                    partition_values,
+                            if result.is_err() {
+                                self.state = FileStreamState::Error
+                            }
+                            self.file_stream_metrics.time_scanning_total.start();
+                            return Poll::Ready(Some(result.map_err(Into::into)));
+                        }
+                        None => {
+                            self.file_stream_metrics.time_scanning_until_data.stop();
+                            self.file_stream_metrics.time_scanning_total.stop();
+
+                            match mem::take(next) {
+                                Some((future, partition_values)) => {
+                                    self.file_stream_metrics.time_opening.start();
+
+                                    match future {
+                                        NextOpen::Pending(future) => {
+                                            self.state = FileStreamState::Open {
+                                                future,
+                                                partition_values,
+                                            }
+                                        }
+                                        NextOpen::Ready(reader) => {
+                                            self.state = FileStreamState::Open {
+                                                future: Box::pin(std::future::ready(
+                                                    reader,
+                                                )),
+                                                partition_values,
+                                            }
+                                        }
+                                    }
                                 }
+                                None => return Poll::Ready(None),
                             }
-                            None => return Poll::Ready(None),
                         }
                     }
-                },
+                }
                 FileStreamState::Error | FileStreamState::Limit => {
                     return Poll::Ready(None)
                 }

Reply via email to