This is an automated email from the ASF dual-hosted git repository. kriskras99 pushed a commit to branch feat/full_enum_support in repository https://gitbox.apache.org/repos/asf/avro-rs.git
commit 7b4c2dc39f9f0a124b24459db24d3fe836eb4f41 Author: Kriskras99 <[email protected]> AuthorDate: Fri Mar 13 20:00:50 2026 +0100 feat: Use `SchemaAwareDeserializer` in the readers and add tests --- .../test_interop_single_object_encoding.rs | 4 +- avro/src/reader/block.rs | 63 +- avro/src/reader/datum.rs | 110 ++- avro/src/reader/mod.rs | 56 +- avro/src/reader/single_object.rs | 109 ++- avro/src/serde/deser_schema/mod.rs | 950 +++++++++++++++++++++ 6 files changed, 1219 insertions(+), 73 deletions(-) diff --git a/avro/examples/test_interop_single_object_encoding.rs b/avro/examples/test_interop_single_object_encoding.rs index e16c221..5b8e1bb 100644 --- a/avro/examples/test_interop_single_object_encoding.rs +++ b/avro/examples/test_interop_single_object_encoding.rs @@ -74,7 +74,9 @@ fn test_write(expected: &[u8]) { fn test_read(encoded: Vec<u8>) { let mut encoded = &encoded[..]; - let read_message = apache_avro::GenericSingleObjectReader::new(InteropMessage::get_schema()) + let read_message = apache_avro::GenericSingleObjectReader::builder() + .schema(InteropMessage::get_schema()) + .build() .expect("Resolving failed") .read_value(&mut encoded) .expect("Decoding failed"); diff --git a/avro/src/reader/block.rs b/avro/src/reader/block.rs index 06c07a2..73daaf8 100644 --- a/avro/src/reader/block.rs +++ b/avro/src/reader/block.rs @@ -15,21 +15,25 @@ // specific language governing permissions and limitations // under the License. 
+use std::{ + collections::HashMap, + io::{ErrorKind, Read}, + str::FromStr, +}; + +use log::warn; +use serde::de::DeserializeOwned; +use serde_json::from_slice; + use crate::{ AvroResult, Codec, Error, decode::{decode, decode_internal}, error::Details, schema::{Names, Schema, resolve_names, resolve_names_with_schemata}, + serde::deser_schema::{Config, SchemaAwareDeserializer}, types::Value, util, }; -use log::warn; -use serde_json::from_slice; -use std::{ - collections::HashMap, - io::{ErrorKind, Read}, - str::FromStr, -}; /// Internal Block reader. #[derive(Debug, Clone)] @@ -46,10 +50,15 @@ pub(super) struct Block<'r, R> { schemata: Vec<&'r Schema>, pub(super) user_metadata: HashMap<String, Vec<u8>>, names_refs: Names, + human_readable: bool, } impl<'r, R: Read> Block<'r, R> { - pub(super) fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> { + pub(super) fn new( + reader: R, + schemata: Vec<&'r Schema>, + human_readable: bool, + ) -> AvroResult<Block<'r, R>> { let mut block = Block { reader, codec: Codec::Null, @@ -61,6 +70,7 @@ impl<'r, R: Read> Block<'r, R> { marker: [0; 16], user_metadata: Default::default(), names_refs: Default::default(), + human_readable, }; block.read_header()?; @@ -205,6 +215,43 @@ impl<'r, R: Read> Block<'r, R> { Ok(Some(item)) } + pub(super) fn read_next_deser<T: DeserializeOwned>( + &mut self, + read_schema: Option<&Schema>, + ) -> AvroResult<Option<T>> { + if self.is_empty() { + self.read_block_next()?; + if self.is_empty() { + return Ok(None); + } + } + + let mut block_bytes = &self.buf[self.buf_idx..]; + let b_original = block_bytes.len(); + + let item = if read_schema.is_some() { + todo!("Schema aware deserialisation does not resolve schemas yet"); + } else { + let config = Config { + names: &self.names_refs, + human_readable: self.human_readable, + }; + T::deserialize(SchemaAwareDeserializer::new( + &mut block_bytes, + &self.writer_schema, + config, + )?)? 
+ }; + + if b_original != 0 && b_original == block_bytes.len() { + // No bytes were read, return an error to avoid an infinite loop + return Err(Details::ReadBlock.into()); + } + self.buf_idx += b_original - block_bytes.len(); + self.message_count -= 1; + Ok(Some(item)) + } + fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> { let json: serde_json::Value = metadata .get("avro.schema") diff --git a/avro/src/reader/datum.rs b/avro/src/reader/datum.rs index 80fe937..b02f8db 100644 --- a/avro/src/reader/datum.rs +++ b/avro/src/reader/datum.rs @@ -15,11 +15,19 @@ // specific language governing permissions and limitations // under the License. -use std::io::Read; +use std::{io::Read, marker::PhantomData}; use bon::bon; +use serde::de::DeserializeOwned; -use crate::{AvroResult, Schema, decode::decode_internal, schema::ResolvedSchema, types::Value}; +use crate::{ + AvroResult, AvroSchema, Schema, + decode::decode_internal, + schema::{ResolvedOwnedSchema, ResolvedSchema}, + serde::deser_schema::{Config, SchemaAwareDeserializer}, + types::Value, + util::is_human_readable, +}; /// Reader for reading raw Avro data. /// @@ -30,6 +38,7 @@ pub struct GenericDatumReader<'s> { writer: &'s Schema, resolved: ResolvedSchema<'s>, reader: Option<(&'s Schema, ResolvedSchema<'s>)>, + human_readable: bool, } #[bon] @@ -50,6 +59,9 @@ impl<'s> GenericDatumReader<'s> { reader_schema: Option<&'s Schema>, /// Already resolved schemata that will be used to resolve references in the reader's schema. resolved_reader_schemata: Option<ResolvedSchema<'s>>, + /// Was the data serialized with `human_readable`. 
+ #[builder(default = is_human_readable())] + human_readable: bool, ) -> AvroResult<Self> { let resolved_writer_schemata = if let Some(resolved) = resolved_writer_schemata { resolved @@ -71,6 +83,7 @@ impl<'s> GenericDatumReader<'s> { writer: writer_schema, resolved: resolved_writer_schemata, reader, + human_readable, }) } } @@ -124,6 +137,68 @@ impl<'s> GenericDatumReader<'s> { Ok(value) } } + + /// Read a Avro datum from the reader. + /// + /// # Panics + /// Will panic if a reader schema has been configured, this is a WIP. + pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> { + // `reader` is `impl Read` instead of a generic on the function like T so it's easier to + // specify the type wanted (`read_deser<String>` vs `read_deser<String, _>`) + if let Some((_, _)) = &self.reader { + todo!("Schema aware deserialisation does not resolve schemas yet"); + } else { + T::deserialize(SchemaAwareDeserializer::new( + reader, + self.writer, + Config { + names: self.resolved.get_names(), + human_readable: self.human_readable, + }, + )?) + } + } +} + +/// Reader for reading raw Avro data. +/// +/// This is most likely not what you need. Most users should use [`Reader`][crate::Reader], +/// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or +/// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead. +pub struct SpecificDatumReader<T: AvroSchema> { + resolved: ResolvedOwnedSchema, + human_readable: bool, + phantom: PhantomData<T>, +} + +#[bon] +impl<T: AvroSchema> SpecificDatumReader<T> { + /// Build a [`SpecificDatumReader`]. + /// + /// This is most likely not what you need. Most users should use [`Reader`][crate::Reader], + /// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or + /// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead. 
+ #[builder] + pub fn new(#[builder(default = is_human_readable())] human_readable: bool) -> AvroResult<Self> { + Ok(Self { + resolved: T::get_schema().try_into()?, + human_readable, + phantom: PhantomData, + }) + } +} + +impl<T: AvroSchema + DeserializeOwned> SpecificDatumReader<T> { + pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> { + T::deserialize(SchemaAwareDeserializer::new( + reader, + self.resolved.get_root_schema(), + Config { + names: self.resolved.get_names(), + human_readable: self.human_readable, + }, + )?) + } } /// Deprecated. @@ -227,7 +302,7 @@ mod tests { use serde::Deserialize; use crate::{ - Schema, from_value, + Schema, reader::datum::GenericDatumReader, types::{Record, Value}, }; @@ -272,7 +347,7 @@ mod tests { const TEST_RECORD_SCHEMA_3240: &str = r#" { "type": "record", - "name": "test", + "name": "TestRecord3240", "fields": [ { "name": "a", @@ -312,26 +387,17 @@ mod tests { } let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?; - let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111]; + let mut encoded: &[u8] = &[54, 6, 102, 111, 111]; - let expected_record: TestRecord3240 = TestRecord3240 { - a: 27i64, - b: String::from("foo"), - a_nullable_array: None, - a_nullable_string: None, - }; - - let avro_datum = GenericDatumReader::builder(&schema) + let error = GenericDatumReader::builder(&schema) .build()? 
- .read_value(&mut encoded)?; - let parsed_record: TestRecord3240 = match &avro_datum { - Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?, - unexpected => { - panic!("could not map avro data to struct, found unexpected: {unexpected:?}") - } - }; - - assert_eq!(parsed_record, expected_record); + .read_deser::<TestRecord3240>(&mut encoded) + .unwrap_err(); + // TODO: Create a version of this test that does schema resolution + assert_eq!( + error.to_string(), + "Failed to read bytes for decoding variable length integer: failed to fill whole buffer" + ); Ok(()) } diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs index 94af909..d439407 100644 --- a/avro/src/reader/mod.rs +++ b/avro/src/reader/mod.rs @@ -21,10 +21,13 @@ mod block; pub mod datum; pub mod single_object; -use crate::{AvroResult, schema::Schema, types::Value}; +use std::{collections::HashMap, io::Read, marker::PhantomData}; + use block::Block; use bon::bon; -use std::{collections::HashMap, io::Read}; +use serde::de::DeserializeOwned; + +use crate::{AvroResult, schema::Schema, types::Value, util::is_human_readable}; /// Main interface for reading Avro formatted values. /// @@ -68,11 +71,12 @@ impl<'a, R: Read> Reader<'a, R> { #[builder(start_fn)] reader: R, reader_schema: Option<&'a Schema>, schemata: Option<Vec<&'a Schema>>, + #[builder(default = is_human_readable())] human_readable: bool, ) -> AvroResult<Reader<'a, R>> { let schemata = schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default()); - let block = Block::new(reader, schemata)?; + let block = Block::new(reader, schemata, human_readable)?; let mut reader = Reader { block, reader_schema, @@ -97,12 +101,20 @@ impl<'a, R: Read> Reader<'a, R> { self.reader_schema } - /// Get a reference to the user metadata + /// Get a reference to the user metadata. 
#[inline] pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> { &self.block.user_metadata } + /// Convert this reader into an iterator that deserializes to `T`. + pub fn into_deser_iter<T: DeserializeOwned>(self) -> ReaderDeser<'a, R, T> { + ReaderDeser { + inner: self, + phantom: PhantomData, + } + } + #[inline] fn read_next(&mut self) -> AvroResult<Option<Value>> { let read_schema = if self.should_resolve_schema { @@ -113,6 +125,10 @@ impl<'a, R: Read> Reader<'a, R> { self.block.read_next(read_schema) } + + fn read_next_deser<T: DeserializeOwned>(&mut self) -> AvroResult<Option<T>> { + self.block.read_next_deser(self.reader_schema) + } } impl<R: Read> Iterator for Reader<'_, R> { @@ -133,6 +149,30 @@ impl<R: Read> Iterator for Reader<'_, R> { } } +/// Wrapper around [`Reader`] where the iterator deserializes `T`. +pub struct ReaderDeser<'a, R, T> { + inner: Reader<'a, R>, + phantom: PhantomData<T>, +} + +impl<R: Read, T: DeserializeOwned> Iterator for ReaderDeser<'_, R, T> { + type Item = AvroResult<T>; + + fn next(&mut self) -> Option<Self::Item> { + // Don't continue when we've errored before + if self.inner.errored { + return None; + } + match self.inner.read_next_deser::<T>() { + Ok(opt) => opt.map(Ok), + Err(e) => { + self.inner.errored = true; + Some(Err(e)) + } + } + } +} + /// Reads the marker bytes from Avro bytes generated earlier by a `Writer` pub fn read_marker(bytes: &[u8]) -> [u8; 16] { assert!( @@ -146,11 +186,13 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] { #[cfg(test)] mod tests { - use super::*; - use crate::types::Record; + use std::io::Cursor; + use apache_avro_test_helper::TestResult; use pretty_assertions::assert_eq; - use std::io::Cursor; + + use super::*; + use crate::types::Record; const SCHEMA: &str = r#" { diff --git a/avro/src/reader/single_object.rs b/avro/src/reader/single_object.rs index c6151a3..a26b513 100644 --- a/avro/src/reader/single_object.rs +++ b/avro/src/reader/single_object.rs @@ -15,38 +15,49 @@ // specific 
language governing permissions and limitations // under the License. -use crate::decode::decode_internal; -use crate::error::Details; -use crate::headers::{HeaderBuilder, RabinFingerprintHeader}; -use crate::schema::ResolvedOwnedSchema; -use crate::types::Value; -use crate::{AvroResult, AvroSchema, Schema, from_value}; +use std::{io::Read, marker::PhantomData}; + +use bon::bon; use serde::de::DeserializeOwned; -use std::io::Read; -use std::marker::PhantomData; + +use crate::{ + AvroResult, AvroSchema, Schema, + decode::decode_internal, + error::Details, + headers::{HeaderBuilder, RabinFingerprintHeader}, + schema::ResolvedOwnedSchema, + serde::deser_schema::{Config, SchemaAwareDeserializer}, + types::Value, + util::is_human_readable, +}; pub struct GenericSingleObjectReader { write_schema: ResolvedOwnedSchema, expected_header: Vec<u8>, + human_readable: bool, } +#[bon] impl GenericSingleObjectReader { - pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> { - let header_builder = RabinFingerprintHeader::from_schema(&schema); - Self::new_with_header_builder(schema, header_builder) - } - - pub fn new_with_header_builder<HB: HeaderBuilder>( + #[builder] + pub fn new( schema: Schema, - header_builder: HB, + /// The expected header. + #[builder(default = RabinFingerprintHeader::from_schema(&schema).build_header())] + header: Vec<u8>, + /// Was the data serialized with `human_readable`. 
+ #[builder(default = is_human_readable())] + human_readable: bool, ) -> AvroResult<GenericSingleObjectReader> { - let expected_header = header_builder.build_header(); - Ok(GenericSingleObjectReader { - write_schema: ResolvedOwnedSchema::try_from(schema)?, - expected_header, + Ok(Self { + write_schema: schema.try_into()?, + expected_header: header, + human_readable, }) } +} +impl GenericSingleObjectReader { pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> { let mut header = vec![0; self.expected_header.len()]; match reader.read_exact(&mut header) { @@ -68,6 +79,26 @@ impl GenericSingleObjectReader { Err(io_error) => Err(Details::ReadHeader(io_error).into()), } } + + pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> { + let mut header = vec![0; self.expected_header.len()]; + reader + .read_exact(&mut header) + .map_err(Details::ReadHeader)?; + if self.expected_header == header { + let config = Config { + names: self.write_schema.get_names(), + human_readable: self.human_readable, + }; + T::deserialize(SchemaAwareDeserializer::new( + reader, + self.write_schema.get_root_schema(), + config, + )?) + } else { + Err(Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header).into()) + } + } } pub struct SpecificSingleObjectReader<T> @@ -84,7 +115,9 @@ where { pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> { Ok(SpecificSingleObjectReader { - inner: GenericSingleObjectReader::new(T::get_schema())?, + inner: GenericSingleObjectReader::builder() + .schema(T::get_schema()) + .build()?, _model: PhantomData, }) } @@ -104,21 +137,21 @@ where T: AvroSchema + DeserializeOwned, { pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> { - from_value::<T>(&self.inner.read_value(reader)?) 
+ self.inner.read_deser(reader) } } #[cfg(test)] mod tests { - use super::*; - use crate::encode::encode; - use crate::headers::GlueSchemaUuidHeader; - use crate::rabin::Rabin; - use crate::{AvroSchema, Error, Schema}; use apache_avro_test_helper::TestResult; use serde::Deserialize; use uuid::Uuid; + use super::*; + use crate::{ + AvroSchema, Error, Schema, encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, + }; + #[derive(Deserialize, Clone, PartialEq, Debug)] struct TestSingleObjectReader { a: i64, @@ -131,7 +164,7 @@ mod tests { let schema = r#" { "type":"record", - "name":"TestSingleObjectWrtierSerialize", + "name":"TestSingleObjectReader", "fields":[ { "name":"a", @@ -220,7 +253,9 @@ mod tests { ) .expect("Encode should succeed"); let mut to_read = &to_read[..]; - let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) + let generic_reader = GenericSingleObjectReader::builder() + .schema(TestSingleObjectReader::get_schema()) + .build() .expect("Schema should resolve"); let val = generic_reader .read_value(&mut to_read) @@ -254,7 +289,9 @@ mod tests { ) .expect("Encode should succeed"); let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]); - let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) + let generic_reader = GenericSingleObjectReader::builder() + .schema(TestSingleObjectReader::get_schema()) + .build() .expect("Schema should resolve"); let val = generic_reader .read_value(&mut to_read) @@ -286,7 +323,9 @@ mod tests { &mut to_read, ) .expect("Encode should succeed"); - let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema()) + let generic_reader = GenericSingleObjectReader::builder() + .schema(TestSingleObjectReader::get_schema()) + .build() .expect("Schema should resolve"); let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new() .expect("schema should resolve"); @@ -315,11 +354,11 @@ mod 
tests { fn avro_rs_164_generic_reader_alternate_header() -> TestResult { let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid); - let generic_reader = GenericSingleObjectReader::new_with_header_builder( - TestSingleObjectReader::get_schema(), - header_builder, - ) - .expect("failed to build reader"); + let generic_reader = GenericSingleObjectReader::builder() + .schema(TestSingleObjectReader::get_schema()) + .header(header_builder.build_header()) + .build() + .expect("failed to build reader"); let data_to_read: Vec<u8> = vec![ 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, ]; diff --git a/avro/src/serde/deser_schema/mod.rs b/avro/src/serde/deser_schema/mod.rs index c404fc6..613bd58 100644 --- a/avro/src/serde/deser_schema/mod.rs +++ b/avro/src/serde/deser_schema/mod.rs @@ -737,3 +737,953 @@ impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> Deserializer<'de> self.config.human_readable } } + +#[cfg(test)] +mod tests { + use std::fmt::Debug; + + use apache_avro_test_helper::TestResult; + use num_bigint::BigInt; + use pretty_assertions::assert_eq; + use serde::{ + Deserialize, Serialize, + de::{DeserializeOwned, Visitor}, + }; + use uuid::Uuid; + + use super::*; + use crate::{ + AvroResult, Decimal, reader::datum::GenericDatumReader, writer::datum::GenericDatumWriter, + }; + + #[track_caller] + fn assert_roundtrip<T>(value: T, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<()> + where + T: Serialize + DeserializeOwned + PartialEq + Debug + Clone, + { + let buf = GenericDatumWriter::builder(schema) + .schemata(schemata.clone())? + .build()? + .write_ser_to_vec(&value)?; + + let decoded_value: T = GenericDatumReader::builder(schema) + .writer_schemata(schemata)? + .build()? 
+ .read_deser(&mut &buf[..])?; + + assert_eq!(decoded_value, value); + + Ok(()) + } + + #[test] + fn avro_3955_decode_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + #[serde(rename_all = "SCREAMING_SNAKE_CASE")] + pub enum SourceType { + Sozu, + Haproxy, + HaproxyTcp, + } + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct AccessLog { + source: SourceType, + } + + let schema = Schema::parse_str( + r#"{ + "name": "AccessLog", + "namespace": "com.clevercloud.accesslogs.common.avro", + "type": "record", + "fields": [{ + "name": "source", + "type": { + "type": "enum", + "name": "SourceType", + "items": "string", + "symbols": ["SOZU", "HAPROXY", "HAPROXY_TCP"] + } + }] + }"#, + )?; + + let data = AccessLog { + source: SourceType::Sozu, + }; + + assert_roundtrip(data, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_decode_enum_invalid_data() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + #[serde(rename_all = "SCREAMING_SNAKE_CASE")] + pub enum SourceType { + Sozu, + Haproxy, + HaproxyTcp, + } + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct AccessLog { + source: SourceType, + } + + let schema = Schema::parse_str( + r#"{ + "name": "AccessLog", + "namespace": "com.clevercloud.accesslogs.common.avro", + "type": "record", + "fields": [{ + "name": "source", + "type": { + "type": "enum", + "name": "SourceType", + "items": "string", + "symbols": ["SOZU", "HAPROXY", "HAPROXY_TCP"] + } + }] + }"#, + )?; + + // Contains index 3 (4th symbol) + let data_with_unknown_index = &[6u8]; + + let error = GenericDatumReader::builder(&schema) + .build()? 
+ .read_deser::<AccessLog>(&mut &data_with_unknown_index[..]) + .unwrap_err(); + + assert_eq!(error.to_string(), "Enum symbol index out of bounds: 3"); + + Ok(()) + } + + #[test] + fn avro_rs_xxx_nested_struct() -> TestResult { + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] + struct Test { + a: i64, + b: String, + c: Decimal, + } + + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] + struct TestInner { + a: Test, + b: i32, + } + + let schemas = Schema::parse_list([ + r#"{ + "name": "Test", + "type": "record", + "fields": [ + { + "name": "a", + "type": "long" + }, + { + "name": "b", + "type": "string" + }, + { + "name": "c", + "type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 4, + "scale": 2 + } + } + ] + }"#, + r#"{ + "name": "TestInner", + "type": "record", + "fields": [ + { + "name": "a", + "type": "Test" + }, + { + "name": "b", + "type": "int" + } + ] + }"#, + ])?; + + let test = Test { + a: 27, + b: "foo".to_string(), + c: Decimal::from(vec![1, 24]), + }; + + assert_roundtrip(test.clone(), &schemas[0], Vec::new())?; + + let test_inner = TestInner { a: test, b: 35 }; + + assert_roundtrip(test_inner, &schemas[1], vec![&schemas[0]])?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_external_unit_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + pub enum UnitExternalEnum { + Val1, + Val2, + } + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestUnitExternalEnum { + a: UnitExternalEnum, + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestUnitExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": { + "type": "enum", + "name": "UnitExternalEnum", + "items": "string", + "symbols": ["Val1", "Val2"] + } + }] + }"#, + )?; + + let alt_schema = Schema::parse_str( + r#"{ + "name": "TestUnitExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": [ + { + "name": "Val1", + "type": "record", + "fields": [] + }, + { + "name": 
"Val2", + "type": "record", + "fields": [] + } + ] + }] + }"#, + )?; + + let value = TestUnitExternalEnum { + a: UnitExternalEnum::Val1, + }; + assert_roundtrip(value.clone(), &schema, Vec::new())?; + assert_roundtrip(value, &alt_schema, Vec::new())?; + + let value = TestUnitExternalEnum { + a: UnitExternalEnum::Val2, + }; + assert_roundtrip(value.clone(), &alt_schema, Vec::new())?; + assert_roundtrip(value, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_internal_unit_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + #[serde(tag = "t")] + pub enum UnitInternalEnum { + Val1, + Val2, + } + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestUnitInternalEnum { + a: UnitInternalEnum, + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestUnitInternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": { + "type": "record", + "name": "UnitInternalEnum", + "fields": [{ + "name": "t", + "type": "string" + }] + } + }] + }"#, + )?; + + let value = TestUnitInternalEnum { + a: UnitInternalEnum::Val1, + }; + assert_roundtrip(value, &schema, Vec::new())?; + + let value = TestUnitInternalEnum { + a: UnitInternalEnum::Val2, + }; + assert_roundtrip(value, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_adjacent_unit_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + #[serde(tag = "t", content = "v")] + pub enum UnitAdjacentEnum { + Val1, + Val2, + } + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestUnitAdjacentEnum { + a: UnitAdjacentEnum, + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestUnitAdjacentEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": { + "type": "record", + "name": "UnitAdjacentEnum", + "fields": [ + { + "name": "t", + "type": { + "type": "enum", + "name": "t", + "symbols": ["Val1", "Val2"] + } + }, + { + "name": "v", + "default": null, + "type": ["null"] + } + ] + 
} + }] + }"#, + )?; + + let value = TestUnitAdjacentEnum { + a: UnitAdjacentEnum::Val1, + }; + assert_roundtrip(value, &schema, Vec::new())?; + + let value = TestUnitAdjacentEnum { + a: UnitAdjacentEnum::Val2, + }; + assert_roundtrip(value, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_untagged_unit_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + #[serde(untagged)] + pub enum UnitUntaggedEnum { + Val1, + Val2, + } + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestUnitUntaggedEnum { + a: UnitUntaggedEnum, + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestUnitUntaggedEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": ["null"] + }] + }"#, + )?; + + let value1 = TestUnitUntaggedEnum { + a: UnitUntaggedEnum::Val1, + }; + assert_roundtrip(value1.clone(), &schema, Vec::new())?; + + let value2 = TestUnitUntaggedEnum { + a: UnitUntaggedEnum::Val2, + }; + let buf = GenericDatumWriter::builder(&schema) + .build()? + .write_ser_to_vec(&value1)?; + + let decoded_value: TestUnitUntaggedEnum = GenericDatumReader::builder(&schema) + .build()? + .read_deser(&mut &buf[..])?; + + // Val2 cannot troundtrip. All unit variants are serialized to the same null. + // This also doesn't roundtrip in serde_json. 
+ assert_ne!(value2, decoded_value); + assert_eq!(decoded_value, value1); + + Ok(()) + } + + #[test] + fn avro_rs_xxx_mixed_enum() -> TestResult { + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] + struct TestNullExternalEnum { + a: NullExternalEnum, + } + + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] + enum NullExternalEnum { + Val1, + Val2(), + Val3(()), + Val4(u64), + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestNullExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": [ + { + "name": "Val1", + "type": "record", + "fields": [] + }, + { + "name": "Val2", + "type": "record", + "fields": [] + }, + { + "name": "Val3", + "type": "record", + "org.apache.avro.rust.union_of_records": true, + "fields": [{ + "name": "field_0", + "type": "null" + }] + }, + { + "name": "Val4", + "type": "record", + "org.apache.avro.rust.union_of_records": true, + "fields": [{ + "name": "field_0", + "type": { + "type": "fixed", + "name": "u64", + "size": 8 + } + }] + } + ] + }] + }"#, + )?; + + let data = [ + TestNullExternalEnum { + a: NullExternalEnum::Val1, + }, + TestNullExternalEnum { + a: NullExternalEnum::Val2(), + }, + TestNullExternalEnum { + a: NullExternalEnum::Val2(), + }, + TestNullExternalEnum { + a: NullExternalEnum::Val3(()), + }, + TestNullExternalEnum { + a: NullExternalEnum::Val4(123), + }, + ]; + + for value in data { + assert_roundtrip(value, &schema, Vec::new())?; + } + + Ok(()) + } + + #[test] + fn avro_rs_xxx_single_value_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestSingleValueExternalEnum { + a: SingleValueExternalEnum, + } + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + enum SingleValueExternalEnum { + Double(f64), + String(String), + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestSingleValueExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": [ + { + "name": "Double", + "type": "record", + 
"org.apache.avro.rust.union_of_records": true, + "fields": [{ + "name": "field_0", + "type": "double" + }] + }, + { + "name": "String", + "type": "record", + "org.apache.avro.rust.union_of_records": true, + "fields": [{ + "name": "field_0", + "type": "string" + }] + } + ] + }] + }"#, + )?; + + let alt_schema = Schema::parse_str( + r#"{ + "name": "TestSingleValueExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": ["double", "string"] + }] + }"#, + )?; + + let double = TestSingleValueExternalEnum { + a: SingleValueExternalEnum::Double(64.0), + }; + assert_roundtrip(double.clone(), &schema, Vec::new())?; + assert_roundtrip(double, &alt_schema, Vec::new())?; + + let string = TestSingleValueExternalEnum { + a: SingleValueExternalEnum::String("test".to_string()), + }; + assert_roundtrip(string.clone(), &schema, Vec::new())?; + assert_roundtrip(string, &alt_schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_struct_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestStructExternalEnum { + a: StructExternalEnum, + } + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + enum StructExternalEnum { + Val1 { x: f32, y: f32 }, + Val2 { x: f32, y: f32 }, + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestStructExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": [ + { + "name": "Val1", + "type": "record", + "fields": [ + { + "name": "x", + "type": "float" + }, + { + "name": "y", + "type": "float" + } + ] + }, + { + "name": "Val2", + "type": "record", + "fields": [ + { + "name": "x", + "type": "float" + }, + { + "name": "y", + "type": "float" + } + ] + } + ] + }] + }"#, + )?; + + let value1 = TestStructExternalEnum { + a: StructExternalEnum::Val1 { x: 1.0, y: 2.0 }, + }; + + assert_roundtrip(value1, &schema, Vec::new())?; + + let value2 = TestStructExternalEnum { + a: StructExternalEnum::Val2 { x: 2.0, y: 1.0 }, + }; + + assert_roundtrip(value2, &schema, 
Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_struct_flatten() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct S1 { + f1: String, + #[serde(flatten)] + inner: S2, + } + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct S2 { + f2: String, + } + + let schema = Schema::parse_str( + r#"{ + "name": "S1", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "string" + }, + { + "name": "f2", + "type": "string" + } + ] + }"#, + )?; + + let value = S1 { + f1: "Hello".to_owned(), + inner: S2 { + f2: "World".to_owned(), + }, + }; + + assert_roundtrip(value, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_tuple_enum() -> TestResult { + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + struct TestTupleExternalEnum { + a: TupleExternalEnum, + } + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] + enum TupleExternalEnum { + Val1(f32, f32), + Val2(f32, f32, f32), + } + + let schema = Schema::parse_str( + r#"{ + "name": "TestTupleExternalEnum", + "type": "record", + "fields": [{ + "name": "a", + "type": [ + { + "name": "Val1", + "type": "record", + "fields": [ + { + "name": "field_0", + "type": "float" + }, + { + "name": "field_1", + "type": "float" + } + ] + }, + { + "name": "Val2", + "type": "record", + "fields": [ + { + "name": "field_0", + "type": "float" + }, + { + "name": "field_1", + "type": "float" + }, + { + "name": "field_2", + "type": "float" + } + ] + } + ] + }] + }"#, + )?; + + let value1 = TestTupleExternalEnum { + a: TupleExternalEnum::Val1(1.0, 2.0), + }; + + assert_roundtrip(value1, &schema, Vec::new())?; + + let value2 = TestTupleExternalEnum { + a: TupleExternalEnum::Val1(2.0, 1.0), + }; + + assert_roundtrip(value2, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_date() -> TestResult { + let schema = Schema::Date; + assert_roundtrip(1i32, &schema, Vec::new())?; + + Ok(()) + } + + #[test] + fn avro_rs_xxx_time_millis() -> 
TestResult {
+        // Exercised with an `i32` value.
+        assert_roundtrip(1i32, &Schema::TimeMillis, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::TimeMicros` is exercised with an `i64` value.
+    #[test]
+    fn avro_rs_xxx_time_micros() -> TestResult {
+        assert_roundtrip(1i64, &Schema::TimeMicros, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::TimestampMillis` is exercised with an `i64` value.
+    #[test]
+    fn avro_rs_xxx_timestamp_millis() -> TestResult {
+        assert_roundtrip(1i64, &Schema::TimestampMillis, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::TimestampMicros` is exercised with an `i64` value.
+    #[test]
+    fn avro_rs_xxx_timestamp_micros() -> TestResult {
+        assert_roundtrip(1i64, &Schema::TimestampMicros, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::TimestampNanos` is exercised with an `i64` value.
+    #[test]
+    fn avro_3916_timestamp_nanos() -> TestResult {
+        assert_roundtrip(1i64, &Schema::TimestampNanos, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::LocalTimestampMillis` is exercised with an `i64` value.
+    #[test]
+    fn avro_3853_local_timestamp_millis() -> TestResult {
+        assert_roundtrip(1i64, &Schema::LocalTimestampMillis, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::LocalTimestampMicros` is exercised with an `i64` value.
+    #[test]
+    fn avro_3853_local_timestamp_micros() -> TestResult {
+        assert_roundtrip(1i64, &Schema::LocalTimestampMicros, Vec::new())?;
+        Ok(())
+    }
+
+    /// `Schema::LocalTimestampNanos` is exercised with an `i64` value.
+    #[test]
+    fn avro_3916_local_timestamp_nanos() -> TestResult {
+        assert_roundtrip(1i64, &Schema::LocalTimestampNanos, Vec::new())?;
+        Ok(())
+    }
+
+    /// Checks a `Uuid` against two schemas: a 16-byte `fixed` with the `uuid`
+    /// logical type (via `assert_roundtrip`), and `Schema::Uuid(UuidSchema::String)`
+    /// (written with a default writer, read back with a human-readable reader).
+    #[test]
+    fn avro_rs_xxx_uuid() -> TestResult {
+        let fixed_schema = Schema::parse_str(
+            r#"{
+            "type": "fixed",
+            "logicalType": "uuid",
+            "size": 16,
+            "name": "uuid"
+        }"#,
+        )?;
+        let string_schema = Schema::Uuid(UuidSchema::String);
+
+        let uuid = Uuid::parse_str("9ec535ff-3e2a-45bd-91d3-0a01321b5a49")?;
+
+        assert_roundtrip(uuid, &fixed_schema, Vec::new())?;
+
+        let buf = GenericDatumWriter::builder(&string_schema)
+            // This needs changes in the serializer (is in the next 2 commits)
+            // .human_readable(true)
+            .build()?
+            .write_ser_to_vec(&uuid)?;
+
+        let decoded_value: Uuid = GenericDatumReader::builder(&string_schema)
+            .human_readable(true)
+            .build()?
+            .read_deser(&mut &buf[..])?;
+
+        assert_eq!(decoded_value, uuid);
+
+        Ok(())
+    }
+
+    /// Test-only newtype that collects whatever the deserializer hands to
+    /// `visit_bytes` into an owned `Vec<u8>`.
+    #[derive(Debug)]
+    struct Bytes(Vec<u8>);
+
+    impl<'de> Deserialize<'de> for Bytes {
+        /// Asks the deserializer for bytes and copies them into a `Bytes`.
+        fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
+            /// Visitor that only accepts borrowed byte slices.
+            struct BytesVisitor;
+
+            impl Visitor<'_> for BytesVisitor {
+                type Value = Bytes;
+
+                fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                    f.write_str("a byte array")
+                }
+
+                fn visit_bytes<E: serde::de::Error>(self, raw: &[u8]) -> Result<Bytes, E> {
+                    Ok(Bytes(raw.to_owned()))
+                }
+            }
+
+            deserializer.deserialize_bytes(BytesVisitor)
+        }
+    }
+
+    /// Writes a `Decimal` with a `bytes`/`decimal` schema (bare, then wrapped in a
+    /// nullable union) and reads it back as raw `Bytes`, expecting the same
+    /// big-endian signed bytes the decimal was built from.
+    #[test]
+    fn avro_3892_deserialize_bytes_from_decimal() -> TestResult {
+        let decimal_schema = Schema::parse_str(
+            r#"{
+            "type": "bytes",
+            "logicalType": "decimal",
+            "precision": 4,
+            "scale": 2
+        }"#,
+        )?;
+        let nullable_schema = Schema::parse_str(
+            r#"[
+            "null",
+            {
+                "type": "bytes",
+                "logicalType": "decimal",
+                "precision": 4,
+                "scale": 2
+            }
+        ]"#,
+        )?;
+
+        let expected_bytes = BigInt::from(123456789).to_signed_bytes_be();
+        let decimal = Decimal::from(&expected_bytes);
+
+        // Bare decimal schema.
+        let plain_buf = GenericDatumWriter::builder(&decimal_schema)
+            .build()?
+            .write_ser_to_vec(&decimal)?;
+        let plain: Bytes = GenericDatumReader::builder(&decimal_schema)
+            .build()?
+            .read_deser(&mut &plain_buf[..])?;
+
+        assert_eq!(plain.0, expected_bytes);
+
+        // Same decimal behind a `["null", decimal]` union.
+        let buf = GenericDatumWriter::builder(&nullable_schema)
+            .build()?
+            .write_ser_to_vec(&Some(decimal))?;
+
+        let decoded_value: Option<Bytes> = GenericDatumReader::builder(&nullable_schema)
+            .build()?
+            .read_deser(&mut &buf[..])?;
+
+        assert_eq!(decoded_value.unwrap().0, expected_bytes);
+
+        Ok(())
+    }
+
+    /// A `char` (one ASCII, one multi-byte) is passed through `assert_roundtrip`
+    /// with `Schema::String`.
+    #[test]
+    fn avro_rs_414_deserialize_char_from_string() -> TestResult {
+        assert_roundtrip('a', &Schema::String, Vec::new())?;
+        assert_roundtrip('👹', &Schema::String, Vec::new())?;
+        Ok(())
+    }
+
+    /// Reading a multi-character string as a `char` must fail with the exact
+    /// error message asserted below.
+    #[test]
+    fn avro_rs_414_deserialize_char_from_long_string() -> TestResult {
+        let string_schema = Schema::String;
+
+        // Encode a four-character string ...
+        let encoded = GenericDatumWriter::builder(&string_schema)
+            .build()?
+            .write_ser_to_vec(&"avro")?;
+
+        // ... and try to decode it as a single `char`.
+        let err = GenericDatumReader::builder(&string_schema)
+            .build()?
+            .read_deser::<char>(&mut &encoded[..])
+            .unwrap_err();
+
+        assert_eq!(
+            err.to_string(),
+            r#"Failed to deserialize value of type char using schema String: Read more than one character: "avro""#
+        );
+
+        Ok(())
+    }
+}
