This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/avro-3683-multiple-schemas by this push:
     new 0c2e2183d AVRO-3683: WIP
0c2e2183d is described below

commit 0c2e2183da7b4a89f5a33447d7a9611e33986b03
Author: Martin Tzvetanov Grigorov <[email protected]>
AuthorDate: Thu Dec 15 00:42:50 2022 +0200

    AVRO-3683: WIP
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
 lang/rust/avro/src/encode.rs | 15 ++++++++++++--
 lang/rust/avro/src/schema.rs | 39 ++++++++++++++++++++++-------------
 lang/rust/avro/src/types.rs  | 28 +++++++++++++++----------
 lang/rust/avro/src/writer.rs | 49 ++++++++++++++++++++++++++------------------
 4 files changed, 84 insertions(+), 47 deletions(-)

diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 5f0819b60..3cdb067fe 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -37,9 +37,20 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
     encode_internal(value, schema, rs.get_names(), &None, buffer)
 }
 
-pub fn encode_schemata(value: &Value, schemata: &[&Schema], buffer: &mut Vec<u8>) -> AvroResult<()> {
+pub fn encode_schemata(
+    value: &Value,
+    schemata: &[&Schema],
+    buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
     let rs = ResolvedSchema::try_from(schemata)?;
-    encode_internal(value, schema, rs.get_names(), &None, buffer)
+    for schema in schemata {
+        if value.validate(schema) {
+            encode_internal(value, schema, rs.get_names(), &None, buffer)?;
+            break;
+        }
+    }
+
+    todo!("Err(None of the provided schemata matched the value)")
 }
 
 fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 3e73e59a2..5784f1901 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -401,12 +401,26 @@ impl Serialize for Alias {
 
 pub(crate) struct ResolvedSchema<'s> {
     names_ref: NamesRef<'s>,
-    schemata: &[&'s Schema],
+    schemata: &'s [&'s Schema],
 }
 
 impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
     type Error = Error;
 
+    fn try_from(schema: &'s Schema) -> AvroResult<Self> {
+        let names = HashMap::new();
+        let mut rs = ResolvedSchema {
+            names_ref: names,
+            schemata: &[schema],
+        };
+        Self::from_internal(rs.schemata, &mut rs.names_ref, &None)?;
+        Ok(rs)
+    }
+}
+
+impl<'s> TryFrom<&'s [&'s Schema]> for ResolvedSchema<'s> {
+    type Error = Error;
+
     fn try_from(schemata: &[&'s Schema]) -> AvroResult<Self> {
         let names = HashMap::new();
         let mut rs = ResolvedSchema {
@@ -419,28 +433,27 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
 }
 
 impl<'s> ResolvedSchema<'s> {
-    pub(crate) fn get_root_schema(&self) -> &'s Schema {
-        self.root_schema
+    pub(crate) fn get_schemata(&self) -> &'s [&'s Schema] {
+        self.schemata
     }
     pub(crate) fn get_names(&self) -> &NamesRef<'s> {
         &self.names_ref
     }
 
     fn from_internal(
-        schemata: &[&'s Schema],
+        schemata: &'s [&'s Schema],
         names_ref: &mut NamesRef<'s>,
         enclosing_namespace: &Namespace,
     ) -> AvroResult<()> {
         for schema in schemata {
             match schema {
                 Schema::Array(schema) | Schema::Map(schema) => {
-                    Self::from_internal(schema, names_ref, enclosing_namespace)
+                    Self::from_internal(&[schema], names_ref, enclosing_namespace)?
                 }
                 Schema::Union(UnionSchema { schemas, .. }) => {
                     for schema in schemas {
-                        Self::from_internal(schema, names_ref, enclosing_namespace)?
+                        Self::from_internal(&[schema], names_ref, enclosing_namespace)?
                     }
-                    Ok(())
                 }
                 Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
                     let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
@@ -448,9 +461,7 @@ impl<'s> ResolvedSchema<'s> {
                         .insert(fully_qualified_name.clone(), schema)
                         .is_some()
                     {
-                        Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
-                    } else {
-                        Ok(())
+                        return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
                     }
                 }
                 Schema::Record { name, fields, .. } => {
@@ -459,13 +470,12 @@ impl<'s> ResolvedSchema<'s> {
                         .insert(fully_qualified_name.clone(), schema)
                         .is_some()
                     {
-                        Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+                        return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
                     } else {
                         let record_namespace = fully_qualified_name.namespace;
                         for field in fields {
-                            Self::from_internal(&field.schema, names_ref, &record_namespace)?
+                            Self::from_internal(&[&field.schema], names_ref, &record_namespace)?
                         }
-                        Ok(())
                     }
                 }
                 Schema::Ref { name } => {
@@ -473,11 +483,12 @@ impl<'s> ResolvedSchema<'s> {
                     names_ref
                         .get(&fully_qualified_name)
                         .map(|_| ())
-                        .ok_or(Error::SchemaResolutionError(fully_qualified_name))
+                        .ok_or_else(|| return Error::SchemaResolutionError(fully_qualified_name))
                 }
                 _ => Ok(()),
             }
         }
+        Ok(())
     }
 }
 
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 940ca17aa..ba18f7714 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -339,19 +339,25 @@ impl Value {
     /// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
     /// for the full set of rules of schema validation.
     pub fn validate(&self, schema: &Schema) -> bool {
-        let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
-        let enclosing_namespace = schema.namespace();
+        self.validate_schemata(&[schema])
+    }
 
-        match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
-            Some(error_msg) => {
-                error!(
-                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
-                    self, schema, error_msg
-                );
-                false
+    pub fn validate_schemata(&self, schemata: &[&Schema]) -> bool {
+        let rs = ResolvedSchema::try_from(schemata).expect("Schemata didn't successfully resolve");
+        schemata.iter().any(|schema| {
+            let enclosing_namespace = schema.namespace();
+
+            match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
+                Some(error_msg) => {
+                    error!(
+                        "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                        self, schema, error_msg
+                    );
+                    false
+                }
+                None => true,
             }
-            None => true,
-        }
+        })
     }
 
     fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 6ed93fbc7..c777e31cd 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Logic handling writing in Avro format at user level.
+use crate::encode::encode_schemata;
 use crate::{
     encode::{encode, encode_internal, encode_to_vec},
     rabin::Rabin,
@@ -382,10 +383,10 @@ fn write_avro_datum_schemata<T: Into<Value>>(
     buffer: &mut Vec<u8>,
 ) -> Result<(), Error> {
     let avro = value.into();
-    if !avro.validate(schemata) {
+    if !avro.validate_schemata(schemata) {
         return Err(Error::Validation);
     }
-    encode(&avro, schemata, buffer)?;
+    encode_schemata(&avro, schemata, buffer)?;
     Ok(())
 }
 
@@ -501,21 +502,22 @@ fn write_value_ref_resolved(
     value: &Value,
     buffer: &mut Vec<u8>,
 ) -> AvroResult<()> {
-    let root_schema = resolved_schema.get_root_schema();
-    if let Some(err) = value.validate_internal(
-        root_schema,
-        resolved_schema.get_names(),
-        &root_schema.namespace(),
-    ) {
-        return Err(Error::ValidationWithReason(err));
+    let schemata = resolved_schema.get_schemata();
+    for schema in schemata {
+        if let Some(err) =
+            value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace())
+        {
+            return Err(Error::ValidationWithReason(err));
+        } else {
+            encode_internal(
+                value,
+                schema,
+                resolved_schema.get_names(),
+                &schema.namespace(),
+                buffer,
+            )?;
+        }
     }
-    encode_internal(
-        value,
-        root_schema,
-        resolved_schema.get_names(),
-        &root_schema.namespace(),
-        buffer,
-    )?;
     Ok(())
 }
 
@@ -554,9 +556,12 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
     Ok(buffer)
 }
 
-pub fn to_avro_datum_schemata<T: Into<Value>>(schemata: &[&Schema], value: T) -> AvroResult<Vec<u8>> {
+pub fn to_avro_datum_schemata<T: Into<Value>>(
+    schemata: &[&Schema],
+    value: T,
+) -> AvroResult<Vec<u8>> {
     let mut buffer = Vec::new();
-    write_avro_datum(schemata, value, &mut buffer)?;
+    write_avro_datum_schemata(schemata, value, &mut buffer)?;
     Ok(buffer)
 }
 
@@ -1264,7 +1269,7 @@ mod tests {
     }
 
     #[test]
-    fn test_multiple_schemata_to_avro_datum() {
+    fn test_avro_3683_multiple_schemata_to_avro_datum() {
         let schema_a_str = r#"{
         "name": "A",
         "type": "record",
@@ -1286,6 +1291,10 @@ mod tests {
             Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
         )]);
 
-        assert_eq!(to_avro_datum(&schema, record).unwrap(), expected);
+        let expected: Vec<u8> = Vec::new();
+        assert_eq!(
+            to_avro_datum_schemata(&schemata[..], record).unwrap(),
+            expected
+        );
     }
 }

Reply via email to