xudong963 commented on code in PR #22462:
URL: https://github.com/apache/datafusion/pull/22462#discussion_r3322355995
##########
datafusion/datasource-parquet/src/metadata.rs:
##########
@@ -506,119 +505,207 @@ impl StatisticsAccumulators<'_> {
}
fn summarize_column_statistics(
- parquet_schema: &SchemaDescriptor,
logical_file_schema: &Schema,
- physical_file_schema: &Schema,
accumulators: &mut StatisticsAccumulators,
logical_schema_index: usize,
stats_converter: &StatisticsConverter,
row_groups_metadata: &[RowGroupMetaData],
+ num_rows: usize,
) -> Result<()> {
- let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
- let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
- let null_counts =
stats_converter.row_group_null_counts(row_groups_metadata)?;
- let is_max_value_exact_stat =
- stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
- let is_min_value_exact_stat =
- stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;
+ let parquet_index = stats_converter.parquet_column_index();
if let Some(max_acc) = &mut accumulators.max_accs[logical_schema_index] {
- max_acc.update_batch(&[Arc::clone(&max_values)])?;
-
- // handle the common special case when all row groups have exact
statistics
- let exactness = &is_max_value_exact_stat;
- if !exactness.is_empty() && exactness.null_count() == 0 &&
!exactness.has_false()
- {
- accumulators.is_max_value_exact[logical_schema_index] = Some(true);
- } else if !exactness.has_true() {
- accumulators.is_max_value_exact[logical_schema_index] =
Some(false);
- } else {
- let val = max_acc.evaluate()?;
- accumulators.is_max_value_exact[logical_schema_index] =
- has_any_exact_match(&val, &max_values, exactness);
- }
+ accumulators.is_max_value_exact[logical_schema_index] =
summarize_bound(
+ max_acc,
+ &stats_converter.row_group_maxes(row_groups_metadata)?,
+ parquet_index,
+ row_groups_metadata,
+ ParquetStatistics::max_is_exact,
+ ||
Ok(stats_converter.row_group_is_max_value_exact(row_groups_metadata)?),
+ )?;
}
if let Some(min_acc) = &mut accumulators.min_accs[logical_schema_index] {
- min_acc.update_batch(&[Arc::clone(&min_values)])?;
+ accumulators.is_min_value_exact[logical_schema_index] =
summarize_bound(
+ min_acc,
+ &stats_converter.row_group_mins(row_groups_metadata)?,
+ parquet_index,
+ row_groups_metadata,
+ ParquetStatistics::min_is_exact,
+ ||
Ok(stats_converter.row_group_is_min_value_exact(row_groups_metadata)?),
+ )?;
+ }
+
+ accumulators.null_counts_array[logical_schema_index] =
+ summarize_null_counts(stats_converter, row_groups_metadata)?;
+
+ accumulators.distinct_counts_array[logical_schema_index] =
+ summarize_distinct_counts(parquet_index, row_groups_metadata);
+
+ let arrow_field = logical_file_schema.field(logical_schema_index);
+ accumulators.column_byte_sizes[logical_schema_index] =
compute_arrow_column_size(
+ arrow_field.data_type(),
+ row_groups_metadata,
+ parquet_index,
+ num_rows,
+ );
+
+ Ok(())
+}
- // handle the common special case when all row groups have exact
statistics
- let exactness = &is_min_value_exact_stat;
- if !exactness.is_empty() && exactness.null_count() == 0 &&
!exactness.has_false()
+/// Feed a column's per-row-group min or max `values` into `acc` and decide
+/// whether the resulting bound is exact across all row groups.
+///
+/// `is_exact` reads the per-row-group exactness flag straight from the raw
+/// parquet statistics. `row_group_exactness` rebuilds the exactness as a
Boolean
+/// array and is only called for the rare case where row groups disagree.
+fn summarize_bound<A: Accumulator>(
+ acc: &mut A,
+ values: &ArrayRef,
+ parquet_index: Option<usize>,
+ row_groups_metadata: &[RowGroupMetaData],
+ is_exact: impl Fn(&ParquetStatistics) -> bool,
+ row_group_exactness: impl FnOnce() -> Result<BooleanArray>,
+) -> Result<Option<bool>> {
+ acc.update_batch(&[Arc::clone(values)])?;
+
+ Ok(
+ match summarize_row_group_exactness(parquet_index,
row_groups_metadata, is_exact)
{
- accumulators.is_min_value_exact[logical_schema_index] = Some(true);
- } else if !exactness.has_true() {
- accumulators.is_min_value_exact[logical_schema_index] =
Some(false);
- } else {
- let val = min_acc.evaluate()?;
- accumulators.is_min_value_exact[logical_schema_index] =
- has_any_exact_match(&val, &min_values, exactness);
- }
+ ExactnessSummary::AllExact => Some(true),
+ ExactnessSummary::NoneExact => Some(false),
+ ExactnessSummary::Mixed => {
+ let exactness = row_group_exactness()?;
+ has_any_exact_match(&acc.evaluate()?, values, &exactness)
+ }
+ },
+ )
+}
+
+fn summarize_null_counts(
+ stats_converter: &StatisticsConverter,
+ row_groups_metadata: &[RowGroupMetaData],
+) -> Result<Precision<usize>> {
+ if row_groups_metadata.is_empty() {
+ return Ok(Precision::Exact(0));
}
- accumulators.null_counts_array[logical_schema_index] = match
sum(&null_counts) {
- Some(null_count) => Precision::Exact(null_count as usize),
+ let null_counts =
stats_converter.row_group_null_counts(row_groups_metadata)?;
+
+ match sum(&null_counts) {
+ Some(count) => {
+ // If any row group has a null `null_count` (either because the
column chunk doesn't have statistics
+ // or the value itself is null), we report the total as inexact.
Review Comment:
```suggestion
// If any row group has an unknown null_count, either because
column
// statistics are absent or because the null_count field is
omitted,
// report the aggregate as inexact.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]