This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 6931d881d8 feat: expose arrow schema on async avro reader (#9534)
6931d881d8 is described below

commit 6931d881d88b515574133e4edda7757b5ee2dd56
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Wed Mar 11 23:59:10 2026 +0200

    feat: expose arrow schema on async avro reader (#9534)
    
    # Rationale for this change
    
    Exposes the Arrow schema produced by the async Avro file reader,
    similarly to the `schema` method on the synchronous reader.
    
    This allows an application to prepare casting or other schema
    transformations with no need to fetch the first record batch to learn
    the produced Arrow schema. Since the async reader only parses OCF
    content for the moment, the schema does not change from batch to batch.
    
    # What changes are included in this PR?
    
    The `schema` method for `AsyncAvroFileReader` exposes the Arrow schema
    of record batches that are produced by the reader.
    
    # Are these changes tested?
    
    Added tests verifying that the returned schema matches the expected schema.
    
    # Are there any user-facing changes?
    
    Added a `schema` method to `AsyncAvroFileReader`.
---
 arrow-avro/src/reader/async_reader/mod.rs | 161 ++++++++++++++++++++++++++----
 1 file changed, 140 insertions(+), 21 deletions(-)

diff --git a/arrow-avro/src/reader/async_reader/mod.rs b/arrow-avro/src/reader/async_reader/mod.rs
index 53229f8576..02c00a60e0 100644
--- a/arrow-avro/src/reader/async_reader/mod.rs
+++ b/arrow-avro/src/reader/async_reader/mod.rs
@@ -19,7 +19,7 @@ use crate::compression::CompressionCodec;
 use crate::reader::Decoder;
 use crate::reader::block::{BlockDecoder, BlockDecoderState};
 use arrow_array::RecordBatch;
-use arrow_schema::ArrowError;
+use arrow_schema::{ArrowError, SchemaRef};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{FutureExt, Stream};
@@ -173,6 +173,13 @@ impl<R> AsyncAvroFileReader<R> {
         }
     }
 
+    /// Returns the Arrow schema for batches produced by this reader.
+    ///
+    /// The schema is determined by the writer schema in the file and the reader schema provided to the builder.
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
     /// Calculate the byte range needed to complete the current block.
     /// Only valid when block_decoder is in Data or Sync state.
     /// Returns the range to fetch, or an error if EOF would be reached.
@@ -534,7 +541,9 @@ impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
 #[cfg(all(test, feature = "object_store"))]
 mod tests {
     use super::*;
-    use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
+    use crate::schema::{
+        AVRO_NAME_METADATA_KEY, AVRO_NAMESPACE_METADATA_KEY, AvroSchema, SCHEMA_METADATA_KEY,
+    };
     use arrow_array::cast::AsArray;
     use arrow_array::types::{Int32Type, Int64Type};
     use arrow_array::*;
@@ -758,39 +767,63 @@ mod tests {
                                vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
                             ),
                             false,
-                        ),
+                        )
+                        .with_metadata(HashMap::from([
+                            (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns3".to_owned()),
+                            (AVRO_NAME_METADATA_KEY.to_owned(), "record3".to_owned()),
+                        ])),
                     ]
                     .into(),
                 ),
                 false,
-            ),
+            )
+            .with_metadata(HashMap::from([
+                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns2".to_owned()),
+                (AVRO_NAME_METADATA_KEY.to_owned(), "record2".to_owned()),
+            ])),
             Field::new(
                 "f2",
-                DataType::List(Arc::new(Field::new(
-                    "item",
-                    DataType::Struct(
-                        vec![
-                            Field::new("f2_1", DataType::Boolean, false),
-                            Field::new("f2_2", DataType::Float32, false),
-                        ]
-                        .into(),
-                    ),
-                    false,
-                ))),
+                DataType::List(Arc::new(
+                    Field::new(
+                        "item",
+                        DataType::Struct(
+                            vec![
+                                Field::new("f2_1", DataType::Boolean, false),
+                                Field::new("f2_2", DataType::Float32, false),
+                            ]
+                            .into(),
+                        ),
+                        false,
+                    )
+                    .with_metadata(HashMap::from([
+                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns4".to_owned()),
+                        (AVRO_NAME_METADATA_KEY.to_owned(), "record4".to_owned()),
+                    ])),
+                )),
                 false,
             ),
             Field::new(
                 "f3",
                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
                 true,
-            ),
+            )
+            .with_metadata(HashMap::from([
+                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns5".to_owned()),
+                (AVRO_NAME_METADATA_KEY.to_owned(), "record5".to_owned()),
+            ])),
             Field::new(
                 "f4",
-                DataType::List(Arc::new(Field::new(
-                    "item",
-                    DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
-                    true,
-                ))),
+                DataType::List(Arc::new(
+                    Field::new(
+                        "item",
+                        DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
+                        true,
+                    )
+                    .with_metadata(HashMap::from([
+                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns6".to_owned()),
+                        (AVRO_NAME_METADATA_KEY.to_owned(), "record6".to_owned()),
+                    ])),
+                )),
                 false,
             ),
         ])
@@ -1538,6 +1571,92 @@ mod tests {
         assert!(err.to_string().contains("Duplicate projection index"));
     }
 
+    #[tokio::test]
+    async fn test_arrow_schema_from_reader_no_reader_schema() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let expected_schema = get_alltypes_schema()
+            .as_ref()
+            .clone()
+            .with_metadata(Default::default());
+
+        // Build reader without providing reader schema - should use writer schema from file
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .try_build()
+            .await
+            .unwrap();
+
+        assert_eq!(reader.schema().as_ref(), &expected_schema);
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.schema().as_ref(), &expected_schema);
+    }
+
+    #[tokio::test]
+    async fn test_arrow_schema_from_reader_with_reader_schema() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+        let schema = get_alltypes_schema()
+            .project(&[0, 1, 7])
+            .unwrap()
+            .with_metadata(Default::default());
+        let reader_schema = AvroSchema::try_from(&schema).unwrap();
+        let expected_schema = schema.clone();
+
+        // Build reader with provided reader schema - must apply the projection
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .with_reader_schema(reader_schema)
+            .try_build()
+            .await
+            .unwrap();
+
+        assert_eq!(reader.schema().as_ref(), &expected_schema);
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.schema().as_ref(), &expected_schema);
+    }
+
+    #[tokio::test]
+    async fn test_arrow_schema_from_reader_nested_records() {
+        let file = arrow_test_data("avro/nested_records.avro");
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = AvroObjectReader::new(store, location);
+
+        // The schema produced by the reader should match the expected schema,
+        // attaching Avro type name metadata to fields of record and list types.
+        let expected_schema = get_nested_records_schema()
+            .as_ref()
+            .clone()
+            .with_metadata(Default::default());
+
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .try_build()
+            .await
+            .unwrap();
+
+        assert_eq!(reader.schema().as_ref(), &expected_schema);
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.schema().as_ref(), &expected_schema);
+    }
+
     #[tokio::test]
     async fn test_with_header_size_hint_small() {
         // Use a very small header size hint to force multiple fetches

Reply via email to