alamb commented on code in PR #15432:
URL: https://github.com/apache/datafusion/pull/15432#discussion_r2019764664


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1181,6 +1175,92 @@ impl ListingTable {
     }
 }
 
+/// Processes a stream of partitioned files and returns a `FileGroup` 
containing the files.
+///
+/// This function collects files from the provided stream until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if 
specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will 
stop collecting files
+///             once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the 
files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not 
available or if the
+/// limit is not provided. If `collect_stats` is false, statistics won't be 
accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is 
finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    while let Some(first_file) = all_files.next().await {
+        let file = first_file?;
+        if let Some(file_statistic) = &file.statistics {
+            num_rows = file_statistic.num_rows;
+        }
+        file_group.push(file);
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        let conservative_num_rows = match num_rows {
+            Precision::Exact(nr) => nr,
+            _ => usize::MIN,
+        };
+        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+            while let Some(current) = all_files.next().await {
+                let file = current?;
+                if !collect_stats {
+                    file_group.push(file);
+                    continue;
+                }
+
+                // We accumulate the number of rows, total byte size and null
+                // counts across all the files in question. If any file does 
not
+                // provide any information or provides an inexact value, we 
demote
+                // the statistic precision to inexact.
+                if let Some(file_stats) = &file.statistics {
+                    num_rows = crate::datasource::statistics::add_row_stats(
+                        num_rows,
+                        file_stats.num_rows,
+                    );
+                }
+                file_group.push(file);
+
+                // If the number of rows exceeds the limit, we can stop 
processing
+                // files. This only applies when we know the number of rows. 
It also
+                // currently ignores tables that have no statistics regarding 
the
+                // number of rows.
+                if num_rows.get_value().unwrap_or(&usize::MIN)
+                    > &limit.unwrap_or(usize::MAX)
+                {
+                    break;
+                }
+            }
+        }
+    }
+    let mut inexact_stats = false;
+    if all_files.next().await.is_some() {
+        // If we still have files in the stream, it means that the limit kicked
+        // in, and the statistic could have been different had we processed the
+        // files in a different order.
+        inexact_stats = true;
+    }

Review Comment:
   ```suggestion
       // If we still have files in the stream, it means that the limit kicked
       // in, and the statistic could have been different had we processed the
       // files in a different order.
       let inexact_stats = all_files.next().await.is_some();
   ```



##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -145,7 +147,142 @@ pub async fn get_statistics_with_limit(
     Ok((result_files, statistics))
 }
 
-fn add_row_stats(
+/// Generic function to compute statistics across multiple items that have 
statistics
+fn compute_summary_statistics<T, I>(
+    items: I,
+    file_schema: &SchemaRef,
+    stats_extractor: impl Fn(&T) -> Option<&Statistics>,
+) -> Statistics
+where
+    I: IntoIterator<Item = T>,
+{
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    for (idx, item) in items.into_iter().enumerate() {
+        if let Some(item_stats) = stats_extractor(&item) {
+            if idx == 0 {
+                // First item, set values directly
+                num_rows = item_stats.num_rows;
+                total_byte_size = item_stats.total_byte_size;
+                for (index, column_stats) in
+                    item_stats.column_statistics.iter().enumerate()
+                {
+                    col_stats_set[index].null_count = column_stats.null_count;
+                    col_stats_set[index].max_value = 
column_stats.max_value.clone();
+                    col_stats_set[index].min_value = 
column_stats.min_value.clone();
+                    col_stats_set[index].sum_value = 
column_stats.sum_value.clone();
+                }
+                continue;
+            }
+
+            // Accumulate statistics for subsequent items
+            num_rows = add_row_stats(item_stats.num_rows, num_rows);
+            total_byte_size = add_row_stats(item_stats.total_byte_size, 
total_byte_size);
+
+            for (item_col_stats, col_stats) in item_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
+                col_stats.null_count =
+                    add_row_stats(item_col_stats.null_count, 
col_stats.null_count);
+                set_max_if_greater(&item_col_stats.max_value, &mut 
col_stats.max_value);
+                set_min_if_lesser(&item_col_stats.min_value, &mut 
col_stats.min_value);
+                col_stats.sum_value = 
item_col_stats.sum_value.add(&col_stats.sum_value);
+            }
+        }
+    }
+
+    Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    }
+}
+
+/// Computes the summary statistics for a group of files (`FileGroup`-level 
statistics).
+///
+/// This function combines statistics from all files in the file group to 
create
+/// summary statistics. It handles the following aspects:
+/// - Merges row counts and byte sizes across files
+/// - Computes column-level statistics like min/max values
+/// - Maintains appropriate precision information (exact, inexact, absent)
+///
+/// # Parameters
+/// * `file_group` - The group of files to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics (if false, returns 
original file group)
+///
+/// # Returns
+/// A new file group with summary statistics attached
+pub fn compute_file_group_statistics(
+    file_group: FileGroup,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+) -> Result<FileGroup> {
+    if !collect_stats {
+        return Ok(file_group);
+    }
+
+    let statistics =
+        compute_summary_statistics(file_group.iter(), &file_schema, |file| {
+            file.statistics.as_ref().map(|stats| stats.as_ref())
+        });
+
+    Ok(file_group.with_statistics(statistics))
+}
+
+/// Computes statistics for all files across multiple file groups.
+///
+/// This function:
+/// 1. Computes statistics for each individual file group
+/// 2. Computes summary statistics across all file groups
+/// 3. Optionally marks statistics as inexact
+///
+/// # Parameters
+/// * `file_groups` - Vector of file groups to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics
+/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
+///
+/// # Returns
+/// A tuple containing:
+/// * The processed file groups with their individual statistics attached
+/// * The summary statistics across all file groups, i.e. the summary 
statistics of all files
+pub fn compute_all_files_statistics(
+    file_groups: Vec<FileGroup>,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+    inexact_stats: bool,
+) -> Result<(Vec<FileGroup>, Statistics)> {

Review Comment:
   I think eventually the `Vec<FileGroup>` and the `Statistics` end up on a 
PartitionedFile.
   
   This is the same return type as `list_files_for_scan`, so I think it makes 
sense.



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1181,6 +1175,92 @@ impl ListingTable {
     }
 }
 
+/// Processes a stream of partitioned files and returns a `FileGroup` 
containing the files.

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to