gene-bordegaray commented on code in PR #19957:
URL: https://github.com/apache/datafusion/pull/19957#discussion_r2722323190
##########
datafusion/common/src/stats.rs:
##########
@@ -632,7 +632,24 @@ impl Statistics {
col_stats.max_value =
col_stats.max_value.max(&item_col_stats.max_value);
col_stats.min_value =
col_stats.min_value.min(&item_col_stats.min_value);
col_stats.sum_value =
col_stats.sum_value.add(&item_col_stats.sum_value);
- col_stats.distinct_count = Precision::Absent;
+ // Use max as a conservative lower bound for distinct count
+ // (can't accurately merge NDV since duplicates may exist across partitions)
+ col_stats.distinct_count =
+ match (&col_stats.distinct_count, &item_col_stats.distinct_count) {
+ (Precision::Exact(a), Precision::Exact(b))
+ | (Precision::Inexact(a), Precision::Exact(b))
+ | (Precision::Exact(a), Precision::Inexact(b))
+ | (Precision::Inexact(a), Precision::Inexact(b)) => {
+ Precision::Inexact(if a >= b { *a } else { *b })
+ }
+ (Precision::Exact(v), Precision::Absent)
+ | (Precision::Inexact(v), Precision::Absent)
+ | (Precision::Absent, Precision::Exact(v))
+ | (Precision::Absent, Precision::Inexact(v)) => {
+ Precision::Inexact(*v)
+ }
+ (Precision::Absent, Precision::Absent) => Precision::Absent,
+ };
Review Comment:
I think this verbosity could be reduced to something like:
```rust
col_stats.distinct_count = col_stats.distinct_count.get_value()
.max(item_col_stats.distinct_count.get_value())
.map(|&v| Precision::Inexact(v))
.unwrap_or(Precision::Absent);
```
or we could introduce some method like `max_inexact()` on Precision.
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -660,9 +660,25 @@ impl ProjectionExprs {
}
}
} else {
- // TODO stats: estimate more statistics from expressions
- // (expressions should compute their statistics themselves)
- ColumnStatistics::new_unknown()
+ // TODO: expressions should compute their own statistics
Review Comment:
noice, this is useful thanks 😄
##########
datafusion/datasource-parquet/src/metadata.rs:
##########
@@ -778,4 +815,354 @@ mod tests {
assert_eq!(result, Some(false));
}
}
+
+ mod ndv_tests {
+ use super::*;
+ use arrow::datatypes::Field;
+ use parquet::basic::Type as PhysicalType;
+ use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
+ use parquet::file::statistics::Statistics as ParquetStatistics;
+ use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+
+ fn create_schema_descr(num_columns: usize) -> Arc<SchemaDescriptor> {
+ let fields: Vec<Arc<SchemaType>> = (0..num_columns)
+ .map(|i| {
+ Arc::new(
+ SchemaType::primitive_type_builder(
+ &format!("col_{i}"),
+ PhysicalType::INT32,
+ )
+ .build()
+ .unwrap(),
+ )
+ })
+ .collect();
+
+ let schema = SchemaType::group_type_builder("schema")
+ .with_fields(fields)
+ .build()
+ .unwrap();
+
+ Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+ }
+
+ fn create_arrow_schema(num_columns: usize) -> SchemaRef {
+ let fields: Vec<Field> = (0..num_columns)
+ .map(|i| Field::new(format!("col_{i}"), DataType::Int32, true))
+ .collect();
+ Arc::new(Schema::new(fields))
+ }
+
+ fn create_row_group_with_stats(
+ schema_descr: &Arc<SchemaDescriptor>,
+ column_stats: Vec<Option<ParquetStatistics>>,
+ num_rows: i64,
+ ) -> RowGroupMetaData {
+ let columns: Vec<ColumnChunkMetaData> = column_stats
+ .into_iter()
+ .enumerate()
+ .map(|(i, stats)| {
+ let mut builder =
+ ColumnChunkMetaData::builder(schema_descr.column(i));
+ if let Some(s) = stats {
+ builder = builder.set_statistics(s);
+ }
+ builder.set_num_values(num_rows).build().unwrap()
+ })
+ .collect();
+
+ RowGroupMetaData::builder(schema_descr.clone())
+ .set_num_rows(num_rows)
+ .set_total_byte_size(1000)
+ .set_column_metadata(columns)
+ .build()
+ .unwrap()
+ }
+
+ fn create_parquet_metadata(
+ schema_descr: Arc<SchemaDescriptor>,
+ row_groups: Vec<RowGroupMetaData>,
+ ) -> ParquetMetaData {
+ use parquet::file::metadata::FileMetaData;
+
+ let num_rows: i64 = row_groups.iter().map(|rg| rg.num_rows()).sum();
+ let file_meta = FileMetaData::new(
+ 1, // version
+ num_rows, // num_rows
+ None, // created_by
+ None, // key_value_metadata
+ schema_descr, // schema_descr
+ None, // column_orders
+ );
+
+ ParquetMetaData::new(file_meta, row_groups)
+ }
+
+ #[test]
+ fn test_distinct_count_single_row_group_with_ndv() {
+ // Single row group with distinct count should return Exact
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Create statistics with distinct_count = 42
+ let stats = ParquetStatistics::int32(
+ Some(1), // min
+ Some(100), // max
+ Some(42), // distinct_count
+ Some(0), // null_count
+ false, // is_deprecated
+ );
+
+ let row_group =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000);
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Exact(42)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_multiple_row_groups_with_ndv() {
+ // Multiple row groups with distinct counts should return Inexact (sum)
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Row group 1: distinct_count = 10
+ let stats1 = ParquetStatistics::int32(
+ Some(1),
+ Some(50),
+ Some(10), // distinct_count
+ Some(0),
+ false,
+ );
+
+ // Row group 2: distinct_count = 20
+ let stats2 = ParquetStatistics::int32(
+ Some(51),
+ Some(100),
+ Some(20), // distinct_count
+ Some(0),
+ false,
+ );
+
+ let row_group1 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500);
+ let row_group2 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500);
+ let metadata =
+ create_parquet_metadata(schema_descr, vec![row_group1, row_group2]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ // Max of distinct counts (lower bound since we can't accurately merge NDV)
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Inexact(20)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_no_ndv_available() {
+ // No distinct count in statistics should return Absent
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Create statistics without distinct_count (None)
+ let stats = ParquetStatistics::int32(
+ Some(1),
+ Some(100),
+ None, // no distinct_count
+ Some(0),
+ false,
+ );
+
+ let row_group =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats)], 1000);
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Absent
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_partial_ndv_in_row_groups() {
+ // Some row groups have NDV, some don't - should use only those that have it
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Row group 1: has distinct_count = 15
+ let stats1 =
+ ParquetStatistics::int32(Some(1), Some(50), Some(15), Some(0), false);
+
+ // Row group 2: no distinct_count
+ let stats2 = ParquetStatistics::int32(
+ Some(51),
+ Some(100),
+ None, // no distinct_count
+ Some(0),
+ false,
+ );
+
+ let row_group1 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats1)], 500);
+ let row_group2 =
+ create_row_group_with_stats(&schema_descr, vec![Some(stats2)], 500);
+ let metadata =
+ create_parquet_metadata(schema_descr, vec![row_group1, row_group2]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ // Only one row group has NDV, so it's Exact(15)
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Exact(15)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_multiple_columns() {
+ // Test with multiple columns, each with different NDV
+ let schema_descr = create_schema_descr(3);
+ let arrow_schema = create_arrow_schema(3);
+
+ // col_0: distinct_count = 5
+ let stats0 =
+ ParquetStatistics::int32(Some(1), Some(10), Some(5), Some(0), false);
+ // col_1: no distinct_count
+ let stats1 =
+ ParquetStatistics::int32(Some(1), Some(100), None, Some(0), false);
+ // col_2: distinct_count = 100
+ let stats2 =
+ ParquetStatistics::int32(Some(1), Some(1000), Some(100), Some(0), false);
+
+ let row_group = create_row_group_with_stats(
+ &schema_descr,
+ vec![Some(stats0), Some(stats1), Some(stats2)],
+ 1000,
+ );
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Exact(5)
+ );
+ assert_eq!(
+ result.column_statistics[1].distinct_count,
+ Precision::Absent
+ );
+ assert_eq!(
+ result.column_statistics[2].distinct_count,
+ Precision::Exact(100)
+ );
+ }
+
+ #[test]
+ fn test_distinct_count_no_statistics_at_all() {
+ // No statistics in row group should return Absent for all stats
+ let schema_descr = create_schema_descr(1);
+ let arrow_schema = create_arrow_schema(1);
+
+ // Create row group without any statistics
+ let row_group = create_row_group_with_stats(&schema_descr, vec![None], 1000);
+ let metadata = create_parquet_metadata(schema_descr, vec![row_group]);
+
+ let result = DFParquetMetadata::statistics_from_parquet_metadata(
+ &metadata,
+ &arrow_schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ result.column_statistics[0].distinct_count,
+ Precision::Absent
+ );
+ }
+
+ /// Integration test that reads a real Parquet file with distinct_count statistics.
+ /// The test file was created with DuckDB and has known NDV values:
+ /// - id: NULL (high cardinality, not tracked)
+ /// - category: 10 distinct values
+ /// - name: 5 distinct values
+ #[test]
+ fn test_distinct_count_from_real_parquet_file() {
+ use parquet::arrow::parquet_to_arrow_schema;
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+ use std::fs::File;
+ use std::path::PathBuf;
Review Comment:
nit: since these tests are in their own module, I think moving these to the
`ndv_tests` module level would be ok
##########
datafusion/datasource-parquet/src/metadata.rs:
##########
@@ -419,6 +424,7 @@ fn get_col_stats(
is_max_value_exact: &mut [Option<bool>],
is_min_value_exact: &mut [Option<bool>],
column_byte_sizes: &[Precision<usize>],
+ distinct_counts: &[Precision<usize>],
Review Comment:
A nit but maybe these could be extracted into a struct that encapsulates
these parameters as fields - say extend `StatisticsAccumulators` and use this
or create a new struct
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]