This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a36bf7ade4 Parquet: clear metadata and project fields of 
ParquetRecordBatchStream::schema (#5135)
a36bf7ade4 is described below

commit a36bf7ade4091f90ac5fad30716444e09c56051d
Author: Jeffrey <[email protected]>
AuthorDate: Tue Dec 5 23:27:28 2023 +1100

    Parquet: clear metadata and project fields of 
ParquetRecordBatchStream::schema (#5135)
    
    * Parquet: clear metadata of ParquetRecordBatchStream::schema
    
    * Revert "Parquet: clear metadata of ParquetRecordBatchStream::schema"
    
    This reverts commit 84be336393018be53c3f0cd52155d717898ea3c7.
    
    * Document expected behaviour
    
    * Revert "Document expected behaviour"
    
    This reverts commit ef9601e84a9494145e315d222dcf4a66b22dbbef.
    
    * Reapply "Parquet: clear metadata of ParquetRecordBatchStream::schema"
    
    This reverts commit fd662ad84b60275e329e23617e8f3e81796bfa3e.
    
    * ParquetRecordBatchStream should strip schema metadata and respect 
projection
    
    * Fix projection of nested fields
---
 parquet/src/arrow/arrow_reader/mod.rs |   4 +
 parquet/src/arrow/async_reader/mod.rs | 136 ++++++++++++++++++++++++++++++++--
 2 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index b9e9d28984..77de839940 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader {
 }
 
 impl RecordBatchReader for ParquetRecordBatchReader {
+    /// Returns the projected [`SchemaRef`] for reading the parquet file.
+    ///
+    /// Note that the schema metadata will be stripped here. See
+    /// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 04383bb51b..80a554026d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@ use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
@@ -385,13 +385,24 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
         };
 
+        // Ensure schema of ParquetRecordBatchStream respects projection, and 
does
+        // not store metadata (same as for ParquetRecordBatchReader and 
emitted RecordBatches)
+        let projected_fields = match reader.fields.as_deref().map(|pf| 
&pf.arrow_type) {
+            Some(DataType::Struct(fields)) => {
+                fields.filter_leaves(|idx, _| 
self.projection.leaf_included(idx))
+            }
+            None => Fields::empty(),
+            _ => unreachable!("Must be Struct for root type"),
+        };
+        let schema = Arc::new(Schema::new(projected_fields));
+
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
-            schema: self.schema,
+            schema,
             reader: Some(reader),
             state: StreamState::Init,
         })
@@ -572,7 +583,10 @@ impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
 }
 
 impl<T> ParquetRecordBatchStream<T> {
-    /// Returns the [`SchemaRef`] for this parquet file
+    /// Returns the projected [`SchemaRef`] for reading the parquet file.
+    ///
+    /// Note that the schema metadata will be stripped here. See
+    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
@@ -855,11 +869,15 @@ mod tests {
     use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
-    use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, 
StringArray, UInt64Array};
+    use arrow_array::{
+        Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, 
StringArray,
+        StructArray, UInt64Array,
+    };
     use arrow_schema::{DataType, Field, Schema};
     use futures::{StreamExt, TryStreamExt};
     use rand::{thread_rng, Rng};
-    use std::sync::Mutex;
+    use std::collections::HashMap;
+    use std::sync::{Arc, Mutex};
     use tempfile::tempfile;
 
     #[derive(Clone)]
@@ -1584,6 +1602,114 @@ mod tests {
         test_get_row_group_column_bloom_filter(data, false).await;
     }
 
+    #[tokio::test]
+    async fn test_parquet_record_batch_stream_schema() {
+        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
+            schema.all_fields().iter().map(|f| f.name()).collect()
+        }
+
+        // ParquetRecordBatchReaderBuilder::schema differs from
+        // ParquetRecordBatchReader::schema and RecordBatch::schema in the 
returned
+        // schema contents (in terms of custom metadata attached to schema, 
and fields
+        // returned). Test to ensure this remains consistent behaviour.
+        //
+        // Ensure same for asynchronous versions of the above.
+
+        // Prep data, for a schema with nested fields, with custom metadata
+        let mut metadata = HashMap::with_capacity(1);
+        metadata.insert("key".to_string(), "value".to_string());
+
+        let nested_struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("d", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("e", DataType::Utf8, true)),
+                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
+            ),
+        ]);
+        let struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("a", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("b", DataType::UInt64, true)),
+                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new(
+                    "c",
+                    nested_struct_array.data_type().clone(),
+                    true,
+                )),
+                Arc::new(nested_struct_array) as ArrayRef,
+            ),
+        ]);
+
+        let schema =
+            
Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
+        let record_batch = RecordBatch::from(struct_array)
+            .with_schema(schema.clone())
+            .unwrap();
+
+        // Write parquet with custom metadata in schema
+        let mut file = tempfile().unwrap();
+        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), 
None).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let all_fields = ["a", "b", "c", "d", "e"];
+        // (leaf indices in mask, expected names in output schema all fields)
+        let projections = [
+            (vec![], vec![]),
+            (vec![0], vec!["a"]),
+            (vec![0, 1], vec!["a", "b"]),
+            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
+            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
+        ];
+
+        // Ensure we're consistent for each of these projections
+        for (indices, expected_projected_names) in projections {
+            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, 
batch: SchemaRef| {
+                // Builder schema should preserve all fields and metadata
+                assert_eq!(get_all_field_names(&builder), all_fields);
+                assert_eq!(builder.metadata, metadata);
+                // Reader & batch schema should show only projected fields, 
and no metadata
+                assert_eq!(get_all_field_names(&reader), 
expected_projected_names);
+                assert_eq!(reader.metadata, HashMap::default());
+                assert_eq!(get_all_field_names(&batch), 
expected_projected_names);
+                assert_eq!(batch.metadata, HashMap::default());
+            };
+
+            let builder =
+                
ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
+            let sync_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), 
indices.clone());
+            let mut reader = builder.with_projection(mask).build().unwrap();
+            let sync_reader_schema = reader.schema();
+            let batch = reader.next().unwrap().unwrap();
+            let sync_batch_schema = batch.schema();
+            assert_schemas(sync_builder_schema, sync_reader_schema, 
sync_batch_schema);
+
+            // asynchronous should be same
+            let file = tokio::fs::File::from(file.try_clone().unwrap());
+            let builder = 
ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+            let async_builder_schema = builder.schema().clone();
+            let mask = ProjectionMask::leaves(builder.parquet_schema(), 
indices);
+            let mut reader = builder.with_projection(mask).build().unwrap();
+            let async_reader_schema = reader.schema().clone();
+            let batch = reader.next().await.unwrap().unwrap();
+            let async_batch_schema = batch.schema();
+            assert_schemas(
+                async_builder_schema,
+                async_reader_schema,
+                async_batch_schema,
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length

Reply via email to