This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 2976d395c AVRO-4010: [Rust] Avoid re-resolving schema on every read() (#2995)
2976d395c is described below
commit 2976d395c8e2485c3e71f34a22964b9b55496356
Author: Michael Spector <[email protected]>
AuthorDate: Wed Jul 3 17:19:27 2024 +0300
AVRO-4010: [Rust] Avoid re-resolving schema on every read() (#2995)
Co-authored-by: Michael Spector <[email protected]>
(cherry picked from commit f3b6ee2d32ae5200675e345b4d26b151caf3034b)
---
lang/rust/avro/src/reader.rs | 26 ++++++++---
lang/rust/avro/src/schema.rs | 101 ++++++++++++++++++++++++-------------------
2 files changed, 75 insertions(+), 52 deletions(-)
diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index adefed203..121f8e257 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -20,7 +20,10 @@ use crate::{
decode::{decode, decode_internal},
from_value,
rabin::Rabin,
- schema::{AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema},
+ schema::{
+ resolve_names, resolve_names_with_schemata, AvroSchema, Names,
ResolvedOwnedSchema,
+ ResolvedSchema, Schema,
+ },
types::Value,
util, AvroResult, Codec, Error,
};
@@ -47,6 +50,7 @@ struct Block<'r, R> {
writer_schema: Schema,
schemata: Vec<&'r Schema>,
user_metadata: HashMap<String, Vec<u8>>,
+ names_refs: Names,
}
impl<'r, R: Read> Block<'r, R> {
@@ -61,6 +65,7 @@ impl<'r, R: Read> Block<'r, R> {
message_count: 0,
marker: [0; 16],
user_metadata: Default::default(),
+ names_refs: Default::default(),
};
block.read_header()?;
@@ -179,13 +184,18 @@ impl<'r, R: Read> Block<'r, R> {
let mut block_bytes = &self.buf[self.buf_idx..];
let b_original = block_bytes.len();
- let schemata = if self.schemata.is_empty() {
- vec![&self.writer_schema]
- } else {
- self.schemata.clone()
+
+ let item = decode_internal(
+ &self.writer_schema,
+ &self.names_refs,
+ &None,
+ &mut block_bytes,
+ )?;
+ let item = match read_schema {
+ Some(schema) => item.resolve(schema)?,
+ None => item,
};
- let item =
- from_avro_datum_schemata(&self.writer_schema, schemata, &mut
block_bytes, read_schema)?;
+
if b_original == block_bytes.len() {
// from_avro_datum did not consume any bytes, so return an error
to avoid an infinite loop
return Err(Error::ReadBlock);
@@ -214,8 +224,10 @@ impl<'r, R: Read> Block<'r, R> {
.map(|(name, schema)| (name.clone(), (*schema).clone()))
.collect();
self.writer_schema = Schema::parse_with_names(&json, names)?;
+ resolve_names_with_schemata(&self.schemata, &mut self.names_refs,
&None)?;
} else {
self.writer_schema = Schema::parse(&json)?;
+ resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
}
Ok(())
}
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index c313fe5f3..bd7d05eef 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -545,7 +545,7 @@ impl TryFrom<Schema> for ResolvedOwnedSchema {
names,
root_schema: schema,
};
- Self::from_internal(&rs.root_schema, &mut rs.names, &None)?;
+ resolve_names(&rs.root_schema, &mut rs.names, &None)?;
Ok(rs)
}
}
@@ -557,57 +557,68 @@ impl ResolvedOwnedSchema {
pub(crate) fn get_names(&self) -> &Names {
&self.names
}
+}
- fn from_internal(
- schema: &Schema,
- names: &mut Names,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<()> {
- match schema {
- Schema::Array(schema) => Self::from_internal(&schema.items, names,
enclosing_namespace),
- Schema::Map(schema) => Self::from_internal(&schema.types, names,
enclosing_namespace),
- Schema::Union(UnionSchema { schemas, .. }) => {
- for schema in schemas {
- Self::from_internal(schema, names, enclosing_namespace)?
- }
- Ok(())
+pub(crate) fn resolve_names(
+ schema: &Schema,
+ names: &mut Names,
+ enclosing_namespace: &Namespace,
+) -> AvroResult<()> {
+ match schema {
+ Schema::Array(schema) => resolve_names(&schema.items, names,
enclosing_namespace),
+ Schema::Map(schema) => resolve_names(&schema.types, names,
enclosing_namespace),
+ Schema::Union(UnionSchema { schemas, .. }) => {
+ for schema in schemas {
+ resolve_names(schema, names, enclosing_namespace)?
}
- Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema
{ name, .. }) => {
- let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
- if names
- .insert(fully_qualified_name.clone(), schema.clone())
- .is_some()
- {
- Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
- } else {
- Ok(())
- }
+ Ok(())
+ }
+ Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema {
name, .. }) => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if names
+ .insert(fully_qualified_name.clone(), schema.clone())
+ .is_some()
+ {
+ Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ } else {
+ Ok(())
}
- Schema::Record(RecordSchema { name, fields, .. }) => {
- let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
- if names
- .insert(fully_qualified_name.clone(), schema.clone())
- .is_some()
- {
- Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
- } else {
- let record_namespace = fully_qualified_name.namespace;
- for field in fields {
- Self::from_internal(&field.schema, names,
&record_namespace)?
- }
- Ok(())
+ }
+ Schema::Record(RecordSchema { name, fields, .. }) => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if names
+ .insert(fully_qualified_name.clone(), schema.clone())
+ .is_some()
+ {
+ Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ } else {
+ let record_namespace = fully_qualified_name.namespace;
+ for field in fields {
+ resolve_names(&field.schema, names, &record_namespace)?
}
+ Ok(())
}
- Schema::Ref { name } => {
- let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
- names
- .get(&fully_qualified_name)
- .map(|_| ())
- .ok_or(Error::SchemaResolutionError(fully_qualified_name))
- }
- _ => Ok(()),
}
+ Schema::Ref { name } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ names
+ .get(&fully_qualified_name)
+ .map(|_| ())
+ .ok_or(Error::SchemaResolutionError(fully_qualified_name))
+ }
+ _ => Ok(()),
+ }
+}
+
+pub(crate) fn resolve_names_with_schemata(
+ schemata: &Vec<&Schema>,
+ names: &mut Names,
+ enclosing_namespace: &Namespace,
+) -> AvroResult<()> {
+ for schema in schemata {
+ resolve_names(schema, names, enclosing_namespace)?;
}
+ Ok(())
}
/// Represents a `field` in a `record` Avro schema.