This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8a13401 ARROW-10783: [Rust][DataFusion] Implement Statistics for
Parquet TableProvider
8a13401 is described below
commit 8a134013494eb7b15004ea7bba8bca8a9562ffdd
Author: Mike Seddon <[email protected]>
AuthorDate: Thu Dec 17 18:40:00 2020 -0500
ARROW-10783: [Rust][DataFusion] Implement Statistics for Parquet
TableProvider
This is an implementation of Statistics for the Parquet datasource as this
PR seems to have gone quiet: https://github.com/apache/arrow/pull/8860
This PR exposes the ParquetMetaData produced by the file reader. Although
initially we only consume `row_count` and `total_byte_size`, there is
column-level metadata that may be very useful in the future for potentially
large performance gains.
Closes #8944 from seddonm1/parquet-statistics
Authored-by: Mike Seddon <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
---
rust/datafusion/src/datasource/parquet.rs | 23 ++++++++++++++++++++++-
rust/datafusion/src/physical_plan/parquet.rs | 15 ++++++++++++++-
rust/parquet/src/arrow/arrow_reader.rs | 6 ++++++
rust/parquet/src/file/metadata.rs | 8 ++++----
rust/parquet/src/file/statistics.rs | 3 ++-
5 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/rust/datafusion/src/datasource/parquet.rs
b/rust/datafusion/src/datasource/parquet.rs
index 9ac9c7b..c067b24 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,6 +22,7 @@ use std::string::String;
use std::sync::Arc;
use arrow::datatypes::*;
+use parquet::file::metadata::RowGroupMetaData;
use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
@@ -41,10 +42,26 @@ impl ParquetTable {
pub fn try_new(path: &str) -> Result<Self> {
let parquet_exec = ParquetExec::try_new(path, None, 0)?;
let schema = parquet_exec.schema();
+
+ let metadata = parquet_exec.metadata();
+ let num_rows: i64 = metadata
+ .row_groups()
+ .iter()
+ .map(RowGroupMetaData::num_rows)
+ .sum();
+ let total_byte_size: i64 = metadata
+ .row_groups()
+ .iter()
+ .map(RowGroupMetaData::total_byte_size)
+ .sum();
+
Ok(Self {
path: path.to_string(),
schema,
- statistics: Statistics::default(),
+ statistics: Statistics {
+ num_rows: Some(num_rows as usize),
+ total_byte_size: Some(total_byte_size as usize),
+ },
})
}
}
@@ -108,6 +125,10 @@ mod tests {
// we should have seen 4 batches of 2 rows
assert_eq!(4, count);
+ // test metadata
+ assert_eq!(table.statistics().num_rows, Some(8));
+ assert_eq!(table.statistics().total_byte_size, Some(671));
+
Ok(())
}
diff --git a/rust/datafusion/src/physical_plan/parquet.rs
b/rust/datafusion/src/physical_plan/parquet.rs
index 19d2aa3..530dd1b 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -30,6 +30,7 @@ use crate::physical_plan::{common, Partitioning};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
+use parquet::file::metadata::ParquetMetaData;
use parquet::file::reader::SerializedFileReader;
use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
@@ -50,6 +51,8 @@ pub struct ParquetExec {
projection: Vec<usize>,
/// Batch size
batch_size: usize,
+ /// Parquet metadata
+ metadata: ParquetMetaData,
}
impl ParquetExec {
@@ -68,8 +71,11 @@ impl ParquetExec {
let file_reader = Arc::new(SerializedFileReader::new(file)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
+ let metadata = arrow_reader.get_metadata();
- Ok(Self::new(filenames, schema, projection, batch_size))
+ Ok(Self::new(
+ filenames, schema, projection, batch_size, metadata,
+ ))
}
}
@@ -79,6 +85,7 @@ impl ParquetExec {
schema: Schema,
projection: Option<Vec<usize>>,
batch_size: usize,
+ metadata: ParquetMetaData,
) -> Self {
let projection = match projection {
Some(p) => p,
@@ -97,8 +104,14 @@ impl ParquetExec {
schema: Arc::new(projected_schema),
projection,
batch_size,
+ metadata,
}
}
+
+ /// Expose the Parquet specific metadata
+ pub fn metadata(&self) -> ParquetMetaData {
+ self.metadata.clone()
+ }
}
#[async_trait]
diff --git a/rust/parquet/src/arrow/arrow_reader.rs
b/rust/parquet/src/arrow/arrow_reader.rs
index 7d5b5b8..f4ed084 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -23,6 +23,7 @@ use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns,
parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
+use crate::file::metadata::ParquetMetaData;
use crate::file::reader::FileReader;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
@@ -154,6 +155,11 @@ impl ParquetFileArrowReader {
pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
Self { file_reader }
}
+
+ // Expose the reader metadata
+ pub fn get_metadata(&mut self) -> ParquetMetaData {
+ self.file_reader.metadata().clone()
+ }
}
pub struct ParquetRecordBatchReader {
diff --git a/rust/parquet/src/file/metadata.rs
b/rust/parquet/src/file/metadata.rs
index 8565115..cff70e6 100644
--- a/rust/parquet/src/file/metadata.rs
+++ b/rust/parquet/src/file/metadata.rs
@@ -46,7 +46,7 @@ use crate::schema::types::{
};
/// Global Parquet metadata.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
@@ -90,7 +90,7 @@ pub type KeyValue = parquet_format::KeyValue;
pub type FileMetaDataPtr = Arc<FileMetaData>;
/// Metadata for a Parquet file.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct FileMetaData {
version: i32,
num_rows: i64,
@@ -187,7 +187,7 @@ impl FileMetaData {
pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
/// Metadata for a row group.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
@@ -325,7 +325,7 @@ impl RowGroupMetaDataBuilder {
}
/// Metadata for a column chunk.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct ColumnChunkMetaData {
column_type: Type,
column_path: ColumnPath,
diff --git a/rust/parquet/src/file/statistics.rs
b/rust/parquet/src/file/statistics.rs
index 238312c..4f5d0e9 100644
--- a/rust/parquet/src/file/statistics.rs
+++ b/rust/parquet/src/file/statistics.rs
@@ -232,7 +232,7 @@ pub fn to_thrift(stats: Option<&Statistics>) ->
Option<TStatistics> {
}
/// Statistics for a column chunk and data page.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
pub enum Statistics {
Boolean(TypedStatistics<BoolType>),
Int32(TypedStatistics<Int32Type>),
@@ -341,6 +341,7 @@ impl fmt::Display for Statistics {
}
/// Typed implementation for [`Statistics`].
+#[derive(Clone)]
pub struct TypedStatistics<T: DataType> {
min: Option<T::T>,
max: Option<T::T>,