This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a956ac3db9 use state machine to refactor the get_files_with_limit method (#15521)
a956ac3db9 is described below

commit a956ac3db9d1ba8569ed120aec05b4b74e9c33f3
Author: xudong.w <wxd963996...@gmail.com>
AuthorDate: Tue Apr 1 21:49:51 2025 +0800

    use state machine to refactor the get_files_with_limit method (#15521)
---
 datafusion/core/src/datasource/listing/table.rs | 72 ++++++++++++-------------
 1 file changed, 34 insertions(+), 38 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 79db5ecf52..6049614f37 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1205,47 +1205,43 @@ async fn get_files_with_limit(
     let mut file_group = FileGroup::default();
     // Fusing the stream allows us to call next safely even once it is finished.
     let mut all_files = Box::pin(files.fuse());
-    let mut num_rows = Precision::<usize>::Absent;
-    while let Some(first_file) = all_files.next().await {
-        let file = first_file?;
-        if let Some(file_statistic) = &file.statistics {
-            num_rows = file_statistic.num_rows;
+    enum ProcessingState {
+        ReadingFiles,
+        ReachedLimit,
+    }
+
+    let mut state = ProcessingState::ReadingFiles;
+    let mut num_rows = Precision::Absent;
+
+    while let Some(file_result) = all_files.next().await {
+        // Early exit if we've already reached our limit
+        if matches!(state, ProcessingState::ReachedLimit) {
+            break;
         }
-        file_group.push(file);
 
-        // If the number of rows exceeds the limit, we can stop processing
-        // files. This only applies when we know the number of rows. It also
-        // currently ignores tables that have no statistics regarding the
-        // number of rows.
-        let conservative_num_rows = match num_rows {
-            Precision::Exact(nr) => nr,
-            _ => usize::MIN,
-        };
-        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
-            while let Some(current) = all_files.next().await {
-                let file = current?;
-                if !collect_stats {
-                    file_group.push(file);
-                    continue;
-                }
+        let file = file_result?;
 
-                // We accumulate the number of rows, total byte size and null
-                // counts across all the files in question. If any file does not
-                // provide any information or provides an inexact value, we demote
-                // the statistic precision to inexact.
-                if let Some(file_stats) = &file.statistics {
-                    num_rows = add_row_stats(num_rows, file_stats.num_rows);
-                }
-                file_group.push(file);
-
-                // If the number of rows exceeds the limit, we can stop processing
-                // files. This only applies when we know the number of rows. It also
-                // currently ignores tables that have no statistics regarding the
-                // number of rows.
-                if num_rows.get_value().unwrap_or(&usize::MIN)
-                    > &limit.unwrap_or(usize::MAX)
-                {
-                    break;
+        // Update file statistics regardless of state
+        if collect_stats {
+            if let Some(file_stats) = &file.statistics {
+                num_rows = if file_group.is_empty() {
+                    // For the first file, just take its row count
+                    file_stats.num_rows
+                } else {
+                    // For subsequent files, accumulate the counts
+                    add_row_stats(num_rows, file_stats.num_rows)
+                };
+            }
+        }
+
+        // Always add the file to our group
+        file_group.push(file);
+
+        // Check if we've hit the limit (if one was specified)
+        if let Some(limit) = limit {
+            if let Precision::Exact(row_count) = num_rows {
+                if row_count > limit {
+                    state = ProcessingState::ReachedLimit;
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to