This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
     new a36bf7ade4 Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema (#5135)
a36bf7ade4 is described below
commit a36bf7ade4091f90ac5fad30716444e09c56051d
Author: Jeffrey <[email protected]>
AuthorDate: Tue Dec 5 23:27:28 2023 +1100
    Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema (#5135)
* Parquet: clear metadata of ParquetRecordBatchStream::schema
* Revert "Parquet: clear metadata of ParquetRecordBatchStream::schema"
This reverts commit 84be336393018be53c3f0cd52155d717898ea3c7.
* Document expected behaviour
* Revert "Document expected behaviour"
This reverts commit ef9601e84a9494145e315d222dcf4a66b22dbbef.
* Reapply "Parquet: clear metadata of ParquetRecordBatchStream::schema"
This reverts commit fd662ad84b60275e329e23617e8f3e81796bfa3e.
    * ParquetRecordBatchStream should strip schema metadata and respect projection
* Fix projection of nested fields
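    For illustration, a minimal usage sketch of the behaviour this change
    establishes. The path "data.parquet" and the helper name are hypothetical;
    the calls are the public parquet/arrow-rs API:

        use futures::StreamExt;
        use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

        // Hypothetical helper contrasting builder schema vs stream schema
        async fn schema_behaviour() -> Result<(), Box<dyn std::error::Error>> {
            let file = tokio::fs::File::open("data.parquet").await?;
            let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

            // Builder schema: all fields plus any key-value metadata in the file
            let full_schema = builder.schema().clone();

            // Project to the first leaf column only
            let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
            let mut stream = builder.with_projection(mask).build()?;

            // After this change the stream schema reflects the projection and
            // strips metadata, matching the RecordBatches it emits
            assert!(stream.schema().metadata().is_empty());
            assert!(stream.schema().fields().len() <= full_schema.fields().len());

            while let Some(batch) = stream.next().await {
                assert_eq!(&batch?.schema(), stream.schema());
            }
            Ok(())
        }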
---
parquet/src/arrow/arrow_reader/mod.rs | 4 +
parquet/src/arrow/async_reader/mod.rs | 136 ++++++++++++++++++++++++++++++++--
2 files changed, 135 insertions(+), 5 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index b9e9d28984..77de839940 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader {
}
impl RecordBatchReader for ParquetRecordBatchReader {
+ /// Returns the projected [`SchemaRef`] for reading the parquet file.
+ ///
+ /// Note that the schema metadata will be stripped here. See
+ /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 04383bb51b..80a554026d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@ use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};
use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
@@ -385,13 +385,24 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
offset: self.offset,
};
+        // Ensure schema of ParquetRecordBatchStream respects projection, and does
+        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
+        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
+ Some(DataType::Struct(fields)) => {
+                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
+ }
+ None => Fields::empty(),
+ _ => unreachable!("Must be Struct for root type"),
+ };
+ let schema = Arc::new(Schema::new(projected_fields));
+
Ok(ParquetRecordBatchStream {
metadata: self.metadata,
batch_size,
row_groups,
projection: self.projection,
selection: self.selection,
- schema: self.schema,
+ schema,
reader: Some(reader),
state: StreamState::Init,
})
@@ -572,7 +583,10 @@ impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
}
impl<T> ParquetRecordBatchStream<T> {
- /// Returns the [`SchemaRef`] for this parquet file
+ /// Returns the projected [`SchemaRef`] for reading the parquet file.
+ ///
+ /// Note that the schema metadata will be stripped here. See
+ /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
@@ -855,11 +869,15 @@ mod tests {
use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array};
+ use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray,
+ StructArray, UInt64Array,
+ };
use arrow_schema::{DataType, Field, Schema};
use futures::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
- use std::sync::Mutex;
+ use std::collections::HashMap;
+ use std::sync::{Arc, Mutex};
use tempfile::tempfile;
#[derive(Clone)]
@@ -1584,6 +1602,114 @@ mod tests {
test_get_row_group_column_bloom_filter(data, false).await;
}
+ #[tokio::test]
+ async fn test_parquet_record_batch_stream_schema() {
+ fn get_all_field_names(schema: &Schema) -> Vec<&String> {
+ schema.all_fields().iter().map(|f| f.name()).collect()
+ }
+
+ // ParquetRecordBatchReaderBuilder::schema differs from
+        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
+        // schema contents (in terms of custom metadata attached to schema, and fields
+ // returned). Test to ensure this remains consistent behaviour.
+ //
+ // Ensure same for asynchronous versions of the above.
+
+ // Prep data, for a schema with nested fields, with custom metadata
+ let mut metadata = HashMap::with_capacity(1);
+ metadata.insert("key".to_string(), "value".to_string());
+
+ let nested_struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("d", DataType::Utf8, true)),
+ Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("e", DataType::Utf8, true)),
+ Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
+ ),
+ ]);
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("a", DataType::Int32, true)),
+ Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("b", DataType::UInt64, true)),
+ Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new(
+ "c",
+ nested_struct_array.data_type().clone(),
+ true,
+ )),
+ Arc::new(nested_struct_array) as ArrayRef,
+ ),
+ ]);
+
+ let schema =
+            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
+ let record_batch = RecordBatch::from(struct_array)
+ .with_schema(schema.clone())
+ .unwrap();
+
+ // Write parquet with custom metadata in schema
+ let mut file = tempfile().unwrap();
+        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+ writer.write(&record_batch).unwrap();
+ writer.close().unwrap();
+
+ let all_fields = ["a", "b", "c", "d", "e"];
+ // (leaf indices in mask, expected names in output schema all fields)
+ let projections = [
+ (vec![], vec![]),
+ (vec![0], vec!["a"]),
+ (vec![0, 1], vec!["a", "b"]),
+ (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
+ (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
+ ];
+
+ // Ensure we're consistent for each of these projections
+ for (indices, expected_projected_names) in projections {
+            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
+ // Builder schema should preserve all fields and metadata
+ assert_eq!(get_all_field_names(&builder), all_fields);
+ assert_eq!(builder.metadata, metadata);
+                // Reader & batch schema should show only projected fields, and no metadata
+                assert_eq!(get_all_field_names(&reader), expected_projected_names);
+ assert_eq!(reader.metadata, HashMap::default());
+                assert_eq!(get_all_field_names(&batch), expected_projected_names);
+ assert_eq!(batch.metadata, HashMap::default());
+ };
+
+ let builder =
+                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
+ let sync_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
+ let mut reader = builder.with_projection(mask).build().unwrap();
+ let sync_reader_schema = reader.schema();
+ let batch = reader.next().unwrap().unwrap();
+ let sync_batch_schema = batch.schema();
+            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
+
+ // asynchronous should be same
+ let file = tokio::fs::File::from(file.try_clone().unwrap());
+            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+ let async_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
+ let mut reader = builder.with_projection(mask).build().unwrap();
+ let async_reader_schema = reader.schema().clone();
+ let batch = reader.next().await.unwrap().unwrap();
+ let async_batch_schema = batch.schema();
+ assert_schemas(
+ async_builder_schema,
+ async_reader_schema,
+ async_batch_schema,
+ );
+ }
+ }
+
#[tokio::test]
async fn test_get_row_group_column_bloom_filter_with_length() {
// convert to new parquet file with bloom_filter_length
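
For reference, a matching synchronous sketch of the behaviour documented above
for ParquetRecordBatchReader. The helper name is hypothetical, and `file` is
assumed to hold a readable parquet file:

    use arrow_array::RecordBatchReader;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::arrow::ProjectionMask;

    // Hypothetical helper mirroring the async sketch above
    fn sync_schema_behaviour(file: std::fs::File) -> Result<(), Box<dyn std::error::Error>> {
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        // Builder schema keeps all fields and the file's key-value metadata
        let full_schema = builder.schema().clone();

        // Project to the first leaf column only
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
        let reader = builder.with_projection(mask).build()?;

        // RecordBatchReader::schema is projected and metadata-free
        let projected_schema = reader.schema();
        assert!(projected_schema.metadata().is_empty());
        assert!(projected_schema.fields().len() <= full_schema.fields().len());

        for batch in reader {
            assert_eq!(batch?.schema(), projected_schema);
        }
        Ok(())
    }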