xudong963 commented on code in PR #15432:
URL: https://github.com/apache/datafusion/pull/15432#discussion_r2016725230
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1181,6 +1175,92 @@ impl ListingTable {
     }
 }

+/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
+///
+/// This function collects files from the provided stream until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
+///   once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not available or if the
+/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    while let Some(first_file) = all_files.next().await {

Review Comment:
I agree, the current nested loop is annoying.
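For reference while discussing the loop shape: the contract in the doc comment (keep collecting until the stream ends or the accumulated row count exceeds `limit`, with the file that crosses the limit still included) can be pinned down in a std-only sketch. `FileStub` and `collect_with_limit` are hypothetical names, a plain iterator stands in for the stream, and `Option<usize>` only approximates `Precision<usize>`, so this is an illustration under those assumptions and not the code in the PR.

```rust
/// Hypothetical stand-in for `PartitionedFile`; only the row count matters here.
#[derive(Debug, Clone, PartialEq)]
struct FileStub {
    name: &'static str,
    /// `None` models a file whose row count is not known.
    num_rows: Option<usize>,
}

/// Collects files until the input is exhausted or the known running row
/// total exceeds `limit`. The file that crosses the limit is kept.
fn collect_with_limit(
    files: impl IntoIterator<Item = FileStub>,
    limit: Option<usize>,
) -> Vec<FileStub> {
    let mut collected = Vec::new();
    // Simplification: once any row count is unknown, the total stays unknown.
    let mut total_rows: Option<usize> = Some(0);

    for file in files {
        total_rows = match (total_rows, file.num_rows) {
            (Some(total), Some(rows)) => Some(total + rows),
            _ => None,
        };
        collected.push(file);

        // Stop only when a limit was given and the total is known to exceed it.
        if let (Some(limit), Some(total)) = (limit, total_rows) {
            if total > limit {
                break;
            }
        }
    }
    collected
}

fn main() {
    let files = vec![
        FileStub { name: "a.parquet", num_rows: Some(40) },
        FileStub { name: "b.parquet", num_rows: Some(70) },
        FileStub { name: "c.parquet", num_rows: Some(10) },
    ];
    let picked = collect_with_limit(files, Some(100));
    let names: Vec<_> = picked.iter().map(|f| f.name).collect();
    println!("{names:?}"); // prints ["a.parquet", "b.parquet"]
}
```

Breaking right after the push means no further item is pulled from the input once the limit is crossed, and an unknown row count simply keeps the loop going, which matches the "# Note" section of the doc comment.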
Here is roughly what I have in mind, written as a small state machine (FSM):

```rust
enum ProcessingState {
    ReadingFiles,
    ReachedLimit,
}

let mut state = ProcessingState::ReadingFiles;
let mut num_rows = Precision::<usize>::Absent;

while let Some(file_result) = all_files.next().await {
    // Early exit if we've already reached our limit
    if matches!(state, ProcessingState::ReachedLimit) {
        break;
    }

    let file = file_result?;

    // Update file statistics regardless of state
    if collect_stats {
        if let Some(file_stats) = &file.statistics {
            num_rows = if file_group.is_empty() {
                // For the first file, just take its row count
                file_stats.num_rows
            } else {
                // For subsequent files, accumulate the counts
                crate::datasource::statistics::add_row_stats(
                    num_rows,
                    file_stats.num_rows,
                )
            };
        }
    }

    // Always add the file to our group
    file_group.push(file);

    // Check if we've hit the limit (if one was specified)
    if let Some(limit) = limit {
        if let Precision::Exact(row_count) = num_rows {
            if row_count > limit {
                state = ProcessingState::ReachedLimit;
            }
        }
    }
}
```

Given that this PR mainly moves the old code into a new method to make it more flexible, I don't want to make more changes here, but I'm happy to do this refactor in a separate PR. What do you think?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org