This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d8e0845ca Add `StreamDecoder::schema` (#7488)
2d8e0845ca is described below

commit 2d8e0845ca1ed020086dc299bec8e1fa91ef7345
Author: David Li <[email protected]>
AuthorDate: Mon May 12 23:59:50 2025 +0900

    Add `StreamDecoder::schema` (#7488)
    
    Fixes #6420.
---
 arrow-ipc/src/reader/stream.rs | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index 5902cbe4e0..f3aab9a82b 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -109,6 +109,11 @@ impl StreamDecoder {
         self
     }
 
+    /// Return the schema if decoded, else None.
+    pub fn schema(&self) -> Option<SchemaRef> {
+        self.schema.as_ref().map(|schema| schema.clone())
+    }
+
     /// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
     ///
     /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
@@ -129,6 +134,9 @@ impl StreamDecoder {
     ///             if let Some(x) = decoder.decode(&mut x)? {
     ///                 println!("{x:?}");
     ///             }
+    ///             if let Some(schema) = decoder.schema() {
+    ///                 println!("Schema: {schema:?}");
+    ///             }
     ///         }
     ///     }
     ///     decoder.finish().unwrap();
@@ -326,6 +334,31 @@ mod tests {
         assert_eq!(err, "Ipc error: Unexpected End of Stream");
     }
 
+    #[test]
+    fn test_schema() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("int32", DataType::Int32, false),
+            Field::new("int64", DataType::Int64, false),
+        ]));
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut s = StreamWriter::try_new(&mut buf, &schema).unwrap();
+        s.finish().unwrap();
+        drop(s);
+
+        let buffer = Buffer::from_vec(buf);
+
+        let mut b = buffer.slice_with_length(0, buffer.len() - 1);
+        let mut decoder = StreamDecoder::new();
+        let output = decoder.decode(&mut b).unwrap();
+        assert!(output.is_none());
+        let decoded_schema = decoder.schema().unwrap();
+        assert_eq!(schema, decoded_schema);
+
+        let err = decoder.finish().unwrap_err().to_string();
+        assert_eq!(err, "Ipc error: Unexpected End of Stream");
+    }
+
     #[test]
     fn test_read_ree_dict_record_batches_from_buffer() {
         let schema = Schema::new(vec![Field::new(

Reply via email to