This is an automated email from the ASF dual-hosted git repository. kriskras99 pushed a commit to branch feat/full_enum_support in repository https://gitbox.apache.org/repos/asf/avro-rs.git
commit 46f168c2c12f5c2b2e205759f233897d6ac26b50 Author: default <[email protected]> AuthorDate: Tue Mar 17 16:05:18 2026 +0000 wip: Update tests for `SchemaAwareSerializer` --- avro/src/bigdecimal.rs | 45 ++++++- avro/src/schema/mod.rs | 64 ++++++++++ avro/src/serde/derive.rs | 55 +++++++-- avro/src/serde/mod.rs | 5 +- avro/src/serde/with.rs | 309 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 463 insertions(+), 15 deletions(-) diff --git a/avro/src/bigdecimal.rs b/avro/src/bigdecimal.rs index 7a6a379..ee26ee1 100644 --- a/avro/src/bigdecimal.rs +++ b/avro/src/bigdecimal.rs @@ -190,6 +190,47 @@ mod tests { big_decimal: BigDecimal, } + let schema_str = r#" + { + "type": "record", + "name": "Test", + "fields": [ + { + "name": "big_decimal", + "type": "string" + } + ] + } + "#; + let schema = Schema::parse_str(schema_str)?; + + let test = Test::default(); + + // write a record + let mut writer = Writer::new(&schema, Vec::new())?; + writer.append_ser(test.clone())?; + + let wrote_data = writer.into_inner()?; + + // read record + let mut reader = Reader::new(&wrote_data[..])?; + + let value = reader.next().unwrap()?; + + assert_eq!(test, from_value::<Test>(&value)?); + + Ok(()) + } + + #[test] + fn avro_rs_338_deserialize_serde_way_with_bigdecimal() -> TestResult { + #[derive(Clone, PartialEq, Eq, Debug, Default, serde::Deserialize, serde::Serialize)] + #[serde(rename = "test")] + struct Test { + #[serde(with = "crate::serde::bigdecimal")] + big_decimal: BigDecimal, + } + let schema_str = r#" { "type": "record", @@ -216,11 +257,11 @@ mod tests { let wrote_data = writer.into_inner()?; // read record - let mut reader = Reader::new(&wrote_data[..])?; + let mut reader = Reader::new(&wrote_data[..])?.into_deser_iter(); let value = reader.next().unwrap()?; - assert_eq!(test, from_value::<Test>(&value)?); + assert_eq!(test, value); Ok(()) } diff --git a/avro/src/schema/mod.rs b/avro/src/schema/mod.rs index 40476cb..f317cf9 100644 --- a/avro/src/schema/mod.rs 
+++ b/avro/src/schema/mod.rs @@ -45,6 +45,7 @@ use serde::{ ser::{SerializeMap, SerializeSeq}, }; use serde_json::{Map, Value as JsonValue}; +use std::borrow::Cow; use std::fmt::Formatter; use std::{ collections::{BTreeMap, HashMap, HashSet}, @@ -784,6 +785,69 @@ impl Schema { } Ok(()) } + + /// Derive a name for this schema. + /// + /// The name is a valid schema name and will be unique if the named + /// schemas in this schema have unique names. + pub(crate) fn unique_normalized_name(&self) -> Cow<'static, str> { + match self { + Schema::Null => Cow::Borrowed("n"), + Schema::Boolean => Cow::Borrowed("B"), + Schema::Int => Cow::Borrowed("i"), + Schema::Long => Cow::Borrowed("l"), + Schema::Float => Cow::Borrowed("f"), + Schema::Double => Cow::Borrowed("d"), + Schema::Bytes => Cow::Borrowed("b"), + Schema::String => Cow::Borrowed("s"), + Schema::Array(array) => { + Cow::Owned(format!("a_{}", array.items.unique_normalized_name())) + } + Schema::Map(map) => Cow::Owned(format!("m_{}", map.types.unique_normalized_name())), + Schema::Union(union) => { + let mut name = format!("u_{}", union.schemas.len()); + for schema in &union.schemas { + name.push('_'); + name.push_str(&schema.unique_normalized_name()); + } + Cow::Owned(name) + } + Schema::BigDecimal => Cow::Borrowed("bd"), + Schema::Date => Cow::Borrowed("D"), + Schema::TimeMillis => Cow::Borrowed("t"), + Schema::TimeMicros => Cow::Borrowed("tm"), + Schema::TimestampMillis => Cow::Borrowed("T"), + Schema::TimestampMicros => Cow::Borrowed("TM"), + Schema::TimestampNanos => Cow::Borrowed("TN"), + Schema::LocalTimestampMillis => Cow::Borrowed("L"), + Schema::LocalTimestampMicros => Cow::Borrowed("LM"), + Schema::LocalTimestampNanos => Cow::Borrowed("LN"), + Schema::Decimal(DecimalSchema { + inner: InnerDecimalSchema::Bytes, + precision, + scale, + }) => Cow::Owned(format!("db_{precision}_{scale}")), + Schema::Uuid(UuidSchema::Bytes) => Cow::Borrowed("ub"), + Schema::Uuid(UuidSchema::String) => Cow::Borrowed("us"), + 
Schema::Record(RecordSchema { name, .. }) + | Schema::Enum(EnumSchema { name, .. }) + | Schema::Fixed(FixedSchema { name, .. }) + | Schema::Decimal(DecimalSchema { + inner: InnerDecimalSchema::Fixed(FixedSchema { name, .. }), + .. + }) + | Schema::Uuid(UuidSchema::Fixed(FixedSchema { name, .. })) + | Schema::Duration(FixedSchema { name, .. }) + | Schema::Ref { name } => { + let name: String = name + .to_string() + .chars() + .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }) + .collect(); + Cow::Owned(format!("r_{}_{}", name.len(), name)) + } + } + } } impl Serialize for Schema { diff --git a/avro/src/serde/derive.rs b/avro/src/serde/derive.rs index 33c4f5b..e4fe186 100644 --- a/avro/src/serde/derive.rs +++ b/avro/src/serde/derive.rs @@ -630,27 +630,35 @@ where } impl AvroSchemaComponent for core::time::Duration { - /// The schema is [`Schema::Duration`] with the name `duration`. + /// The schema is [`Schema::Record`] with the name `Duration`. /// - /// This is a lossy conversion as this Avro type does not store the amount of nanoseconds. 
+ /// It has two fields: + /// - `secs` with the schema `Schema::Fixed(name: "u64", size: 8)` + /// - `nanos` with the schema `Schema::Long` fn get_schema_in_ctxt( named_schemas: &mut HashSet<Name>, enclosing_namespace: NamespaceRef, ) -> Schema { - let name = Name::new_with_enclosing_namespace("duration", enclosing_namespace) + let name = Name::new_with_enclosing_namespace("Duration", enclosing_namespace) .expect("Name is valid"); if named_schemas.contains(&name) { Schema::Ref { name } } else { - let schema = Schema::Duration(FixedSchema { - name: name.clone(), - aliases: None, - doc: None, - size: 12, - attributes: Default::default(), - }); - named_schemas.insert(name); - schema + named_schemas.insert(name.clone()); + Schema::record(name) + .fields(vec![ + // `secs` is a `u64` + RecordField::builder() + .name("secs") + .schema(u64::get_schema_in_ctxt(named_schemas, enclosing_namespace)) + .build(), + // `nanos` is a `u32`; it is written with `Schema::Long` + RecordField::builder() + .name("nanos") + .schema(Schema::Long) + .build(), + ]) + .build() } } @@ -666,6 +674,8 @@ impl AvroSchemaComponent for uuid::Uuid { /// The schema is [`Schema::Uuid`] with the name `uuid`. /// /// The underlying schema is [`Schema::Fixed`] with a size of 16. + /// + /// If you're using `human_readable: true` you need to override this schema with a `Schema::String`. 
fn get_schema_in_ctxt( named_schemas: &mut HashSet<Name>, enclosing_namespace: NamespaceRef, @@ -790,11 +800,14 @@ impl AvroSchemaComponent for i128 { #[cfg(test)] mod tests { + use crate::reader::datum::GenericDatumReader; + use crate::writer::datum::GenericDatumWriter; use crate::{ AvroSchema, Schema, schema::{FixedSchema, Name}, }; use apache_avro_test_helper::TestResult; + use std::io::Cursor; #[test] fn avro_rs_401_str() -> TestResult { @@ -927,4 +940,22 @@ mod tests { fn avro_rs_489_option_option() { <Option<Option<i32>>>::get_schema(); } + + #[test] + fn avro_rs_xxx_std_time_duration() -> TestResult { + let zero = std::time::Duration::ZERO; + let max = std::time::Duration::MAX; + let schema = std::time::Duration::get_schema(); + + let writer = GenericDatumWriter::builder(&schema).build()?; + let written_zero = writer.write_ser_to_vec(&zero)?; + let written_max = writer.write_ser_to_vec(&max)?; + + let reader = GenericDatumReader::builder(&schema).build()?; + let read_zero = reader.read_deser(&mut Cursor::new(written_zero))?; + assert_eq!(zero, read_zero); + let read_max = reader.read_deser(&mut Cursor::new(written_max))?; + assert_eq!(max, read_max); + Ok(()) + } } diff --git a/avro/src/serde/mod.rs b/avro/src/serde/mod.rs index 256963d..3b1718b 100644 --- a/avro/src/serde/mod.rs +++ b/avro/src/serde/mod.rs @@ -118,7 +118,10 @@ mod with; pub use de::from_value; pub use derive::{AvroSchema, AvroSchemaComponent}; pub use ser::to_value; -pub use with::{bytes, bytes_opt, fixed, fixed_opt, slice, slice_opt}; +pub use with::{ + array, array_opt, bigdecimal, bigdecimal_opt, bytes, bytes_opt, fixed, fixed_opt, slice, + slice_opt, +}; #[doc(hidden)] pub use derive::get_record_fields_in_ctxt; diff --git a/avro/src/serde/with.rs b/avro/src/serde/with.rs index 29744e1..74c0a3a 100644 --- a/avro/src/serde/with.rs +++ b/avro/src/serde/with.rs @@ -503,6 +503,315 @@ pub mod slice_opt { } } +/// (De)serialize [`BigDecimal`] as a [`Schema::BigDecimal`] instead of a 
[`Schema::String`]. +/// +/// This module is intended to be used through the Serde `with` attribute. +/// +/// Use [`apache_avro::serde::bigdecimal_opt`] for optional big decimals values. +/// +/// When used with different serialization formats, this will write bytes. +/// +/// See usage with below example: +/// ``` +/// # use apache_avro::AvroSchema; +/// # use serde::{Deserialize, Serialize}; +/// #[derive(AvroSchema, Serialize, Deserialize)] +/// struct StructWithBigDecimal<'a> { +/// #[avro(with)] +/// #[serde(with = "apache_avro::serde::bigdecimal")] +/// decimal: BigDecimal, +/// } +/// ``` +/// +/// [`BigDecimal`]: ::bigdecimal::BigDecimal +/// [`Schema::BigDecimal`]: crate::Schema::BigDecimal +/// [`Schema::String`]: crate::Schema::String +/// [`apache_avro::serde::bigdecimal_opt`]: bigdecimal_opt +pub mod bigdecimal { + use std::collections::HashSet; + + use bigdecimal::BigDecimal; + use serde::{Deserializer, Serializer, de::Error as _, ser::Error as _}; + + use crate::{ + Schema, + bigdecimal::{big_decimal_as_bytes, deserialize_big_decimal}, + schema::{Name, NamespaceRef, RecordField}, + serde::with::BytesType, + }; + + /// Returns [`Schema::BigDecimal`] + pub fn get_schema_in_ctxt(_: &mut HashSet<Name>, _: NamespaceRef) -> Schema { + Schema::BigDecimal + } + + /// Returns `None` + pub fn get_record_fields_in_ctxt( + _: &mut HashSet<Name>, + _: NamespaceRef, + ) -> Option<Vec<RecordField>> { + None + } + + pub fn serialize<S>(decimal: &BigDecimal, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let _guard = super::BytesTypeGuard::set(BytesType::Bytes); + let decimal_bytes = big_decimal_as_bytes(decimal).map_err(S::Error::custom)?; + serde_bytes::serialize(&decimal_bytes, serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<BigDecimal, D::Error> + where + D: Deserializer<'de>, + { + let _bytes_guard = super::BytesTypeGuard::set(BytesType::Bytes); + let _guard = super::BorrowedGuard::set(true); + // We don't use 
&'de [u8] here as the deserializer doesn't support that + let bytes: Vec<u8> = serde_bytes::deserialize(deserializer)?; + + deserialize_big_decimal(&bytes).map_err(D::Error::custom) + } +} + +/// (De)serialize [`Option<BigDecimal>`] as a `Schema::Union(Schema::Null, Schema::BigDecimal)` instead of a `Schema::Union(Schema::Null, Schema::String)`. +/// +/// This module is intended to be used through the Serde `with` attribute. +/// +/// Use [`apache_avro::serde::bigdecimal`] for non-optional big decimals values. +/// +/// When used with different serialization formats, this will write bytes. +/// +/// See usage with below example: +/// ``` +/// # use apache_avro::AvroSchema; +/// # use serde::{Deserialize, Serialize}; +/// #[derive(AvroSchema, Serialize, Deserialize)] +/// struct StructWithBigDecimal<'a> { +/// #[avro(with)] +/// #[serde(with = "apache_avro::serde::bigdecimal_opt")] +/// decimal: Option<BigDecimal>, +/// } +/// ``` +/// +/// [`Option<BigDecimal>`]: ::bigdecimal::BigDecimal +/// [`apache_avro::serde::bigdecimal`]: bigdecimal +pub mod bigdecimal_opt { + use std::collections::HashSet; + + use bigdecimal::BigDecimal; + use serde::{Deserializer, Serializer, de::Error as _, ser::Error as _}; + + use crate::{ + Schema, + bigdecimal::{big_decimal_as_bytes, deserialize_big_decimal}, + schema::{Name, NamespaceRef, RecordField, UnionSchema}, + serde::with::BytesType, + }; + + /// Returns `Schema::Union(Schema::Null, Schema::BigDecimal)` + pub fn get_schema_in_ctxt(_: &mut HashSet<Name>, _: NamespaceRef) -> Schema { + Schema::Union( + UnionSchema::new(vec![Schema::Null, Schema::BigDecimal]) + .expect("This is a valid union"), + ) + } + + /// Returns `None` + pub fn get_record_fields_in_ctxt( + _: &mut HashSet<Name>, + _: NamespaceRef, + ) -> Option<Vec<RecordField>> { + None + } + + pub fn serialize<S>(decimal: &Option<BigDecimal>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let _guard = super::BytesTypeGuard::set(BytesType::Bytes); 
+ if let Some(decimal) = decimal { + let decimal_bytes = big_decimal_as_bytes(decimal).map_err(S::Error::custom)?; + serde_bytes::serialize(&Some(decimal_bytes), serializer) + } else { + serde_bytes::serialize(&None::<Vec<u8>>, serializer) + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<BigDecimal>, D::Error> + where + D: Deserializer<'de>, + { + let _bytes_guard = super::BytesTypeGuard::set(BytesType::Bytes); + let _guard = super::BorrowedGuard::set(true); + let bytes: Option<Vec<u8>> = serde_bytes::deserialize(deserializer)?; + if let Some(bytes) = bytes { + deserialize_big_decimal(&bytes) + .map(Some) + .map_err(D::Error::custom) + } else { + Ok(None) + } + } +} + +/// (De)serialize a Rust array (`[T; N]`) as an Avro [`Schema::Array`]. +/// +/// This module is intended to be used through the Serde `with` attribute. +/// +/// Use [`apache_avro::serde::array_opt`] for optional array values. +/// +/// See the usage example below: +/// ``` +/// # use apache_avro::AvroSchema; +/// # use serde::{Deserialize, Serialize}; +/// #[derive(AvroSchema, Serialize, Deserialize)] +/// struct StructWithBytes<'a> { +/// #[avro(with = apache_avro::serde::array::get_schema_in_ctxt::<i32>)] +/// #[serde(with = "apache_avro::serde::array")] +/// array: [i32; 10], +/// } +/// ``` +/// +/// [`apache_avro::serde::array_opt`]: array_opt +/// [`Schema::Array`]: crate::schema::Schema::Array +pub mod array { + use crate::{ + AvroSchemaComponent, Schema, + schema::{Name, NamespaceRef, RecordField}, + }; + use serde::de::DeserializeOwned; + use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _}; + use std::collections::HashSet; + + /// Returns `Schema::Array(T::get_schema_in_ctxt())` + pub fn get_schema_in_ctxt<T: AvroSchemaComponent>( + named_schemas: &mut HashSet<Name>, + enclosing_namespace: NamespaceRef, + ) -> Schema { + Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)).build() + } + + /// Returns `None` + pub fn 
get_record_fields_in_ctxt( + _: &mut HashSet<Name>, + _: NamespaceRef, + ) -> Option<Vec<RecordField>> { + None + } + + pub fn serialize<const N: usize, S, T>(value: &[T; N], serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + T: Serialize, + { + value.as_slice().serialize(serializer) + } + + pub fn deserialize<'de, const N: usize, D, T>(deserializer: D) -> Result<[T; N], D::Error> + where + D: Deserializer<'de>, + T: DeserializeOwned, + { + let bytes = <Vec<T> as Deserialize>::deserialize(deserializer)?; + bytes.try_into().map_err(|v: Vec<T>| { + D::Error::custom(format!( + "Deserialized array has length {} which does not match array length of {N}", + v.len() + )) + }) + } +} + +/// (De)serialize an optional Rust array (`Option<[T; N]>`) as an Avro `Schema::Union([Schema::Null, Schema::Array])`. +/// +/// This module is intended to be used through the Serde `with` attribute. +/// +/// Use [`apache_avro::serde::array`] for non-optional array values. +/// +/// When used with different serialization formats, this is serialized as an optional sequence of `T`. 
+/// +/// See usage with below example: +/// ``` +/// # use apache_avro::AvroSchema; +/// # use serde::{Deserialize, Serialize}; +/// #[derive(AvroSchema, Serialize, Deserialize)] +/// struct StructWithBytes<'a> { +/// #[avro(with = apache_avro::serde::array_opt::get_schema_in_ctxt::<i32>)] +/// #[serde(with = "apache_avro::serde::array_opt")] +/// array: Option<[i32; 10]>, +/// } +/// ``` +/// +/// [`apache_avro::serde::array`]: mod@array +pub mod array_opt { + use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _}; + use std::collections::HashSet; + + use crate::{ + AvroSchemaComponent, Schema, + schema::{Name, NamespaceRef, RecordField, UnionSchema}, + }; + + /// Returns `Schema::Union(Schema::Null, Schema::Array(T::get_schema_in_ctxt()))` + pub fn get_schema_in_ctxt<T: AvroSchemaComponent>( + named_schemas: &mut HashSet<Name>, + enclosing_namespace: NamespaceRef, + ) -> Schema { + Schema::Union( + UnionSchema::new(vec![ + Schema::Null, + Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace)).build(), + ]) + .expect("This is a valid union"), + ) + } + + /// Returns `None` + pub fn get_record_fields_in_ctxt( + _: &mut HashSet<Name>, + _: NamespaceRef, + ) -> Option<Vec<RecordField>> { + None + } + + pub fn serialize<const N: usize, S, T>( + value: &Option<[T; N]>, + serializer: S, + ) -> Result<S::Ok, S::Error> + where + S: Serializer, + T: Serialize, + { + if let Some(array) = value { + Some(array.as_slice()).serialize(serializer) + } else { + None::<Vec<T>>.serialize(serializer) + } + } + + pub fn deserialize<'de, const N: usize, D, T>( + deserializer: D, + ) -> Result<Option<[T; N]>, D::Error> + where + D: Deserializer<'de>, + T: Deserialize<'de>, + { + let bytes = <Option<Vec<T>> as Deserialize>::deserialize(deserializer)?; + if let Some(bytes) = bytes { + Ok(Some(bytes.try_into().map_err(|v: Vec<T>| { + D::Error::custom(format!( + "Deserialized array has length {} which does not match array length of {N}", + v.len() + 
)) + })?)) + } else { + Ok(None) + } + } +} + #[cfg(test)] mod tests { use crate::{Schema, from_value, to_value, types::Value};
