alamb commented on code in PR #10924:
URL: https://github.com/apache/datafusion/pull/10924#discussion_r1640395427


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -482,73 +404,101 @@ pub async fn statistics_from_parquet_meta(
         file_metadata.key_value_metadata(),
     )?;
 
-    let num_fields = table_schema.fields().len();
-    let fields = table_schema.fields();
-
     let mut num_rows = 0;
     let mut total_byte_size = 0;
-    let mut null_counts = vec![Precision::Exact(0); num_fields];
-    let mut has_statistics = false;
-
-    let schema_adapter =
-        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
-
-    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
 
     for row_group_meta in metadata.row_groups() {
         num_rows += row_group_meta.num_rows();
         total_byte_size += row_group_meta.total_byte_size();
+    }
 
-        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = 
HashMap::new();
+    let schema_adapter =
+        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
 
-        for (i, column) in row_group_meta.columns().iter().enumerate() {
-            if let Some(stat) = column.statistics() {
-                has_statistics = true;
-                column_stats.insert(i, (stat.null_count(), stat));
-            }
-        }
+    // statistics for each of the table's columns (may be different from the
+    // file schema)
+    let mut column_statistics = vec![];
+
+    for (table_idx, field) in table_schema.fields().iter().enumerate() {
+        let Some(file_idx) = schema_adapter.map_column_index(table_idx, 
&file_schema)
+        else {
+            // file columns not in table schema are treated as all null
+            let null_count = Precision::Exact(num_rows as usize);
+            let null_value = ScalarValue::try_from(field.data_type())?;
+            let stats = ColumnStatistics::new_unknown()
+                .with_null_count(null_count)
+                .with_max_value(Precision::Exact(null_value.clone()))
+                .with_min_value(Precision::Exact(null_value));
+            column_statistics.push(stats);
+            continue;
+        };
 
-        if has_statistics {
-            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
-                if let Some(file_idx) =
-                    schema_adapter.map_column_index(table_idx, &file_schema)
-                {
-                    if let Some((null_count, stats)) = 
column_stats.get(&file_idx) {
-                        *null_cnt = null_cnt.add(&Precision::Exact(*null_count 
as usize));
-                        summarize_min_max(
-                            &mut max_values,
-                            &mut min_values,
-                            fields,
-                            table_idx,
-                            stats,
-                        )
-                    } else {
-                        // If none statistics of current column exists, set 
the Max/Min Accumulator to None.
-                        max_values[table_idx] = None;
-                        min_values[table_idx] = None;
-                    }
-                } else {
-                    *null_cnt = null_cnt.add(&Precision::Exact(num_rows as 
usize));
-                }
-            }
-        }
-    }
+        let file_field = file_schema.field(file_idx);
+        let Some(converter) = StatisticsConverter::try_new(

Review Comment:
   this code now uses the well tested StatisticsConverter to extract statistics 
from the parquet file with the correct type of array



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -295,86 +297,6 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
-    stat: &ParquetStatistics,
-) {
-    if !stat.has_min_max_set() {
-        max_values[i] = None;
-        min_values[i] = None;
-        return;
-    }
-    match stat {
-        ParquetStatistics::Boolean(s) if DataType::Boolean == 
*fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int32(s) if DataType::Int32 == 
*fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 
1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 
1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int64(s) if DataType::Int64 == 
*fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 
1))])

Review Comment:
   note how update_batch is called by first creating a single row array



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1449,8 +1400,15 @@ mod tests {
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c1_stats.max_value, Precision::Absent);
-        assert_eq!(c1_stats.min_value, Precision::Absent);
+        // Note in ASCII lower case is greater than upper case
+        assert_eq!(
+            c1_stats.max_value,
+            Precision::Exact(ScalarValue::from("bar"))

Review Comment:
   strings were previously not handled



##########
datafusion/sqllogictest/test_files/explain.slt:
##########
@@ -287,20 +287,20 @@ query TT
 EXPLAIN SELECT * FROM alltypes_plain limit 10;
 ----
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--ParquetExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
statistics=[Rows=Exact(8), Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, 
[(Col[0]: Min=Exact(Int32(NULL)) Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[1]: 
Min=Exact(Boolean(NULL)) Max=Exact(Boolean(NULL)) Null=Exact(0)),(Col[2]: 
Min=Exact(Int32(NULL)) Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[3]: 
Min=Exact(Int32(NULL)) Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[4]: 
Min=Exact(Int32(NULL)) Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[5]: 
Min=Exact(Int64(NULL)) Max=Exact(Int64(NULL)) Null=Exact(0)),(Col[6]: 
Min=Exact(Float32(NULL)) Max=Exact(Float32(NULL)) Null=Exact(0)),(Col[7]: 
Min=Exact(Float64(NULL)) Max=Exact(Float64(NULL)) Null=Exact(0)),(Col[8]: 
Min=Exact(Binary(NULL)) Max=Exact(Binary(NULL)) Null=Exact(0)),(Col[9]: 
Min=Exact(Binary(NULL)) Max=Exact(Binary(NULL)) Null=Exact(0)),(Col[10]: 
Min=Exact(TimestampNanosecond(NULL, None)) Max=Exact(TimestampNanosecond(NULL, 
None)) Null=Exact(0))]]
+02)--ParquetExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: Min=Exact(Int32(NULL)) 
Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[1]: Min=Exact(Boolean(NULL)) 
Max=Exact(Boolean(NULL)) Null=Exact(0)),(Col[2]: Min=Exact(Int32(NULL)) 
Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[3]: Min=Exact(Int32(NULL)) 
Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[4]: Min=Exact(Int32(NULL)) 
Max=Exact(Int32(NULL)) Null=Exact(0)),(Col[5]: Min=Exact(Int64(NULL)) 
Max=Exact(Int64(NULL)) Null=Exact(0)),(Col[6]: Min=Exact(Float32(NULL)) 
Max=Exact(Float32(NULL)) Null=Exact(0)),(Col[7]: Min=Exact(Float64(NULL)) 
Max=Exact(Float64(NULL)) Null=Exact(0)),(Col[8]: Min=Exact(Binary(NULL)) 
Max=Exact(Binary(NULL)) Null=Exact(0)),(Col[9]: Min=Exact(Binary(NULL)) 
Max=Exact(Binary(NULL
 )) Null=Exact(0)),(Col[10]: Min=Exact(TimestampNanosecond(NULL, None)) 
Max=Exact(TimestampNanosecond(NULL, None)) Null=Exact(0))]]

Review Comment:
   I don't fully understand why these statistics are all reported as null / 
None now. -- I need to investagate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to