AdamGS commented on code in PR #22462:
URL: https://github.com/apache/datafusion/pull/22462#discussion_r3292903450


##########
datafusion/datasource-parquet/src/metadata.rs:
##########
@@ -506,121 +505,215 @@ impl StatisticsAccumulators<'_> {
 }
 
 fn summarize_column_statistics(
-    parquet_schema: &SchemaDescriptor,
     logical_file_schema: &Schema,
-    physical_file_schema: &Schema,
     accumulators: &mut StatisticsAccumulators,
     logical_schema_index: usize,
     stats_converter: &StatisticsConverter,
     row_groups_metadata: &[RowGroupMetaData],
+    num_rows: usize,
 ) -> Result<()> {
-    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
-    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
-    let null_counts = 
stats_converter.row_group_null_counts(row_groups_metadata)?;
-    let is_max_value_exact_stat =
-        stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
-    let is_min_value_exact_stat =
-        stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;
+    let parquet_index = stats_converter.parquet_column_index();
 
     if let Some(max_acc) = &mut accumulators.max_accs[logical_schema_index] {
-        max_acc.update_batch(&[Arc::clone(&max_values)])?;
-
-        // handle the common special case when all row groups have exact 
statistics
-        let exactness = &is_max_value_exact_stat;
-        if !exactness.is_empty() && exactness.null_count() == 0 && 
!exactness.has_false()
-        {
-            accumulators.is_max_value_exact[logical_schema_index] = Some(true);
-        } else if !exactness.has_true() {
-            accumulators.is_max_value_exact[logical_schema_index] = 
Some(false);
-        } else {
-            let val = max_acc.evaluate()?;
-            accumulators.is_max_value_exact[logical_schema_index] =
-                has_any_exact_match(&val, &max_values, exactness);
-        }
+        accumulators.is_max_value_exact[logical_schema_index] = 
summarize_bound(
+            max_acc,
+            &stats_converter.row_group_maxes(row_groups_metadata)?,
+            parquet_index,
+            row_groups_metadata,
+            ParquetStatistics::max_is_exact,
+            || 
Ok(stats_converter.row_group_is_max_value_exact(row_groups_metadata)?),
+        )?;
     }
 
     if let Some(min_acc) = &mut accumulators.min_accs[logical_schema_index] {
-        min_acc.update_batch(&[Arc::clone(&min_values)])?;
-
-        // handle the common special case when all row groups have exact 
statistics
-        let exactness = &is_min_value_exact_stat;
-        if !exactness.is_empty() && exactness.null_count() == 0 && 
!exactness.has_false()
-        {
-            accumulators.is_min_value_exact[logical_schema_index] = Some(true);
-        } else if !exactness.has_true() {
-            accumulators.is_min_value_exact[logical_schema_index] = 
Some(false);
-        } else {
-            let val = min_acc.evaluate()?;
-            accumulators.is_min_value_exact[logical_schema_index] =
-                has_any_exact_match(&val, &min_values, exactness);
-        }
+        accumulators.is_min_value_exact[logical_schema_index] = 
summarize_bound(
+            min_acc,
+            &stats_converter.row_group_mins(row_groups_metadata)?,
+            parquet_index,
+            row_groups_metadata,
+            ParquetStatistics::min_is_exact,
+            || 
Ok(stats_converter.row_group_is_min_value_exact(row_groups_metadata)?),
+        )?;
     }
 
-    accumulators.null_counts_array[logical_schema_index] = match 
sum(&null_counts) {
-        Some(null_count) => Precision::Exact(null_count as usize),
-        None => match null_counts.len() {
-            // If sum() returned None we either have no rows or all values are 
null
-            0 => Precision::Exact(0),
-            _ => Precision::Absent,
-        },
-    };
-
-    // This is the same logic as parquet_column but we start from arrow schema 
index
-    // instead of looking up by name.
-    let parquet_index = parquet_column(
-        parquet_schema,
-        physical_file_schema,
-        logical_file_schema.field(logical_schema_index).name(),
-    )
-    .map(|(idx, _)| idx);
+    accumulators.null_counts_array[logical_schema_index] =
+        summarize_null_counts(parquet_index, row_groups_metadata);
 
-    // Extract distinct counts from row group column statistics
     accumulators.distinct_counts_array[logical_schema_index] =
-        if let Some(parquet_idx) = parquet_index {
-            let num_row_groups = row_groups_metadata.len();
-            let distinct_counts: Vec<u64> = row_groups_metadata
-                .iter()
-                .filter_map(|rg| {
-                    rg.columns()
-                        .get(parquet_idx)
-                        .and_then(|col| col.statistics())
-                        .and_then(|stats| stats.distinct_count_opt())
-                })
-                .collect();
-
-            let coverage = distinct_counts.len() as f64 / 
num_row_groups.max(1) as f64;
-
-            if coverage < PARTIAL_NDV_THRESHOLD {
-                Precision::Absent
-            } else if distinct_counts.len() == 1 && num_row_groups == 1 {
-                // Single row group with distinct count - use exact value
-                Precision::Exact(distinct_counts[0] as usize)
-            } else {
-                // Multiple row groups - use max as a lower bound estimate
-                // (can't accurately merge NDV since duplicates may exist 
across row groups)
-                match distinct_counts.iter().max() {
-                    Some(&max_ndv) => Precision::Inexact(max_ndv as usize),
-                    None => Precision::Absent,
-                }
-            }
-        } else {
-            Precision::Absent
-        };
+        summarize_distinct_counts(parquet_index, row_groups_metadata);
 
     let arrow_field = logical_file_schema.field(logical_schema_index);
     accumulators.column_byte_sizes[logical_schema_index] = 
