This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2d8e0845ca Add `StreamDecoder::schema` (#7488)
2d8e0845ca is described below
commit 2d8e0845ca1ed020086dc299bec8e1fa91ef7345
Author: David Li <[email protected]>
AuthorDate: Mon May 12 23:59:50 2025 +0900
Add `StreamDecoder::schema` (#7488)
Fixes #6420.
---
arrow-ipc/src/reader/stream.rs | 33 +++++++++++++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index 5902cbe4e0..f3aab9a82b 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -109,6 +109,11 @@ impl StreamDecoder {
self
}
+    /// Returns a clone of the decoded [`SchemaRef`], or `None` if no schema has been decoded yet.
+    pub fn schema(&self) -> Option<SchemaRef> {
+        self.schema.clone()
+    }
+
/// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
///
/// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
@@ -129,6 +134,9 @@ impl StreamDecoder {
/// if let Some(x) = decoder.decode(&mut x)? {
/// println!("{x:?}");
/// }
+ /// if let Some(schema) = decoder.schema() {
+ /// println!("Schema: {schema:?}");
+ /// }
/// }
/// }
/// decoder.finish().unwrap();
@@ -326,6 +334,31 @@ mod tests {
assert_eq!(err, "Ipc error: Unexpected End of Stream");
}
+    #[test]
+    fn test_schema() {
+        // Write a stream for this schema and finish it without adding any record batch.
+        let fields = vec![
+            Field::new("int32", DataType::Int32, false),
+            Field::new("int64", DataType::Int64, false),
+        ];
+        let expected = Arc::new(Schema::new(fields));
+        let mut bytes = Vec::with_capacity(1024);
+        let mut writer = StreamWriter::try_new(&mut bytes, &expected).unwrap();
+        writer.finish().unwrap();
+        drop(writer);
+        // Feed the decoder everything except the final byte of the stream.
+        let full = Buffer::from_vec(bytes);
+        let mut truncated = full.slice_with_length(0, full.len() - 1);
+        let mut decoder = StreamDecoder::new();
+        assert!(decoder.decode(&mut truncated).unwrap().is_none());
+        // No batch was produced, yet the decoder must already expose the schema.
+        assert_eq!(expected, decoder.schema().unwrap());
+
+        // Finishing on the truncated input is expected to fail with this exact message.
+        let message = decoder.finish().unwrap_err().to_string();
+        assert_eq!(message, "Ipc error: Unexpected End of Stream");
+    }
+
#[test]
fn test_read_ree_dict_record_batches_from_buffer() {
let schema = Schema::new(vec![Field::new(