This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git

commit cc5e7d32bf584996cb0d23db787bb88ba3f40498
Author: Martin Tzvetanov Grigorov <[email protected]>
AuthorDate: Fri Dec 9 16:16:01 2022 +0200

    AVRO-3683: Add support for using multiple schemata for resolve/validate/write
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
 lang/rust/avro/src/encode.rs | 15 ++++++++++
 lang/rust/avro/src/schema.rs | 29 +++++++++++++++++++
 lang/rust/avro/src/writer.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 111 insertions(+)

diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 2ae48f91c..5f0819b60 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -37,6 +37,21 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
     encode_internal(value, schema, rs.get_names(), &None, buffer)
 }
 
+/// Encodes `value` with `schema`, resolving named-type references against every
+/// schema in `schemata`.
+///
+/// `schemata` must define each named type before the schemata that reference it;
+/// `schema` may be one of them (compared by address) or a separate schema.
+pub fn encode_schemata(
+    value: &Value,
+    schema: &Schema,
+    schemata: &[&Schema],
+    buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+    let rs = ResolvedSchema::try_from_schemata(schema, schemata)?;
+    encode_internal(value, schema, rs.get_names(), &None, buffer)
+}
+
 fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
     let bytes = s.as_ref();
     encode_long(bytes.len() as i64, buffer);
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 6aa1d7fe0..3e73e59a2 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -403,6 +403,35 @@ pub(crate) struct ResolvedSchema<'s> {
     names_ref: NamesRef<'s>,
     root_schema: &'s Schema,
 }
+
+impl<'s> ResolvedSchema<'s> {
+    /// Resolves the named types of several schemata into one lookup table.
+    ///
+    /// The schemata are resolved in order, so a schema may reference named types defined
+    /// by the schemata listed before it. `root_schema` is the schema values are written
+    /// with; when it is not one of `schemata` (compared by address) it is resolved last.
+    ///
+    /// # Errors
+    ///
+    /// Fails if a named type is defined more than once or a reference cannot be resolved.
+    pub(crate) fn try_from_schemata(
+        root_schema: &'s Schema,
+        schemata: &[&'s Schema],
+    ) -> AvroResult<Self> {
+        let mut names_ref = HashMap::new();
+        for &schema in schemata {
+            Self::from_internal(schema, &mut names_ref, &None)?;
+        }
+        // Resolving the root a second time would report its named types as ambiguous.
+        if !schemata.iter().any(|&schema| std::ptr::eq(schema, root_schema)) {
+            Self::from_internal(root_schema, &mut names_ref, &None)?;
+        }
+        Ok(ResolvedSchema {
+            names_ref,
+            root_schema,
+        })
+    }
+}
 
 impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
     type Error = Error;
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 3bc5c5122..6ed93fbc7 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -376,6 +376,25 @@ fn write_avro_datum<T: Into<Value>>(
     Ok(())
 }
 
+/// Validates `value` against `schema` and appends its encoding to `buffer`, resolving
+/// named-type references against every schema in `schemata`.
+fn write_avro_datum_schemata<T: Into<Value>>(
+    schema: &Schema,
+    schemata: &[&Schema],
+    value: T,
+    buffer: &mut Vec<u8>,
+) -> Result<(), Error> {
+    let avro = value.into();
+    let rs = crate::schema::ResolvedSchema::try_from_schemata(schema, schemata)?;
+    let names = rs.get_names();
+    // Validate with the names resolved from all schemata: `schema` on its own cannot
+    // see named types that are defined only in the other schemata.
+    if avro.validate_internal(schema, names, &None).is_some() {
+        return Err(Error::Validation);
+    }
+    crate::encode::encode_internal(&avro, schema, names, &None, buffer)
+}
+
 /// Writer that encodes messages according to the single object encoding v1 spec
 /// Uses an API similar to the current File Writer
 /// Writes all object bytes at once, and drains internal buffer
@@ -541,6 +560,22 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
     Ok(buffer)
 }
 
+/// Encodes `value` with `schema` into a new buffer holding only the datum bytes,
+/// resolving named-type references against every schema in `schemata`.
+///
+/// `schemata` must define each named type before the schemata that reference it;
+/// `schema` may be one of them (compared by address) or a separate schema.
+/// Fails with `Error::Validation` if `value` does not match `schema`.
+pub fn to_avro_datum_schemata<T: Into<Value>>(
+    schema: &Schema,
+    schemata: &[&Schema],
+    value: T,
+) -> AvroResult<Vec<u8>> {
+    let mut buffer = Vec::new();
+    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
+    Ok(buffer)
+}
+
 #[cfg(not(target_arch = "wasm32"))]
 fn generate_sync_marker() -> [u8; 16] {
     let mut marker = [0_u8; 16];
@@ -1243,4 +1278,36 @@ mod tests {
         assert_eq!(buf1, buf2);
         assert_eq!(buf1, buf3);
     }
+
+    #[test]
+    fn test_multiple_schemata_to_avro_datum() {
+        let schema_a_str = r#"{
+        "name": "A",
+        "type": "record",
+        "fields": [
+            {"name": "field_a", "type": "float"}
+        ]
+    }"#;
+        let schema_b_str = r#"{
+        "name": "B",
+        "type": "record",
+        "fields": [
+            {"name": "field_b", "type": "A"}
+        ]
+    }"#;
+
+        // `B` refers to `A` by name, so `A` is listed (and resolved) first.
+        let parsed = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
+        let schemata: Vec<&Schema> = parsed.iter().collect();
+        let record = Value::Record(vec![(
+            "field_b".into(),
+            Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
+        )]);
+
+        // A record is encoded as its fields in order, so the datum is just the
+        // float 1.0 as four little-endian IEEE 754 bytes.
+        let expected: Vec<u8> = vec![0, 0, 128, 63];
+        let actual = to_avro_datum_schemata(schemata[1], &schemata, record).unwrap();
+        assert_eq!(actual, expected);
+    }
 }

Reply via email to