compute_arrow_column_size(
         arrow_field.data_type(),
         row_groups_metadata,
         parquet_index,
-        row_groups_metadata
-            .iter()
-            .map(|rg| rg.num_rows() as usize)
-            .sum(),
+        num_rows,
     );
 
     Ok(())
 }
 
+/// Feed a column's per-row-group min or max `values` into `acc` and decide
+/// whether the resulting bound is exact across all row groups.
+///
+/// `is_exact` reads the per-row-group exactness flag straight from the raw
+/// parquet statistics. `row_group_exactness` rebuilds the exactness as a Boolean
+/// array and is only called for the rare case where row groups disagree.
+fn summarize_bound<A: Accumulator>(
+    acc: &mut A,
+    values: &ArrayRef,
+    parquet_index: Option<usize>,
+    row_groups_metadata: &[RowGroupMetaData],
+    is_exact: impl Fn(&ParquetStatistics) -> bool,
+    row_group_exactness: impl FnOnce() -> Result<BooleanArray>,
+) -> Result<Option<bool>> {
+    acc.update_batch(&[Arc::clone(values)])?;
+
+    Ok(
+        match summarize_row_group_exactness(parquet_index, 
row_groups_metadata, is_exact)
+        {
+            ExactnessSummary::AllExact => Some(true),
+            ExactnessSummary::NoneExact => Some(false),
+            ExactnessSummary::Mixed => {
+                let exactness = row_group_exactness()?;
+                has_any_exact_match(&acc.evaluate()?, values, &exactness)
+            }
+        },
+    )
+}
+
+fn summarize_null_counts(
+    parquet_idx: Option<usize>,
+    row_groups_metadata: &[RowGroupMetaData],
+) -> Precision<usize> {
+    let Some(parquet_idx) = parquet_idx else {
+        return match row_groups_metadata.len() {
+            0 => Precision::Exact(0),
+            _ => Precision::Absent,
+        };
+    };
+
+    let mut null_count_sum = 0_u64;
+    let mut has_known_count = false;
+    for row_group in row_groups_metadata {
+        let Some(null_count) = row_group
+            .columns()
+            .get(parquet_idx)
+            .and_then(|column| column.statistics())
+            .map(|statistics| statistics.null_count_opt().unwrap_or(0))

Review Comment:
   Good catch, I'll fix this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to