This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1d39dba chore: Extract Single Object encoding related structs from reader/mod.rs (#478)
1d39dba is described below
commit 1d39dbae9a25de4a756dd7268c834659310d2ea7
Author: Martin Grigorov <[email protected]>
AuthorDate: Fri Feb 20 13:35:36 2026 +0200
chore: Extract Single Object encoding related structs from reader/mod.rs (#478)
No API breaks!
No functional changes!
---
avro/src/lib.rs | 5 +-
avro/src/reader/mod.rs | 311 +-----------------------------------
avro/src/reader/single_object.rs | 333 +++++++++++++++++++++++++++++++++++++++
avro/src/schema/parser.rs | 2 +-
4 files changed, 342 insertions(+), 309 deletions(-)
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index 4800bfa..a56aaad 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -91,8 +91,9 @@ pub use decimal::Decimal;
pub use duration::{Days, Duration, Millis, Months};
pub use error::Error;
pub use reader::{
- GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
from_avro_datum,
- from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker,
+ Reader, from_avro_datum, from_avro_datum_reader_schemata,
from_avro_datum_schemata,
+ read_marker,
+ single_object::{GenericSingleObjectReader, SpecificSingleObjectReader},
};
pub use schema::Schema;
pub use serde::{AvroSchema, AvroSchemaComponent, from_value, to_value};
diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs
index 21924b0..598e1ff 100644
--- a/avro/src/reader/mod.rs
+++ b/avro/src/reader/mod.rs
@@ -18,21 +18,17 @@
//! Logic handling reading from Avro format at user level.
mod block;
+pub mod single_object;
use crate::{
AvroResult,
decode::{decode, decode_internal},
- error::Details,
- from_value,
- headers::{HeaderBuilder, RabinFingerprintHeader},
- schema::{ResolvedOwnedSchema, ResolvedSchema, Schema},
- serde::AvroSchema,
+ schema::{ResolvedSchema, Schema},
types::Value,
};
use block::Block;
use bon::bon;
-use serde::de::DeserializeOwned;
-use std::{collections::HashMap, io::Read, marker::PhantomData};
+use std::{collections::HashMap, io::Read};
/// Main interface for reading Avro formatted values.
///
@@ -209,89 +205,6 @@ pub fn from_avro_datum_reader_schemata<R: Read>(
}
}
-pub struct GenericSingleObjectReader {
- write_schema: ResolvedOwnedSchema,
- expected_header: Vec<u8>,
-}
-
-impl GenericSingleObjectReader {
- pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
- let header_builder = RabinFingerprintHeader::from_schema(&schema);
- Self::new_with_header_builder(schema, header_builder)
- }
-
- pub fn new_with_header_builder<HB: HeaderBuilder>(
- schema: Schema,
- header_builder: HB,
- ) -> AvroResult<GenericSingleObjectReader> {
- let expected_header = header_builder.build_header();
- Ok(GenericSingleObjectReader {
- write_schema: ResolvedOwnedSchema::try_from(schema)?,
- expected_header,
- })
- }
-
- pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
- let mut header = vec![0; self.expected_header.len()];
- match reader.read_exact(&mut header) {
- Ok(_) => {
- if self.expected_header == header {
- decode_internal(
- self.write_schema.get_root_schema(),
- self.write_schema.get_names(),
- &None,
- reader,
- )
- } else {
- Err(
-
Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
- .into(),
- )
- }
- }
- Err(io_error) => Err(Details::ReadHeader(io_error).into()),
- }
- }
-}
-
-pub struct SpecificSingleObjectReader<T>
-where
- T: AvroSchema,
-{
- inner: GenericSingleObjectReader,
- _model: PhantomData<T>,
-}
-
-impl<T> SpecificSingleObjectReader<T>
-where
- T: AvroSchema,
-{
- pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
- Ok(SpecificSingleObjectReader {
- inner: GenericSingleObjectReader::new(T::get_schema())?,
- _model: PhantomData,
- })
- }
-}
-
-impl<T> SpecificSingleObjectReader<T>
-where
- T: AvroSchema + From<Value>,
-{
- pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
- self.inner.read_value(reader).map(|v| v.into())
- }
-}
-
-impl<T> SpecificSingleObjectReader<T>
-where
- T: AvroSchema + DeserializeOwned,
-{
- pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
- from_value::<T>(&self.inner.read_value(reader)?)
- }
-}
-
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
assert!(
@@ -306,14 +219,12 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- Error, encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin,
types::Record,
- };
+ use crate::from_value;
+ use crate::types::Record;
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use serde::Deserialize;
use std::io::Cursor;
- use uuid::Uuid;
const SCHEMA: &str = r#"
{
@@ -548,218 +459,6 @@ mod tests {
Ok(())
}
- #[derive(Deserialize, Clone, PartialEq, Debug)]
- struct TestSingleObjectReader {
- a: i64,
- b: f64,
- c: Vec<String>,
- }
-
- impl AvroSchema for TestSingleObjectReader {
- fn get_schema() -> Schema {
- let schema = r#"
- {
- "type":"record",
- "name":"TestSingleObjectWrtierSerialize",
- "fields":[
- {
- "name":"a",
- "type":"long"
- },
- {
- "name":"b",
- "type":"double"
- },
- {
- "name":"c",
- "type":{
- "type":"array",
- "items":"string"
- }
- }
- ]
- }
- "#;
- Schema::parse_str(schema).unwrap()
- }
- }
-
- impl From<Value> for TestSingleObjectReader {
- fn from(obj: Value) -> TestSingleObjectReader {
- if let Value::Record(fields) = obj {
- let mut a = None;
- let mut b = None;
- let mut c = vec![];
- for (field_name, v) in fields {
- match (field_name.as_str(), v) {
- ("a", Value::Long(i)) => a = Some(i),
- ("b", Value::Double(d)) => b = Some(d),
- ("c", Value::Array(v)) => {
- for inner_val in v {
- if let Value::String(s) = inner_val {
- c.push(s);
- }
- }
- }
- (key, value) => panic!("Unexpected pair: {key:?} ->
{value:?}"),
- }
- }
- TestSingleObjectReader {
- a: a.unwrap(),
- b: b.unwrap(),
- c,
- }
- } else {
- panic!("Expected a Value::Record but was {obj:?}")
- }
- }
- }
-
- impl From<TestSingleObjectReader> for Value {
- fn from(obj: TestSingleObjectReader) -> Value {
- Value::Record(vec![
- ("a".into(), obj.a.into()),
- ("b".into(), obj.b.into()),
- (
- "c".into(),
- Value::Array(obj.c.into_iter().map(|s|
s.into()).collect()),
- ),
- ])
- }
- }
-
- #[test]
- fn test_avro_3507_single_object_reader() -> TestResult {
- let obj = TestSingleObjectReader {
- a: 42,
- b: 3.33,
- c: vec!["cat".into(), "dog".into()],
- };
- let mut to_read = Vec::<u8>::new();
- to_read.extend_from_slice(&[0xC3, 0x01]);
- to_read.extend_from_slice(
- &TestSingleObjectReader::get_schema()
- .fingerprint::<Rabin>()
- .bytes[..],
- );
- encode(
- &obj.clone().into(),
- &TestSingleObjectReader::get_schema(),
- &mut to_read,
- )
- .expect("Encode should succeed");
- let mut to_read = &to_read[..];
- let generic_reader =
GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
- .expect("Schema should resolve");
- let val = generic_reader
- .read_value(&mut to_read)
- .expect("Should read");
- let expected_value: Value = obj.into();
- assert_eq!(expected_value, val);
-
- Ok(())
- }
-
- #[test]
- fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
- let obj = TestSingleObjectReader {
- a: 42,
- b: 3.33,
- c: vec!["cat".into(), "dog".into()],
- };
- // The two-byte marker, to show that the message uses this
single-record format
- let to_read_1 = [0xC3, 0x01];
- let mut to_read_2 = Vec::<u8>::new();
- to_read_2.extend_from_slice(
- &TestSingleObjectReader::get_schema()
- .fingerprint::<Rabin>()
- .bytes[..],
- );
- let mut to_read_3 = Vec::<u8>::new();
- encode(
- &obj.clone().into(),
- &TestSingleObjectReader::get_schema(),
- &mut to_read_3,
- )
- .expect("Encode should succeed");
- let mut to_read =
(&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
- let generic_reader =
GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
- .expect("Schema should resolve");
- let val = generic_reader
- .read_value(&mut to_read)
- .expect("Should read");
- let expected_value: Value = obj.into();
- assert_eq!(expected_value, val);
-
- Ok(())
- }
-
- #[test]
- fn test_avro_3507_reader_parity() -> TestResult {
- let obj = TestSingleObjectReader {
- a: 42,
- b: 3.33,
- c: vec!["cat".into(), "dog".into()],
- };
-
- let mut to_read = Vec::<u8>::new();
- to_read.extend_from_slice(&[0xC3, 0x01]);
- to_read.extend_from_slice(
- &TestSingleObjectReader::get_schema()
- .fingerprint::<Rabin>()
- .bytes[..],
- );
- encode(
- &obj.clone().into(),
- &TestSingleObjectReader::get_schema(),
- &mut to_read,
- )
- .expect("Encode should succeed");
- let generic_reader =
GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
- .expect("Schema should resolve");
- let specific_reader =
SpecificSingleObjectReader::<TestSingleObjectReader>::new()
- .expect("schema should resolve");
- let mut to_read1 = &to_read[..];
- let mut to_read2 = &to_read[..];
- let mut to_read3 = &to_read[..];
-
- let val = generic_reader
- .read_value(&mut to_read1)
- .expect("Should read");
- let read_obj1 = specific_reader
- .read_from_value(&mut to_read2)
- .expect("Should read from value");
- let read_obj2 = specific_reader
- .read(&mut to_read3)
- .expect("Should read from deserilize");
- let expected_value: Value = obj.clone().into();
- assert_eq!(obj, read_obj1);
- assert_eq!(obj, read_obj2);
- assert_eq!(val, expected_value);
-
- Ok(())
- }
-
- #[test]
- fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
- let schema_uuid =
Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
- let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
- let generic_reader =
GenericSingleObjectReader::new_with_header_builder(
- TestSingleObjectReader::get_schema(),
- header_builder,
- )
- .expect("failed to build reader");
- let data_to_read: Vec<u8> = vec![
- 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72,
90, 95,
- ];
- let mut to_read = &data_to_read[..];
- let read_result = generic_reader
- .read_value(&mut to_read)
- .map_err(Error::into_details);
- matches!(read_result, Err(Details::ReadBytes(_)));
- Ok(())
- }
-
#[cfg(not(feature = "snappy"))]
#[test]
fn test_avro_3549_read_not_enabled_codec() {
diff --git a/avro/src/reader/single_object.rs b/avro/src/reader/single_object.rs
new file mode 100644
index 0000000..6fd61d1
--- /dev/null
+++ b/avro/src/reader/single_object.rs
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::decode::decode_internal;
+use crate::error::Details;
+use crate::headers::{HeaderBuilder, RabinFingerprintHeader};
+use crate::schema::ResolvedOwnedSchema;
+use crate::types::Value;
+use crate::{AvroResult, AvroSchema, Schema, from_value};
+use serde::de::DeserializeOwned;
+use std::io::Read;
+use std::marker::PhantomData;
+
+pub struct GenericSingleObjectReader {
+ write_schema: ResolvedOwnedSchema,
+ expected_header: Vec<u8>,
+}
+
+impl GenericSingleObjectReader {
+ pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
+ let header_builder = RabinFingerprintHeader::from_schema(&schema);
+ Self::new_with_header_builder(schema, header_builder)
+ }
+
+ pub fn new_with_header_builder<HB: HeaderBuilder>(
+ schema: Schema,
+ header_builder: HB,
+ ) -> AvroResult<GenericSingleObjectReader> {
+ let expected_header = header_builder.build_header();
+ Ok(GenericSingleObjectReader {
+ write_schema: ResolvedOwnedSchema::try_from(schema)?,
+ expected_header,
+ })
+ }
+
+ pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
+ let mut header = vec![0; self.expected_header.len()];
+ match reader.read_exact(&mut header) {
+ Ok(_) => {
+ if self.expected_header == header {
+ decode_internal(
+ self.write_schema.get_root_schema(),
+ self.write_schema.get_names(),
+ &None,
+ reader,
+ )
+ } else {
+ Err(
+
Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
+ .into(),
+ )
+ }
+ }
+ Err(io_error) => Err(Details::ReadHeader(io_error).into()),
+ }
+ }
+}
+
+pub struct SpecificSingleObjectReader<T>
+where
+ T: AvroSchema,
+{
+ inner: GenericSingleObjectReader,
+ _model: PhantomData<T>,
+}
+
+impl<T> SpecificSingleObjectReader<T>
+where
+ T: AvroSchema,
+{
+ pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
+ Ok(SpecificSingleObjectReader {
+ inner: GenericSingleObjectReader::new(T::get_schema())?,
+ _model: PhantomData,
+ })
+ }
+}
+
+impl<T> SpecificSingleObjectReader<T>
+where
+ T: AvroSchema + From<Value>,
+{
+ pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
+ self.inner.read_value(reader).map(|v| v.into())
+ }
+}
+
+impl<T> SpecificSingleObjectReader<T>
+where
+ T: AvroSchema + DeserializeOwned,
+{
+ pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
+ from_value::<T>(&self.inner.read_value(reader)?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::encode::encode;
+ use crate::headers::GlueSchemaUuidHeader;
+ use crate::rabin::Rabin;
+ use crate::{AvroSchema, Error, Schema};
+ use apache_avro_test_helper::TestResult;
+ use serde::Deserialize;
+ use uuid::Uuid;
+
+ #[derive(Deserialize, Clone, PartialEq, Debug)]
+ struct TestSingleObjectReader {
+ a: i64,
+ b: f64,
+ c: Vec<String>,
+ }
+
+ impl AvroSchema for TestSingleObjectReader {
+ fn get_schema() -> Schema {
+ let schema = r#"
+ {
+ "type":"record",
+ "name":"TestSingleObjectWrtierSerialize",
+ "fields":[
+ {
+ "name":"a",
+ "type":"long"
+ },
+ {
+ "name":"b",
+ "type":"double"
+ },
+ {
+ "name":"c",
+ "type":{
+ "type":"array",
+ "items":"string"
+ }
+ }
+ ]
+ }
+ "#;
+ Schema::parse_str(schema).unwrap()
+ }
+ }
+
+ impl From<Value> for TestSingleObjectReader {
+ fn from(obj: Value) -> TestSingleObjectReader {
+ if let Value::Record(fields) = obj {
+ let mut a = None;
+ let mut b = None;
+ let mut c = vec![];
+ for (field_name, v) in fields {
+ match (field_name.as_str(), v) {
+ ("a", Value::Long(i)) => a = Some(i),
+ ("b", Value::Double(d)) => b = Some(d),
+ ("c", Value::Array(v)) => {
+ for inner_val in v {
+ if let Value::String(s) = inner_val {
+ c.push(s);
+ }
+ }
+ }
+ (key, value) => panic!("Unexpected pair: {key:?} ->
{value:?}"),
+ }
+ }
+ TestSingleObjectReader {
+ a: a.unwrap(),
+ b: b.unwrap(),
+ c,
+ }
+ } else {
+ panic!("Expected a Value::Record but was {obj:?}")
+ }
+ }
+ }
+
+ impl From<TestSingleObjectReader> for Value {
+ fn from(obj: TestSingleObjectReader) -> Value {
+ Value::Record(vec![
+ ("a".into(), obj.a.into()),
+ ("b".into(), obj.b.into()),
+ (
+ "c".into(),
+ Value::Array(obj.c.into_iter().map(|s|
s.into()).collect()),
+ ),
+ ])
+ }
+ }
+
+ #[test]
+ fn test_avro_3507_single_object_reader() -> TestResult {
+ let obj = TestSingleObjectReader {
+ a: 42,
+ b: 3.33,
+ c: vec!["cat".into(), "dog".into()],
+ };
+ let mut to_read = Vec::<u8>::new();
+ to_read.extend_from_slice(&[0xC3, 0x01]);
+ to_read.extend_from_slice(
+ &TestSingleObjectReader::get_schema()
+ .fingerprint::<Rabin>()
+ .bytes[..],
+ );
+ encode(
+ &obj.clone().into(),
+ &TestSingleObjectReader::get_schema(),
+ &mut to_read,
+ )
+ .expect("Encode should succeed");
+ let mut to_read = &to_read[..];
+ let generic_reader =
GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
+ .expect("Schema should resolve");
+ let val = generic_reader
+ .read_value(&mut to_read)
+ .expect("Should read");
+ let expected_value: Value = obj.into();
+ pretty_assertions::assert_eq!(expected_value, val);
+
+ Ok(())
+ }
+
+ #[test]
+ fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
+ let obj = TestSingleObjectReader {
+ a: 42,
+ b: 3.33,
+ c: vec!["cat".into(), "dog".into()],
+ };
+ // The two-byte marker, to show that the message uses this
single-record format
+ let to_read_1 = [0xC3, 0x01];
+ let mut to_read_2 = Vec::<u8>::new();
+ to_read_2.extend_from_slice(
+ &TestSingleObjectReader::get_schema()
+ .fingerprint::<Rabin>()
+ .bytes[..],
+ );
+ let mut to_read_3 = Vec::<u8>::new();
+ encode(
+ &obj.clone().into(),
+ &TestSingleObjectReader::get_schema(),
+ &mut to_read_3,
+ )
+ .expect("Encode should succeed");
+ let mut to_read =
(&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
+ let generic_reader =
GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
+ .expect("Schema should resolve");
+ let val = generic_reader
+ .read_value(&mut to_read)
+ .expect("Should read");
+ let expected_value: Value = obj.into();
+ pretty_assertions::assert_eq!(expected_value, val);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_avro_3507_reader_parity() -> TestResult {
+ let obj = TestSingleObjectReader {
+ a: 42,
+ b: 3.33,
+ c: vec!["cat".into(), "dog".into()],
+ };
+
+ let mut to_read = Vec::<u8>::new();
+ to_read.extend_from_slice(&[0xC3, 0x01]);
+ to_read.extend_from_slice(
+ &TestSingleObjectReader::get_schema()
+ .fingerprint::<Rabin>()
+ .bytes[..],
+ );
+ encode(
+ &obj.clone().into(),
+ &TestSingleObjectReader::get_schema(),
+ &mut to_read,
+ )
+ .expect("Encode should succeed");
+ let generic_reader =
GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
+ .expect("Schema should resolve");
+ let specific_reader =
SpecificSingleObjectReader::<TestSingleObjectReader>::new()
+ .expect("schema should resolve");
+ let mut to_read1 = &to_read[..];
+ let mut to_read2 = &to_read[..];
+ let mut to_read3 = &to_read[..];
+
+ let val = generic_reader
+ .read_value(&mut to_read1)
+ .expect("Should read");
+ let read_obj1 = specific_reader
+ .read_from_value(&mut to_read2)
+ .expect("Should read from value");
+ let read_obj2 = specific_reader
+ .read(&mut to_read3)
+ .expect("Should read from deserilize");
+ let expected_value: Value = obj.clone().into();
+ pretty_assertions::assert_eq!(obj, read_obj1);
+ pretty_assertions::assert_eq!(obj, read_obj2);
+ pretty_assertions::assert_eq!(val, expected_value);
+
+ Ok(())
+ }
+
+ #[test]
+ fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
+ let schema_uuid =
Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+ let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+ let generic_reader =
GenericSingleObjectReader::new_with_header_builder(
+ TestSingleObjectReader::get_schema(),
+ header_builder,
+ )
+ .expect("failed to build reader");
+ let data_to_read: Vec<u8> = vec![
+ 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72,
90, 95,
+ ];
+ let mut to_read = &data_to_read[..];
+ let read_result = generic_reader
+ .read_value(&mut to_read)
+ .map_err(Error::into_details);
+ matches!(read_result, Err(Details::ReadBytes(_)));
+ Ok(())
+ }
+}
diff --git a/avro/src/schema/parser.rs b/avro/src/schema/parser.rs
index 6747d84..3b2acc3 100644
--- a/avro/src/schema/parser.rs
+++ b/avro/src/schema/parser.rs
@@ -271,7 +271,7 @@ impl Parser {
}
}
- // This crate support some logical types natively, and this function
tries to convert
+ // This crate supports some logical types natively, and this function
tries to convert
// a native complex type with a logical type attribute to these
logical types.
// This function:
// 1. Checks whether the native complex type is in the supported kinds.