This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ea21b08e47 refactor: fetch statistics for a given ParquetMetaData (#10880)
ea21b08e47 is described below

commit ea21b08e477cc48f458917c132f79c4980c957c1
Author: Nga Tran <[email protected]>
AuthorDate: Thu Jun 13 07:38:37 2024 -0400

    refactor: fetch statistics for a given ParquetMetaData (#10880)
    
    * refactor: fetch statistics for a given ParquetMetaData
    
    * test: add tests for fetch_statistics_from_parquet_meta
    
    * Rename function and improve docs
    
    * Simplify the test
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/datasource/file_format/parquet.rs     | 73 ++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 99c38d3f09..572904254f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -455,6 +455,8 @@ async fn fetch_schema(
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
+///
+/// See [`statistics_from_parquet_meta`] for more details
 async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
@@ -462,6 +464,17 @@ async fn fetch_statistics(
     metadata_size_hint: Option<usize>,
 ) -> Result<Statistics> {
     let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
+    statistics_from_parquet_meta(&metadata, table_schema).await
+}
+
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+///
+/// The statistics are calculated for each column in the table schema
+/// using the row group statistics in the parquet metadata.
+pub async fn statistics_from_parquet_meta(
+    metadata: &ParquetMetaData,
+    table_schema: SchemaRef,
+) -> Result<Statistics> {
     let file_metadata = metadata.file_metadata();
 
     let file_schema = parquet_to_arrow_schema(
@@ -1402,6 +1415,66 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_statistics_from_parquet_metadata() -> Result<()> {
+        // Data for column c1: ["Foo", null, "bar"]
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+
+        // Data for column c2: [1, 2, null]
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+        // Use store_parquet to write each batch to its own file
+        // . batch1 written into the first file and includes:
+        //    - column c1 that has 3 rows with one null. Stats min and max of the string column are missing for this test even though the column has values
+        // . batch2 written into the second file and includes:
+        //    - column c2 that has 3 rows with one null. Stats min and max of the int column are available and are 1 and 2 respectively
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
+
+        let state = SessionContext::new().state();
+        let format = ParquetFormat::default();
+        let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+
+        let null_i64 = ScalarValue::Int64(None);
+        let null_utf8 = ScalarValue::Utf8(None);
+
+        // Fetch statistics for first file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        //
+        assert_eq!(stats.num_rows, Precision::Exact(3));
+        // column c1
+        let c1_stats = &stats.column_statistics[0];
+        assert_eq!(c1_stats.null_count, Precision::Exact(1));
+        assert_eq!(c1_stats.max_value, Precision::Absent);
+        assert_eq!(c1_stats.min_value, Precision::Absent);
+        // column c2: missing from the file so the table treats all 3 rows as null
+        let c2_stats = &stats.column_statistics[1];
+        assert_eq!(c2_stats.null_count, Precision::Exact(3));
+        assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
+        assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
+
+        // Fetch statistics for second file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        assert_eq!(stats.num_rows, Precision::Exact(3));
+        // column c1: missing from the file so the table treats all 3 rows as null
+        let c1_stats = &stats.column_statistics[0];
+        assert_eq!(c1_stats.null_count, Precision::Exact(3));
+        assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
+        assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
+        // column c2
+        let c2_stats = &stats.column_statistics[1];
+        assert_eq!(c2_stats.null_count, Precision::Exact(1));
+        assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
+        assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);

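For context, below is a minimal usage sketch (not part of the commit) of how the new public statistics_from_parquet_meta entry point can be combined with fetch_parquet_metadata, mirroring the calls in the test above: fetch the Parquet metadata once, then derive Statistics from it without re-reading the file. The import paths and the file location are illustrative assumptions; check the module's public exports before relying on them.

    // Minimal usage sketch, assuming `fetch_parquet_metadata` and the new
    // `statistics_from_parquet_meta` are exported from
    // `datafusion::datasource::file_format::parquet` (import paths and the
    // file location below are illustrative assumptions).
    use std::sync::Arc;

    use datafusion::datasource::file_format::parquet::{
        fetch_parquet_metadata, statistics_from_parquet_meta, ParquetFormat,
    };
    use datafusion::datasource::file_format::FileFormat;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;
    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

    #[tokio::main]
    async fn main() -> Result<()> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        // Hypothetical file; point this at a real Parquet file.
        let location = Path::from("data/example.parquet");
        let object_meta = store.head(&location).await?;

        // Infer the table schema for the file.
        let state = SessionContext::new().state();
        let schema = ParquetFormat::default()
            .infer_schema(&state, &store, &[object_meta.clone()])
            .await?;

        // Fetch the Parquet metadata once ...
        let metadata =
            fetch_parquet_metadata(store.as_ref(), &object_meta, None).await?;
        // ... then convert its row group statistics into `Statistics`
        // without touching the file again.
        let stats = statistics_from_parquet_meta(&metadata, schema).await?;
        println!("num_rows: {:?}", stats.num_rows);
        Ok(())
    }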

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
