jayzhan211 commented on code in PR #15432:
URL: https://github.com/apache/datafusion/pull/15432#discussion_r2016078687


##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -145,7 +147,142 @@ pub async fn get_statistics_with_limit(
     Ok((result_files, statistics))
 }
 
-fn add_row_stats(
+/// Generic function to compute statistics across multiple items that have 
statistics
+fn compute_summary_statistics<T, I>(
+    items: I,
+    file_schema: &SchemaRef,
+    stats_extractor: impl Fn(&T) -> Option<&Statistics>,
+) -> Statistics
+where
+    I: IntoIterator<Item = T>,
+{
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    for (idx, item) in items.into_iter().enumerate() {
+        if let Some(item_stats) = stats_extractor(&item) {
+            if idx == 0 {
+                // First item, set values directly
+                num_rows = item_stats.num_rows;
+                total_byte_size = item_stats.total_byte_size;
+                for (index, column_stats) in
+                    item_stats.column_statistics.iter().enumerate()
+                {
+                    col_stats_set[index].null_count = column_stats.null_count;
+                    col_stats_set[index].max_value = 
column_stats.max_value.clone();
+                    col_stats_set[index].min_value = 
column_stats.min_value.clone();
+                    col_stats_set[index].sum_value = 
column_stats.sum_value.clone();
+                }
+                continue;
+            }
+
+            // Accumulate statistics for subsequent items
+            num_rows = add_row_stats(item_stats.num_rows, num_rows);
+            total_byte_size = add_row_stats(item_stats.total_byte_size, 
total_byte_size);
+
+            for (item_col_stats, col_stats) in item_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
+                col_stats.null_count =
+                    add_row_stats(item_col_stats.null_count, 
col_stats.null_count);
+                set_max_if_greater(&item_col_stats.max_value, &mut 
col_stats.max_value);
+                set_min_if_lesser(&item_col_stats.min_value, &mut 
col_stats.min_value);
+                col_stats.sum_value = 
item_col_stats.sum_value.add(&col_stats.sum_value);
+            }
+        }
+    }
+
+    Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    }
+}
+
+/// Computes the summary statistics for a group of files(`FileGroup` level's 
statistics).
+///
+/// This function combines statistics from all files in the file group to 
create
+/// summary statistics. It handles the following aspects:
+/// - Merges row counts and byte sizes across files
+/// - Computes column-level statistics like min/max values
+/// - Maintains appropriate precision information (exact, inexact, absent)
+///
+/// # Parameters
+/// * `file_group` - The group of files to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics (if false, returns 
original file group)
+///
+/// # Returns
+/// A new file group with summary statistics attached
+pub fn compute_file_group_statistics(
+    file_group: FileGroup,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+) -> Result<FileGroup> {
+    if !collect_stats {
+        return Ok(file_group);
+    }
+
+    let statistics =
+        compute_summary_statistics(file_group.iter(), &file_schema, |file| {
+            file.statistics.as_ref().map(|stats| stats.as_ref())
+        });
+
+    Ok(file_group.with_statistics(statistics))
+}
+
+/// Computes statistics for all files across multiple file groups.
+///
+/// This function:
+/// 1. Computes statistics for each individual file group
+/// 2. Summary statistics across all file groups
+/// 3. Optionally marks statistics as inexact
+///
+/// # Parameters
+/// * `file_groups` - Vector of file groups to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics
+/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
+///
+/// # Returns
+/// A tuple containing:
+/// * The processed file groups with their individual statistics attached
+/// * The summary statistics across all file groups, aka all files summary 
statistics
+pub fn compute_all_files_statistics(
+    file_groups: Vec<FileGroup>,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+    inexact_stats: bool,
+) -> Result<(Vec<FileGroup>, Statistics)> {
+    let mut file_groups = file_groups;
+
+    // First compute statistics for each file group
+    for file_group in file_groups.iter_mut() {
+        *file_group = compute_file_group_statistics(
+            file_group.clone(),

Review Comment:
   I think this clone is avoidable: we need an owned type for updating the
statistics, but we don't need an owned type for calculating them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to