xudong963 commented on code in PR #15432:
URL: https://github.com/apache/datafusion/pull/15432#discussion_r2016725230


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1181,6 +1175,92 @@ impl ListingTable {
     }
 }
 
+/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
+///
+/// This function collects files from the provided stream until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
+///             once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not available or if the
+/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    while let Some(first_file) = all_files.next().await {

Review Comment:
   I agree, the current nested loop is annoying.
   
   Here is roughly what I have in mind, using a finite-state machine (FSM):
   ```rust
   enum ProcessingState {
       ReadingFiles,
       ReachedLimit,
   }
   
   let mut state = ProcessingState::ReadingFiles;
   let mut num_rows = Precision::<usize>::Absent;
   
   while let Some(file_result) = all_files.next().await {
       // Early exit if we've already reached our limit
       if matches!(state, ProcessingState::ReachedLimit) {
           break;
       }
       
       let file = file_result?;
       
       // Update file statistics regardless of state
       if collect_stats {
           if let Some(file_stats) = &file.statistics {
               num_rows = if file_group.is_empty() {
                   // For the first file, just take its row count
                   file_stats.num_rows
               } else {
                   // For subsequent files, accumulate the counts
                   crate::datasource::statistics::add_row_stats(
                       num_rows,
                       file_stats.num_rows,
                   )
               };
           }
       }
       
       // Always add the file to our group
       file_group.push(file);
       
       // Check if we've hit the limit (if one was specified)
       if let Some(limit) = limit {
           if let Precision::Exact(row_count) = num_rows {
               if row_count > limit {
                   state = ProcessingState::ReachedLimit;
               }
           }
       }
   }
   ```
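
For comparison, here is a minimal, synchronous sketch of the same single-loop idea that breaks right after the limit check, so no state enum is needed and no extra item is pulled once the limit is exceeded. It is only an illustration under simplifying assumptions: a plain iterator stands in for the stream, each file is reduced to an `Option<usize>` row count (`None` meaning no statistics), and `collect_with_limit` is a hypothetical name, not a DataFusion API.

```rust
/// Hypothetical, simplified model of the loop: each "file" is just its row
/// count, with `None` meaning the file carries no statistics.
/// Returns the collected files and whether any files were left unread.
fn collect_with_limit(
    files: impl IntoIterator<Item = Option<usize>>,
    limit: Option<usize>,
) -> (Vec<Option<usize>>, bool) {
    let mut files = files.into_iter();
    let mut collected = Vec::new();
    // Running row count; `None` once any contributing count is unknown.
    let mut total: Option<usize> = None;

    while let Some(rows) = files.next() {
        total = if collected.is_empty() {
            // For the first file, just take its row count.
            rows
        } else {
            // For subsequent files, add only when both sides are known.
            total.zip(rows).map(|(a, b)| a + b)
        };

        // Always keep the file, even the one that crosses the limit.
        collected.push(rows);

        // Stop immediately once a known total exceeds the limit.
        if let (Some(limit), Some(total)) = (limit, total) {
            if total > limit {
                break;
            }
        }
    }

    // Anything still left in the iterator was skipped because of the limit.
    let has_remaining = files.next().is_some();
    (collected, has_remaining)
}
```

In this sketch, `collect_with_limit(vec![Some(5), Some(10), Some(3)], Some(12))` keeps the first two entries and reports that files remain, which is the kind of signal a caller could use to mark statistics as inexact.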
   
   Given that this PR mainly moves the old code into a new method to make it more flexible, I don't want to make further changes here, but I'm happy to do this refactor in a separate PR.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
