This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a13401  ARROW-10783: [Rust][DataFusion] Implement Statistics for Parquet TableProvider
8a13401 is described below

commit 8a134013494eb7b15004ea7bba8bca8a9562ffdd
Author: Mike Seddon <[email protected]>
AuthorDate: Thu Dec 17 18:40:00 2020 -0500

    ARROW-10783: [Rust][DataFusion] Implement Statistics for Parquet TableProvider
    
    This implements Statistics for the Parquet data source, since the earlier PR seems to have gone quiet: https://github.com/apache/arrow/pull/8860
    
    This PR exposes the ParquetMetaData produced by the file reader. Although initially we only consume `row_count` and `total_byte_size`, the column-level metadata it carries could enable large performance gains in the future.
    
    Closes #8944 from seddonm1/parquet-statistics
    
    Authored-by: Mike Seddon <[email protected]>
    Signed-off-by: Andrew Lamb <[email protected]>
---
 rust/datafusion/src/datasource/parquet.rs    | 23 ++++++++++++++++++++++-
 rust/datafusion/src/physical_plan/parquet.rs | 15 ++++++++++++++-
 rust/parquet/src/arrow/arrow_reader.rs       |  6 ++++++
 rust/parquet/src/file/metadata.rs            |  8 ++++----
 rust/parquet/src/file/statistics.rs          |  3 ++-
 5 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 9ac9c7b..c067b24 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,6 +22,7 @@ use std::string::String;
 use std::sync::Arc;
 
 use arrow::datatypes::*;
+use parquet::file::metadata::RowGroupMetaData;
 
 use crate::datasource::datasource::Statistics;
 use crate::datasource::TableProvider;
@@ -41,10 +42,26 @@ impl ParquetTable {
     pub fn try_new(path: &str) -> Result<Self> {
         let parquet_exec = ParquetExec::try_new(path, None, 0)?;
         let schema = parquet_exec.schema();
+
+        let metadata = parquet_exec.metadata();
+        let num_rows: i64 = metadata
+            .row_groups()
+            .iter()
+            .map(RowGroupMetaData::num_rows)
+            .sum();
+        let total_byte_size: i64 = metadata
+            .row_groups()
+            .iter()
+            .map(RowGroupMetaData::total_byte_size)
+            .sum();
+
         Ok(Self {
             path: path.to_string(),
             schema,
-            statistics: Statistics::default(),
+            statistics: Statistics {
+                num_rows: Some(num_rows as usize),
+                total_byte_size: Some(total_byte_size as usize),
+            },
         })
     }
 }
@@ -108,6 +125,10 @@ mod tests {
         // we should have seen 4 batches of 2 rows
         assert_eq!(4, count);
 
+        // test metadata
+        assert_eq!(table.statistics().num_rows, Some(8));
+        assert_eq!(table.statistics().total_byte_size, Some(671));
+
         Ok(())
     }
 
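As an aside, a minimal sketch of how the newly populated statistics could be consumed downstream. The file path is hypothetical, and it assumes `ParquetTable` and the `TableProvider` trait are importable as below; the accessors themselves are the ones exercised by the test above:

    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::datasource::TableProvider;
    use datafusion::error::Result;

    fn main() -> Result<()> {
        // Any readable Parquet file works; this path is illustrative only.
        let table = ParquetTable::try_new("data/alltypes_plain.parquet")?;

        // Statistics are now derived from the row group metadata rather
        // than left at Statistics::default().
        let stats = table.statistics();
        println!("rows: {:?}, bytes: {:?}", stats.num_rows, stats.total_byte_size);
        Ok(())
    }
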
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index 19d2aa3..530dd1b 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -30,6 +30,7 @@ use crate::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
@@ -50,6 +51,8 @@ pub struct ParquetExec {
     projection: Vec<usize>,
     /// Batch size
     batch_size: usize,
+    /// Parquet metadata
+    metadata: ParquetMetaData,
 }
 
 impl ParquetExec {
@@ -68,8 +71,11 @@ impl ParquetExec {
             let file_reader = Arc::new(SerializedFileReader::new(file)?);
             let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
             let schema = arrow_reader.get_schema()?;
+            let metadata = arrow_reader.get_metadata();
 
-            Ok(Self::new(filenames, schema, projection, batch_size))
+            Ok(Self::new(
+                filenames, schema, projection, batch_size, metadata,
+            ))
         }
     }
 
@@ -79,6 +85,7 @@ impl ParquetExec {
         schema: Schema,
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        metadata: ParquetMetaData,
     ) -> Self {
         let projection = match projection {
             Some(p) => p,
@@ -97,8 +104,14 @@ impl ParquetExec {
             schema: Arc::new(projected_schema),
             projection,
             batch_size,
+            metadata,
         }
     }
+
+    /// Expose the Parquet-specific metadata
+    pub fn metadata(&self) -> ParquetMetaData {
+        self.metadata.clone()
+    }
 }
 
 #[async_trait]
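Similarly, a sketch of what the new `ParquetExec::metadata()` accessor enables, e.g. inspecting per-row-group counts. The path and batch size are illustrative; the `row_groups()`, `num_rows()` and `total_byte_size()` calls are the same ones used in the datasource change above:

    use datafusion::error::Result;
    use datafusion::physical_plan::parquet::ParquetExec;

    fn inspect(path: &str) -> Result<()> {
        let exec = ParquetExec::try_new(path, None, 1024)?;
        // metadata() hands back a clone of the ParquetMetaData captured
        // when the plan was built.
        let metadata = exec.metadata();
        for (i, rg) in metadata.row_groups().iter().enumerate() {
            println!("row group {}: {} rows, {} bytes", i, rg.num_rows(), rg.total_byte_size());
        }
        Ok(())
    }
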
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 7d5b5b8..f4ed084 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -23,6 +23,7 @@ use crate::arrow::schema::{
     parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::FileReader;
 use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -154,6 +155,11 @@ impl ParquetFileArrowReader {
     pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
         Self { file_reader }
     }
+
+    /// Expose the reader metadata
+    pub fn get_metadata(&mut self) -> ParquetMetaData {
+        self.file_reader.metadata().clone()
+    }
 }
 
 pub struct ParquetRecordBatchReader {
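A small sketch of the new `get_metadata()` accessor in isolation. The path is hypothetical, and it assumes `ParquetFileArrowReader` is re-exported at `parquet::arrow`; the setup otherwise mirrors `ParquetExec::try_new` above:

    use std::fs::File;
    use std::sync::Arc;

    use parquet::arrow::ParquetFileArrowReader;
    use parquet::file::reader::SerializedFileReader;

    fn print_row_groups(path: &str) -> parquet::errors::Result<()> {
        let file = File::open(path).expect("failed to open file");
        let file_reader = Arc::new(SerializedFileReader::new(file)?);
        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);

        // Returns an owned clone of the metadata, so it can outlive the reader.
        let metadata = arrow_reader.get_metadata();
        println!("{} row group(s)", metadata.row_groups().len());
        Ok(())
    }
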
diff --git a/rust/parquet/src/file/metadata.rs b/rust/parquet/src/file/metadata.rs
index 8565115..cff70e6 100644
--- a/rust/parquet/src/file/metadata.rs
+++ b/rust/parquet/src/file/metadata.rs
@@ -46,7 +46,7 @@ use crate::schema::types::{
 };
 
 /// Global Parquet metadata.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
@@ -90,7 +90,7 @@ pub type KeyValue = parquet_format::KeyValue;
 pub type FileMetaDataPtr = Arc<FileMetaData>;
 
 /// Metadata for a Parquet file.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FileMetaData {
     version: i32,
     num_rows: i64,
@@ -187,7 +187,7 @@ impl FileMetaData {
 pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
 
 /// Metadata for a row group.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RowGroupMetaData {
     columns: Vec<ColumnChunkMetaData>,
     num_rows: i64,
@@ -325,7 +325,7 @@ impl RowGroupMetaDataBuilder {
 }
 
 /// Metadata for a column chunk.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ColumnChunkMetaData {
     column_type: Type,
     column_path: ColumnPath,
diff --git a/rust/parquet/src/file/statistics.rs b/rust/parquet/src/file/statistics.rs
index 238312c..4f5d0e9 100644
--- a/rust/parquet/src/file/statistics.rs
+++ b/rust/parquet/src/file/statistics.rs
@@ -232,7 +232,7 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
 }
 
 /// Statistics for a column chunk and data page.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum Statistics {
     Boolean(TypedStatistics<BoolType>),
     Int32(TypedStatistics<Int32Type>),
@@ -341,6 +341,7 @@ impl fmt::Display for Statistics {
 }
 
 /// Typed implementation for [`Statistics`].
+#[derive(Clone)]
 pub struct TypedStatistics<T: DataType> {
     min: Option<T::T>,
     max: Option<T::T>,
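The `Clone` derives above, together with those on `ParquetMetaData`, `FileMetaData`, `RowGroupMetaData` and `ColumnChunkMetaData`, are what make the new `metadata()` accessors possible: the metadata can be copied out and held independently of the file reader. A hypothetical consumer, purely for illustration:

    use parquet::file::metadata::ParquetMetaData;

    /// Caches a copy of the metadata, decoupled from the reader's lifetime.
    struct CachedStats {
        metadata: ParquetMetaData,
    }

    impl CachedStats {
        fn new(metadata: &ParquetMetaData) -> Self {
            // This clone only compiles now that the derives exist.
            Self { metadata: metadata.clone() }
        }

        fn num_rows(&self) -> i64 {
            self.metadata.file_metadata().num_rows()
        }
    }
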
