This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ea21b08e47 refactor: fetch statistics for a given ParquetMetaData
(#10880)
ea21b08e47 is described below
commit ea21b08e477cc48f458917c132f79c4980c957c1
Author: Nga Tran <[email protected]>
AuthorDate: Thu Jun 13 07:38:37 2024 -0400
refactor: fetch statistics for a given ParquetMetaData (#10880)
* refactor: fetch statistics for a given ParquetMetaData
* test: add tests for fetch_statistics_from_parquet_meta
* Rename function and improve docs
* Simplify the test
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/datasource/file_format/parquet.rs | 73 ++++++++++++++++++++++
1 file changed, 73 insertions(+)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 99c38d3f09..572904254f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -455,6 +455,8 @@ async fn fetch_schema(
}
/// Read and parse the statistics of the Parquet file at location `path`
+///
+/// See [`statistics_from_parquet_meta`] for more details
async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
@@ -462,6 +464,17 @@ async fn fetch_statistics(
metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file,
metadata_size_hint).await?;
+ statistics_from_parquet_meta(&metadata, table_schema).await
+}
+
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+///
+/// The statistics are calculated for each column in the table schema
+/// using the row group statistics in the parquet metadata.
+pub async fn statistics_from_parquet_meta(
+ metadata: &ParquetMetaData,
+ table_schema: SchemaRef,
+) -> Result<Statistics> {
let file_metadata = metadata.file_metadata();
let file_schema = parquet_to_arrow_schema(
@@ -1402,6 +1415,66 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_statistics_from_parquet_metadata() -> Result<()> {
+ // Data for column c1: ["Foo", null, "bar"]
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+ let batch1 = RecordBatch::try_from_iter(vec![("c1",
c1.clone())]).unwrap();
+
+ // Data for column c2: [1, 2, null]
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2),
None]));
+ let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+ // Use store_parquet to write each batch to its own file
+ // . batch1 written into first file and includes:
+ // - column c1 that has 3 rows with one null. Stats min and max of
string column are missing for this test even though the column has values
+ // . batch2 written into second file and includes:
+ // - column c2 that has 3 rows with one null. Stats min and max of
int are available and are 1 and 2 respectively
+ let store = Arc::new(LocalFileSystem::new()) as _;
+ let (files, _file_names) = store_parquet(vec![batch1, batch2],
false).await?;
+
+ let state = SessionContext::new().state();
+ let format = ParquetFormat::default();
+ let schema = format.infer_schema(&state, &store,
&files).await.unwrap();
+
+ let null_i64 = ScalarValue::Int64(None);
+ let null_utf8 = ScalarValue::Utf8(None);
+
+ // Fetch statistics for first file
+ let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0],
None).await?;
+ let stats = statistics_from_parquet_meta(&pq_meta,
schema.clone()).await?;
+ //
+ assert_eq!(stats.num_rows, Precision::Exact(3));
+ // column c1
+ let c1_stats = &stats.column_statistics[0];
+ assert_eq!(c1_stats.null_count, Precision::Exact(1));
+ assert_eq!(c1_stats.max_value, Precision::Absent);
+ assert_eq!(c1_stats.min_value, Precision::Absent);
+ // column c2: missing from the file so the table treats all 3 rows as
null
+ let c2_stats = &stats.column_statistics[1];
+ assert_eq!(c2_stats.null_count, Precision::Exact(3));
+ assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
+ assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
+
+ // Fetch statistics for second file
+ let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1],
None).await?;
+ let stats = statistics_from_parquet_meta(&pq_meta,
schema.clone()).await?;
+ assert_eq!(stats.num_rows, Precision::Exact(3));
+ // column c1: missing from the file so the table treats all 3 rows as
null
+ let c1_stats = &stats.column_statistics[0];
+ assert_eq!(c1_stats.null_count, Precision::Exact(3));
+ assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
+ assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
+ // column c2
+ let c2_stats = &stats.column_statistics[1];
+ assert_eq!(c2_stats.null_count, Precision::Exact(1));
+ assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
+ assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));
+
+ Ok(())
+ }
+
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]