This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new f3b6ee2d3 AVRO-4010: [Rust] Avoid re-resolving schema on every read() (#2995)
f3b6ee2d3 is described below

commit f3b6ee2d32ae5200675e345b4d26b151caf3034b
Author: Michael Spector <[email protected]>
AuthorDate: Wed Jul 3 17:19:27 2024 +0300

    AVRO-4010: [Rust] Avoid re-resolving schema on every read() (#2995)
    
    Co-authored-by: Michael Spector <[email protected]>
---
 lang/rust/avro/src/reader.rs |  26 ++++++++---
 lang/rust/avro/src/schema.rs | 101 ++++++++++++++++++++++++-------------------
 2 files changed, 75 insertions(+), 52 deletions(-)

diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index adefed203..121f8e257 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -20,7 +20,10 @@ use crate::{
     decode::{decode, decode_internal},
     from_value,
     rabin::Rabin,
-    schema::{AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema},
+    schema::{
+        resolve_names, resolve_names_with_schemata, AvroSchema, Names, 
ResolvedOwnedSchema,
+        ResolvedSchema, Schema,
+    },
     types::Value,
     util, AvroResult, Codec, Error,
 };
@@ -47,6 +50,7 @@ struct Block<'r, R> {
     writer_schema: Schema,
     schemata: Vec<&'r Schema>,
     user_metadata: HashMap<String, Vec<u8>>,
+    names_refs: Names,
 }
 
 impl<'r, R: Read> Block<'r, R> {
@@ -61,6 +65,7 @@ impl<'r, R: Read> Block<'r, R> {
             message_count: 0,
             marker: [0; 16],
             user_metadata: Default::default(),
+            names_refs: Default::default(),
         };
 
         block.read_header()?;
@@ -179,13 +184,18 @@ impl<'r, R: Read> Block<'r, R> {
 
         let mut block_bytes = &self.buf[self.buf_idx..];
         let b_original = block_bytes.len();
-        let schemata = if self.schemata.is_empty() {
-            vec![&self.writer_schema]
-        } else {
-            self.schemata.clone()
+
+        let item = decode_internal(
+            &self.writer_schema,
+            &self.names_refs,
+            &None,
+            &mut block_bytes,
+        )?;
+        let item = match read_schema {
+            Some(schema) => item.resolve(schema)?,
+            None => item,
         };
-        let item =
-            from_avro_datum_schemata(&self.writer_schema, schemata, &mut 
block_bytes, read_schema)?;
+
         if b_original == block_bytes.len() {
             // from_avro_datum did not consume any bytes, so return an error 
to avoid an infinite loop
             return Err(Error::ReadBlock);
@@ -214,8 +224,10 @@ impl<'r, R: Read> Block<'r, R> {
                 .map(|(name, schema)| (name.clone(), (*schema).clone()))
                 .collect();
             self.writer_schema = Schema::parse_with_names(&json, names)?;
+            resolve_names_with_schemata(&self.schemata, &mut self.names_refs, 
&None)?;
         } else {
             self.writer_schema = Schema::parse(&json)?;
+            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
         }
         Ok(())
     }
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index b8a64dffd..f58892ca0 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -545,7 +545,7 @@ impl TryFrom<Schema> for ResolvedOwnedSchema {
             names,
             root_schema: schema,
         };
-        Self::from_internal(&rs.root_schema, &mut rs.names, &None)?;
+        resolve_names(&rs.root_schema, &mut rs.names, &None)?;
         Ok(rs)
     }
 }
@@ -557,57 +557,68 @@ impl ResolvedOwnedSchema {
     pub(crate) fn get_names(&self) -> &Names {
         &self.names
     }
+}
 
-    fn from_internal(
-        schema: &Schema,
-        names: &mut Names,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<()> {
-        match schema {
-            Schema::Array(schema) => Self::from_internal(&schema.items, names, 
enclosing_namespace),
-            Schema::Map(schema) => Self::from_internal(&schema.types, names, 
enclosing_namespace),
-            Schema::Union(UnionSchema { schemas, .. }) => {
-                for schema in schemas {
-                    Self::from_internal(schema, names, enclosing_namespace)?
-                }
-                Ok(())
+pub(crate) fn resolve_names(
+    schema: &Schema,
+    names: &mut Names,
+    enclosing_namespace: &Namespace,
+) -> AvroResult<()> {
+    match schema {
+        Schema::Array(schema) => resolve_names(&schema.items, names, 
enclosing_namespace),
+        Schema::Map(schema) => resolve_names(&schema.types, names, 
enclosing_namespace),
+        Schema::Union(UnionSchema { schemas, .. }) => {
+            for schema in schemas {
+                resolve_names(schema, names, enclosing_namespace)?
             }
-            Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema 
{ name, .. }) => {
-                let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
-                if names
-                    .insert(fully_qualified_name.clone(), schema.clone())
-                    .is_some()
-                {
-                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
-                } else {
-                    Ok(())
-                }
+            Ok(())
+        }
+        Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { 
name, .. }) => {
+            let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+            if names
+                .insert(fully_qualified_name.clone(), schema.clone())
+                .is_some()
+            {
+                Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+            } else {
+                Ok(())
             }
-            Schema::Record(RecordSchema { name, fields, .. }) => {
-                let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
-                if names
-                    .insert(fully_qualified_name.clone(), schema.clone())
-                    .is_some()
-                {
-                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
-                } else {
-                    let record_namespace = fully_qualified_name.namespace;
-                    for field in fields {
-                        Self::from_internal(&field.schema, names, 
&record_namespace)?
-                    }
-                    Ok(())
+        }
+        Schema::Record(RecordSchema { name, fields, .. }) => {
+            let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+            if names
+                .insert(fully_qualified_name.clone(), schema.clone())
+                .is_some()
+            {
+                Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+            } else {
+                let record_namespace = fully_qualified_name.namespace;
+                for field in fields {
+                    resolve_names(&field.schema, names, &record_namespace)?
                 }
+                Ok(())
             }
-            Schema::Ref { name } => {
-                let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
-                names
-                    .get(&fully_qualified_name)
-                    .map(|_| ())
-                    .ok_or(Error::SchemaResolutionError(fully_qualified_name))
-            }
-            _ => Ok(()),
         }
+        Schema::Ref { name } => {
+            let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+            names
+                .get(&fully_qualified_name)
+                .map(|_| ())
+                .ok_or(Error::SchemaResolutionError(fully_qualified_name))
+        }
+        _ => Ok(()),
+    }
+}
+
+pub(crate) fn resolve_names_with_schemata(
+    schemata: &Vec<&Schema>,
+    names: &mut Names,
+    enclosing_namespace: &Namespace,
+) -> AvroResult<()> {
+    for schema in schemata {
+        resolve_names(schema, names, enclosing_namespace)?;
     }
+    Ok(())
 }
 
 /// Represents a `field` in a `record` Avro schema.

Reply via email to