This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch extract-soe-from-reader in repository https://gitbox.apache.org/repos/asf/avro-rs.git
commit 3829bd289ac32c825858943af2d3da5977f05bec Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Fri Feb 20 12:44:55 2026 +0200 chore: Extract Single Object encoding related structs from reader/mod.rs No API breaks! No functional changes! --- avro/src/lib.rs | 5 +- avro/src/reader/mod.rs | 311 +----------------------------------- avro/src/reader/single_object.rs | 333 +++++++++++++++++++++++++++++++++++++++ avro/src/schema/parser.rs | 2 +- 4 files changed, 342 insertions(+), 309 deletions(-) diff --git a/avro/src/lib.rs b/avro/src/lib.rs index 4800bfa..a56aaad 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -91,8 +91,9 @@ pub use decimal::Decimal; pub use duration::{Days, Duration, Millis, Months}; pub use error::Error; pub use reader::{ - GenericSingleObjectReader, Reader, SpecificSingleObjectReader, from_avro_datum, - from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker, + Reader, from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata, + read_marker, + single_object::{GenericSingleObjectReader, SpecificSingleObjectReader}, }; pub use schema::Schema; pub use serde::{AvroSchema, AvroSchemaComponent, from_value, to_value}; diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs index 21924b0..598e1ff 100644 --- a/avro/src/reader/mod.rs +++ b/avro/src/reader/mod.rs @@ -18,21 +18,17 @@ //! Logic handling reading from Avro format at user level. mod block; +pub mod single_object; use crate::{ AvroResult, decode::{decode, decode_internal}, - error::Details, - from_value, - headers::{HeaderBuilder, RabinFingerprintHeader}, - schema::{ResolvedOwnedSchema, ResolvedSchema, Schema}, - serde::AvroSchema, + schema::{ResolvedSchema, Schema}, types::Value, }; use block::Block; use bon::bon; -use serde::de::DeserializeOwned; -use std::{collections::HashMap, io::Read, marker::PhantomData}; +use std::{collections::HashMap, io::Read}; /// Main interface for reading Avro formatted values. /// @@ -209,89 +205,6 @@ pub fn from_avro_datum_reader_schemata<R: Read>( } } -pub struct GenericSingleObjectReader { - write_schema: ResolvedOwnedSchema, - expected_header: Vec<u8>, -} - -impl GenericSingleObjectReader { - pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> { - let header_builder = RabinFingerprintHeader::from_schema(&schema); - Self::new_with_header_builder(schema, header_builder) - } - - pub fn new_with_header_builder<HB: HeaderBuilder>( - schema: Schema, - header_builder: HB, - ) -> AvroResult<GenericSingleObjectReader> { - let expected_header = header_builder.build_header(); - Ok(GenericSingleObjectReader { - write_schema: ResolvedOwnedSchema::try_from(schema)?, - expected_header, - }) - } - - pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> { - let mut header = vec![0; self.expected_header.len()]; - match reader.read_exact(&mut header) { - Ok(_) => { - if self.expected_header == header { - decode_internal( - self.write_schema.get_root_schema(), - self.write_schema.get_names(), - &None, - reader, - ) - } else { - Err( - Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header) - .into(), - ) - } - } - Err(io_error) => Err(Details::ReadHeader(io_error).into()), - } - } -} - -pub struct SpecificSingleObjectReader<T> -where - T: AvroSchema, -{ - inner: GenericSingleObjectReader, - _model: PhantomData<T>, -} - -impl<T> SpecificSingleObjectReader<T> -where - T: AvroSchema, -{ - pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> { - Ok(SpecificSingleObjectReader { - inner: GenericSingleObjectReader::new(T::get_schema())?, - _model: PhantomData, - }) - } -} - -impl<T> SpecificSingleObjectReader<T> -where - T: AvroSchema + From<Value>, -{ - pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> { - self.inner.read_value(reader).map(|v| v.into()) - } -} - -impl<T> SpecificSingleObjectReader<T> -where - T: AvroSchema + DeserializeOwned, -{ - pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> { - from_value::<T>(&self.inner.read_value(reader)?) - } -} - /// Reads the marker bytes from Avro bytes generated earlier by a `Writer` pub fn read_marker(bytes: &[u8]) -> [u8; 16] { assert!( @@ -306,14 +219,12 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] { #[cfg(test)] mod tests { use super::*; - use crate::{ - Error, encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record, - }; + use crate::from_value; + use crate::types::Record; use apache_avro_test_helper::TestResult; use pretty_assertions::assert_eq; use serde::Deserialize; use std::io::Cursor; - use uuid::Uuid; const SCHEMA: &str = r#" { @@ -548,218 +459,6 @@ mod tests { Ok(()) } - #[derive(Deserialize, Clone, PartialEq, Debug)] - struct TestSingleObjectReader { - a: i64, - b: f64, - c: Vec<String>, - } - - impl AvroSchema for TestSingleObjectReader { - fn get_schema() -> Schema { - let schema = r#" - { - "type":"record", - "name":"TestSingleObjectWrtierSerialize", - "fields":[ - { - "name":"a", - "type":"long" - }, - { - "name":"b", - "type":"double" - }, - { - "name":"c", - "type":{ - "type":"array", - "items":"string" - } - } - ] - } - "#; - Schema::parse_str(schema).unwrap() - } - } - - impl From<Value> for TestSingleObjectReader { - fn from(obj: Value) -> TestSingleObjectReader { - if let Value::Record(fields) = obj { - let mut a = None; - let mut b = None; - let mut c = vec![]; - for (field_name, v) in fields { - match (field_name.as_str(), v) { - ("a", Value::Long(i)) => a = Some(i), - ("b", Value::Double(d)) => b = Some(d), - ("c", Value::Array(v)) => { - for inner_val in v { - if let Value::String(s) = inner_val { - c.push(s); - } - } - } - (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"), - } - } - TestSingleObjectReader { - a: a.unwrap(), - b: b.unwrap(), - c, - } - } else { - panic!("Expected a Value::Record but was {obj:?}") - } - } - } - - impl From<TestSingleObjectReader> for Value { - fn from(obj: TestSingleObjectReader) -> Value { - Value::Record(vec![ - ("a".into(), obj.a.into()), - ("b".into(), obj.b.into()), - ( - "c".into(), - Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), - ), - ]) - } - } - - #[test] - fn test_avro_3507_single_object_reader() -> TestResult { - let obj = TestSingleObjectReader { - a: 42, - b: 3.33, - c: vec!["cat".into(), "dog".into()], - }; - let mut to_read = Vec::<u8>::new(); - to_read.extend_from_slice(&[0xC3, 0x01]); - to_read.extend_from_slice( - &TestSingleObjectReader::get_schema() - .fingerprint::<Rabin>() - .bytes[..], - ); - encode( - &obj.clone().into(), - &TestSingleObjectReader::get_schema(), - &mut to_read, - ) - .expect("Encode should succeed"); - let mut to_read = &to_read[..]; - let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) - .expect("Schema should resolve"); - let val = generic_reader - .read_value(&mut to_read) - .expect("Should read"); - let expected_value: Value = obj.into(); - assert_eq!(expected_value, val); - - Ok(()) - } - - #[test] - fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult { - let obj = TestSingleObjectReader { - a: 42, - b: 3.33, - c: vec!["cat".into(), "dog".into()], - }; - // The two-byte marker, to show that the message uses this single-record format - let to_read_1 = [0xC3, 0x01]; - let mut to_read_2 = Vec::<u8>::new(); - to_read_2.extend_from_slice( - &TestSingleObjectReader::get_schema() - .fingerprint::<Rabin>() - .bytes[..], - ); - let mut to_read_3 = Vec::<u8>::new(); - encode( - &obj.clone().into(), - &TestSingleObjectReader::get_schema(), - &mut to_read_3, - ) - .expect("Encode should succeed"); - let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]); - let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) - .expect("Schema should resolve"); - let val = generic_reader - .read_value(&mut to_read) - .expect("Should read"); - let expected_value: Value = obj.into(); - assert_eq!(expected_value, val); - - Ok(()) - } - - #[test] - fn test_avro_3507_reader_parity() -> TestResult { - let obj = TestSingleObjectReader { - a: 42, - b: 3.33, - c: vec!["cat".into(), "dog".into()], - }; - - let mut to_read = Vec::<u8>::new(); - to_read.extend_from_slice(&[0xC3, 0x01]); - to_read.extend_from_slice( - &TestSingleObjectReader::get_schema() - .fingerprint::<Rabin>() - .bytes[..], - ); - encode( - &obj.clone().into(), - &TestSingleObjectReader::get_schema(), - &mut to_read, - ) - .expect("Encode should succeed"); - let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) - .expect("Schema should resolve"); - let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new() - .expect("schema should resolve"); - let mut to_read1 = &to_read[..]; - let mut to_read2 = &to_read[..]; - let mut to_read3 = &to_read[..]; - - let val = generic_reader - .read_value(&mut to_read1) - .expect("Should read"); - let read_obj1 = specific_reader - .read_from_value(&mut to_read2) - .expect("Should read from value"); - let read_obj2 = specific_reader - .read(&mut to_read3) - .expect("Should read from deserilize"); - let expected_value: Value = obj.clone().into(); - assert_eq!(obj, read_obj1); - assert_eq!(obj, read_obj2); - assert_eq!(val, expected_value); - - Ok(()) - } - - #[test] - fn avro_rs_164_generic_reader_alternate_header() -> TestResult { - let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; - let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid); - let generic_reader = GenericSingleObjectReader::new_with_header_builder( - TestSingleObjectReader::get_schema(), - header_builder, - ) - .expect("failed to build reader"); - let data_to_read: Vec<u8> = vec![ - 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, - ]; - let mut to_read = &data_to_read[..]; - let read_result = generic_reader - .read_value(&mut to_read) - .map_err(Error::into_details); - matches!(read_result, Err(Details::ReadBytes(_))); - Ok(()) - } - #[cfg(not(feature = "snappy"))] #[test] fn test_avro_3549_read_not_enabled_codec() { diff --git a/avro/src/reader/single_object.rs b/avro/src/reader/single_object.rs new file mode 100644 index 0000000..6fd61d1 --- /dev/null +++ b/avro/src/reader/single_object.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::decode::decode_internal; +use crate::error::Details; +use crate::headers::{HeaderBuilder, RabinFingerprintHeader}; +use crate::schema::ResolvedOwnedSchema; +use crate::types::Value; +use crate::{AvroResult, AvroSchema, Schema, from_value}; +use serde::de::DeserializeOwned; +use std::io::Read; +use std::marker::PhantomData; + +pub struct GenericSingleObjectReader { + write_schema: ResolvedOwnedSchema, + expected_header: Vec<u8>, +} + +impl GenericSingleObjectReader { + pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> { + let header_builder = RabinFingerprintHeader::from_schema(&schema); + Self::new_with_header_builder(schema, header_builder) + } + + pub fn new_with_header_builder<HB: HeaderBuilder>( + schema: Schema, + header_builder: HB, + ) -> AvroResult<GenericSingleObjectReader> { + let expected_header = header_builder.build_header(); + Ok(GenericSingleObjectReader { + write_schema: ResolvedOwnedSchema::try_from(schema)?, + expected_header, + }) + } + + pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> { + let mut header = vec![0; self.expected_header.len()]; + match reader.read_exact(&mut header) { + Ok(_) => { + if self.expected_header == header { + decode_internal( + self.write_schema.get_root_schema(), + self.write_schema.get_names(), + &None, + reader, + ) + } else { + Err( + Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header) + .into(), + ) + } + } + Err(io_error) => Err(Details::ReadHeader(io_error).into()), + } + } +} + +pub struct SpecificSingleObjectReader<T> +where + T: AvroSchema, +{ + inner: GenericSingleObjectReader, + _model: PhantomData<T>, +} + +impl<T> SpecificSingleObjectReader<T> +where + T: AvroSchema, +{ + pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> { + Ok(SpecificSingleObjectReader { + inner: GenericSingleObjectReader::new(T::get_schema())?, + _model: PhantomData, + }) + } +} + +impl<T> SpecificSingleObjectReader<T> +where + T: AvroSchema + From<Value>, +{ + pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> { + self.inner.read_value(reader).map(|v| v.into()) + } +} + +impl<T> SpecificSingleObjectReader<T> +where + T: AvroSchema + DeserializeOwned, +{ + pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> { + from_value::<T>(&self.inner.read_value(reader)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::encode::encode; + use crate::headers::GlueSchemaUuidHeader; + use crate::rabin::Rabin; + use crate::{AvroSchema, Error, Schema}; + use apache_avro_test_helper::TestResult; + use serde::Deserialize; + use uuid::Uuid; + + #[derive(Deserialize, Clone, PartialEq, Debug)] + struct TestSingleObjectReader { + a: i64, + b: f64, + c: Vec<String>, + } + + impl AvroSchema for TestSingleObjectReader { + fn get_schema() -> Schema { + let schema = r#" + { + "type":"record", + "name":"TestSingleObjectWrtierSerialize", + "fields":[ + { + "name":"a", + "type":"long" + }, + { + "name":"b", + "type":"double" + }, + { + "name":"c", + "type":{ + "type":"array", + "items":"string" + } + } + ] + } + "#; + Schema::parse_str(schema).unwrap() + } + } + + impl From<Value> for TestSingleObjectReader { + fn from(obj: Value) -> TestSingleObjectReader { + if let Value::Record(fields) = obj { + let mut a = None; + let mut b = None; + let mut c = vec![]; + for (field_name, v) in fields { + match (field_name.as_str(), v) { + ("a", Value::Long(i)) => a = Some(i), + ("b", Value::Double(d)) => b = Some(d), + ("c", Value::Array(v)) => { + for inner_val in v { + if let Value::String(s) = inner_val { + c.push(s); + } + } + } + (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"), + } + } + TestSingleObjectReader { + a: a.unwrap(), + b: b.unwrap(), + c, + } + } else { + panic!("Expected a Value::Record but was {obj:?}") + } + } + } + + impl From<TestSingleObjectReader> for Value { + fn from(obj: TestSingleObjectReader) -> Value { + Value::Record(vec![ + ("a".into(), obj.a.into()), + ("b".into(), obj.b.into()), + ( + "c".into(), + Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), + ), + ]) + } + } + + #[test] + fn test_avro_3507_single_object_reader() -> TestResult { + let obj = TestSingleObjectReader { + a: 42, + b: 3.33, + c: vec!["cat".into(), "dog".into()], + }; + let mut to_read = Vec::<u8>::new(); + to_read.extend_from_slice(&[0xC3, 0x01]); + to_read.extend_from_slice( + &TestSingleObjectReader::get_schema() + .fingerprint::<Rabin>() + .bytes[..], + ); + encode( + &obj.clone().into(), + &TestSingleObjectReader::get_schema(), + &mut to_read, + ) + .expect("Encode should succeed"); + let mut to_read = &to_read[..]; + let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) + .expect("Schema should resolve"); + let val = generic_reader + .read_value(&mut to_read) + .expect("Should read"); + let expected_value: Value = obj.into(); + pretty_assertions::assert_eq!(expected_value, val); + + Ok(()) + } + + #[test] + fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult { + let obj = TestSingleObjectReader { + a: 42, + b: 3.33, + c: vec!["cat".into(), "dog".into()], + }; + // The two-byte marker, to show that the message uses this single-record format + let to_read_1 = [0xC3, 0x01]; + let mut to_read_2 = Vec::<u8>::new(); + to_read_2.extend_from_slice( + &TestSingleObjectReader::get_schema() + .fingerprint::<Rabin>() + .bytes[..], + ); + let mut to_read_3 = Vec::<u8>::new(); + encode( + &obj.clone().into(), + &TestSingleObjectReader::get_schema(), + &mut to_read_3, + ) + .expect("Encode should succeed"); + let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]); + let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) + .expect("Schema should resolve"); + let val = generic_reader + .read_value(&mut to_read) + .expect("Should read"); + let expected_value: Value = obj.into(); + pretty_assertions::assert_eq!(expected_value, val); + + Ok(()) + } + + #[test] + fn test_avro_3507_reader_parity() -> TestResult { + let obj = TestSingleObjectReader { + a: 42, + b: 3.33, + c: vec!["cat".into(), "dog".into()], + }; + + let mut to_read = Vec::<u8>::new(); + to_read.extend_from_slice(&[0xC3, 0x01]); + to_read.extend_from_slice( + &TestSingleObjectReader::get_schema() + .fingerprint::<Rabin>() + .bytes[..], + ); + encode( + &obj.clone().into(), + &TestSingleObjectReader::get_schema(), + &mut to_read, + ) + .expect("Encode should succeed"); + let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) + .expect("Schema should resolve"); + let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new() + .expect("schema should resolve"); + let mut to_read1 = &to_read[..]; + let mut to_read2 = &to_read[..]; + let mut to_read3 = &to_read[..]; + + let val = generic_reader + .read_value(&mut to_read1) + .expect("Should read"); + let read_obj1 = specific_reader + .read_from_value(&mut to_read2) + .expect("Should read from value"); + let read_obj2 = specific_reader + .read(&mut to_read3) + .expect("Should read from deserilize"); + let expected_value: Value = obj.clone().into(); + pretty_assertions::assert_eq!(obj, read_obj1); + pretty_assertions::assert_eq!(obj, read_obj2); + pretty_assertions::assert_eq!(val, expected_value); + + Ok(()) + } + + #[test] + fn avro_rs_164_generic_reader_alternate_header() -> TestResult { + let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; + let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid); + let generic_reader = GenericSingleObjectReader::new_with_header_builder( + TestSingleObjectReader::get_schema(), + header_builder, + ) + .expect("failed to build reader"); + let data_to_read: Vec<u8> = vec![ + 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, + ]; + let mut to_read = &data_to_read[..]; + let read_result = generic_reader + .read_value(&mut to_read) + .map_err(Error::into_details); + matches!(read_result, Err(Details::ReadBytes(_))); + Ok(()) + } +} diff --git a/avro/src/schema/parser.rs b/avro/src/schema/parser.rs index 6747d84..3b2acc3 100644 --- a/avro/src/schema/parser.rs +++ b/avro/src/schema/parser.rs @@ -271,7 +271,7 @@ impl Parser { } } - // This crate support some logical types natively, and this function tries to convert + // This crate supports some logical types natively, and this function tries to convert // a native complex type with a logical type attribute to these logical types. // This function: // 1. Checks whether the native complex type is in the supported kinds.
