This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new a956ac3db9 use state machine to refactor the get_files_with_limit method (#15521)
a956ac3db9 is described below

commit a956ac3db9d1ba8569ed120aec05b4b74e9c33f3
Author: xudong.w <wxd963996...@gmail.com>
AuthorDate: Tue Apr 1 21:49:51 2025 +0800

    use state machine to refactor the get_files_with_limit method (#15521)
---
 datafusion/core/src/datasource/listing/table.rs | 72 ++++++++++++-------------
 1 file changed, 34 insertions(+), 38 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 79db5ecf52..6049614f37 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1205,47 +1205,43 @@ async fn get_files_with_limit(
     let mut file_group = FileGroup::default();
     // Fusing the stream allows us to call next safely even once it is finished.
     let mut all_files = Box::pin(files.fuse());
-    let mut num_rows = Precision::<usize>::Absent;
-    while let Some(first_file) = all_files.next().await {
-        let file = first_file?;
-        if let Some(file_statistic) = &file.statistics {
-            num_rows = file_statistic.num_rows;
+    enum ProcessingState {
+        ReadingFiles,
+        ReachedLimit,
+    }
+
+    let mut state = ProcessingState::ReadingFiles;
+    let mut num_rows = Precision::Absent;
+
+    while let Some(file_result) = all_files.next().await {
+        // Early exit if we've already reached our limit
+        if matches!(state, ProcessingState::ReachedLimit) {
+            break;
         }
-        file_group.push(file);
-        // If the number of rows exceeds the limit, we can stop processing
-        // files. This only applies when we know the number of rows. It also
-        // currently ignores tables that have no statistics regarding the
-        // number of rows.
-        let conservative_num_rows = match num_rows {
-            Precision::Exact(nr) => nr,
-            _ => usize::MIN,
-        };
-        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
-            while let Some(current) = all_files.next().await {
-                let file = current?;
-                if !collect_stats {
-                    file_group.push(file);
-                    continue;
-                }
+        let file = file_result?;
-                // We accumulate the number of rows, total byte size and null
-                // counts across all the files in question. If any file does not
-                // provide any information or provides an inexact value, we demote
-                // the statistic precision to inexact.
-                if let Some(file_stats) = &file.statistics {
-                    num_rows = add_row_stats(num_rows, file_stats.num_rows);
-                }
-                file_group.push(file);
-
-                // If the number of rows exceeds the limit, we can stop processing
-                // files. This only applies when we know the number of rows. It also
-                // currently ignores tables that have no statistics regarding the
-                // number of rows.
-                if num_rows.get_value().unwrap_or(&usize::MIN)
-                    > &limit.unwrap_or(usize::MAX)
-                {
-                    break;
+        // Update file statistics regardless of state
+        if collect_stats {
+            if let Some(file_stats) = &file.statistics {
+                num_rows = if file_group.is_empty() {
+                    // For the first file, just take its row count
+                    file_stats.num_rows
+                } else {
+                    // For subsequent files, accumulate the counts
+                    add_row_stats(num_rows, file_stats.num_rows)
+                };
+            }
+        }
+
+        // Always add the file to our group
+        file_group.push(file);
+
+        // Check if we've hit the limit (if one was specified)
+        if let Some(limit) = limit {
+            if let Precision::Exact(row_count) = num_rows {
+                if row_count > limit {
+                    state = ProcessingState::ReachedLimit;
                 }
             }
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
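
For reference, below is a minimal, self-contained Rust sketch of the state-machine
pattern the diff introduces: a single pass over the input, row counts accumulated as
files are seen, and an explicit state that flips to ReachedLimit once an exactly-known
count exceeds the limit, so the next iteration can break early. It is not part of the
commit and not DataFusion code; RowCount, FileMeta, and collect_with_limit are
simplified stand-ins invented for illustration. Compared with the old nested-loop
version, the single loop with an explicit state keeps the early-exit condition in one
place.

// Simplified stand-in for DataFusion's Precision<usize> row-count statistic.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RowCount {
    Exact(usize),
    Absent,
}

// Stand-in for a partitioned file that may or may not carry statistics.
struct FileMeta {
    rows: Option<usize>,
}

enum ProcessingState {
    ReadingFiles,
    ReachedLimit,
}

// Collect files until the (optional) row limit is provably exceeded.
fn collect_with_limit(files: &[FileMeta], limit: Option<usize>) -> (Vec<&FileMeta>, RowCount) {
    let mut group = Vec::new();
    let mut num_rows = RowCount::Absent;
    let mut state = ProcessingState::ReadingFiles;

    for file in files {
        // Early exit once the previous iteration flipped the state.
        if matches!(state, ProcessingState::ReachedLimit) {
            break;
        }

        // Accumulate the row count; in this simplified sketch any file with an
        // unknown count (or a missing count after the first file) makes the
        // total unknown.
        num_rows = match (num_rows, file.rows) {
            (RowCount::Absent, Some(r)) if group.is_empty() => RowCount::Exact(r),
            (RowCount::Exact(acc), Some(r)) => RowCount::Exact(acc + r),
            _ => RowCount::Absent,
        };

        // Always keep the file, then check whether the limit is now exceeded.
        group.push(file);
        if let (Some(limit), RowCount::Exact(total)) = (limit, num_rows) {
            if total > limit {
                state = ProcessingState::ReachedLimit;
            }
        }
    }

    (group, num_rows)
}

fn main() {
    let files = vec![
        FileMeta { rows: Some(40) },
        FileMeta { rows: Some(40) },
        FileMeta { rows: Some(40) }, // never read: the limit is already exceeded
    ];
    let (group, rows) = collect_with_limit(&files, Some(50));
    assert_eq!(group.len(), 2);
    assert_eq!(rows, RowCount::Exact(80));
    println!("kept {} files, rows = {:?}", group.len(), rows);
}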