This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch avro-3674 in repository https://gitbox.apache.org/repos/asf/avro.git
commit bf6efc7e5caf56e3f469b531e895e5b43cbb0ee9 Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Fri Nov 18 14:10:15 2022 +0200 AVRO-3674: Pass the correct enclosing namespace to validate and resolve_internal Expose getters for Schema's name and namespace Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --- lang/rust/avro/src/schema.rs | 18 ++++++- lang/rust/avro/src/types.rs | 119 +++++++++++++++++++++++++++---------------- lang/rust/avro/src/writer.rs | 18 ++++--- 3 files changed, 103 insertions(+), 52 deletions(-) diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 2e2443ed3..6aa1d7fe0 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -696,7 +696,7 @@ impl UnionSchema { let rs = ResolvedSchema::try_from(*schema).expect("Schema didn't successfully parse"); value - .validate_internal(schema, rs.get_names(), &None) + .validate_internal(schema, rs.get_names(), &schema.namespace()) .is_none() }) } @@ -823,6 +823,22 @@ impl Schema { _ => None, } } + + /// Returns the name of the schema if it has one. + pub fn name(&self) -> Option<&Name> { + match self { + Schema::Ref { ref name, .. } + | Schema::Record { ref name, .. } + | Schema::Enum { ref name, .. } + | Schema::Fixed { ref name, .. } => Some(name), + _ => None, + } + } + + /// Returns the namespace of the schema if it has one. + pub fn namespace(&self) -> Namespace { + self.name().and_then(|n| n.namespace.clone()) + } } impl Parser { diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index fadff4481..c91c0bf47 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -340,14 +340,9 @@ impl Value { /// for the full set of rules of schema validation. pub fn validate(&self, schema: &Schema) -> bool { let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse"); - let namespace = match schema { - Schema::Record { name, .. } - | Schema::Enum { name, .. } - | Schema::Fixed { name, .. } => &name.namespace, - _ => &None, - }; + let enclosing_namespace = schema.namespace(); - match self.validate_internal(schema, rs.get_names(), namespace) { + match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) { Some(error_msg) => { error!( "Invalid value: {:?} for schema: {:?}. Reason: {}", @@ -372,17 +367,11 @@ impl Value { &self, schema: &Schema, names: &HashMap<Name, S>, - namespace: &Namespace, + enclosing_namespace: &Namespace, ) -> Option<String> { match (self, schema) { (_, &Schema::Ref { ref name }) => { - let name = match namespace { - Some(namespace) => Name { - name: name.name.to_owned(), - namespace: Some(namespace.to_owned()), - }, - None => name.to_owned(), - }; + let name = name.fully_qualified_name(enclosing_namespace); names.get(&name).map_or_else( || { @@ -392,7 +381,7 @@ impl Value { names.keys() )) }, - |s| self.validate_internal(s.borrow(), names, namespace), + |s| self.validate_internal(s.borrow(), names, &name.namespace), ) } (&Value::Null, &Schema::Null) => None, @@ -473,7 +462,7 @@ impl Value { (&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner .variants() .get(i as usize) - .map(|schema| value.validate_internal(schema, names, namespace)) + .map(|schema| value.validate_internal(schema, names, enclosing_namespace)) .unwrap_or_else(|| Some(format!("No schema in the union at position '{}'", i))), (v, &Schema::Union(ref inner)) => match inner.find_schema(v) { Some(_) => None, @@ -481,12 +470,18 @@ impl Value { }, (&Value::Array(ref items), &Schema::Array(ref inner)) => { items.iter().fold(None, |acc, item| { - Value::accumulate(acc, item.validate_internal(inner, names, namespace)) + Value::accumulate( + acc, + item.validate_internal(inner, names, enclosing_namespace), + ) }) } (&Value::Map(ref items), &Schema::Map(ref inner)) => { items.iter().fold(None, |acc, (_, value)| { - Value::accumulate(acc, value.validate_internal(inner, names, namespace)) + Value::accumulate( + acc, + value.validate_internal(inner, names, enclosing_namespace), + ) }) } ( @@ -522,7 +517,11 @@ impl Value { let field = &fields[*idx]; Value::accumulate( acc, - record_field.validate_internal(&field.schema, names, namespace), + record_field.validate_internal( + &field.schema, + names, + enclosing_namespace, + ), ) } None => Value::accumulate( @@ -538,7 +537,7 @@ impl Value { (&Value::Map(ref items), &Schema::Record { ref fields, .. }) => { fields.iter().fold(None, |acc, field| { if let Some(item) = items.get(&field.name) { - let res = item.validate_internal(&field.schema, names, namespace); + let res = item.validate_internal(&field.schema, names, enclosing_namespace); Value::accumulate(acc, res) } else if !field.is_nullable() { Value::accumulate( @@ -564,12 +563,18 @@ impl Value { /// in the Avro specification for the full set of rules of schema /// resolution. pub fn resolve(self, schema: &Schema) -> AvroResult<Self> { + let enclosing_namespace = schema.namespace(); // FIXME transition to using resolved Schema let rs = ResolvedSchema::try_from(schema)?; - self.resolve_internal(schema, rs.get_names()) + self.resolve_internal(schema, rs.get_names(), &enclosing_namespace) } - fn resolve_internal(mut self, schema: &Schema, names: &NamesRef) -> AvroResult<Self> { + fn resolve_internal( + mut self, + schema: &Schema, + names: &NamesRef, + enclosing_namespace: &Namespace, + ) -> AvroResult<Self> { // Check if this schema is a union, and if the reader schema is not. if SchemaKind::from(&self) == SchemaKind::Union && SchemaKind::from(schema) != SchemaKind::Union @@ -583,9 +588,11 @@ impl Value { } match *schema { Schema::Ref { ref name } => { - if let Some(resolved) = names.get(name) { - info!("Resolved {:?}", name); - self.resolve_internal(resolved, names) + let name = name.fully_qualified_name(enclosing_namespace); + + if let Some(resolved) = names.get(&name) { + debug!("Resolved {:?}", name); + self.resolve_internal(resolved, names, &name.namespace) } else { error!("Failed to resolve schema {:?}", name); Err(Error::SchemaResolutionError(name.clone())) @@ -600,11 +607,13 @@ impl Value { Schema::Bytes => self.resolve_bytes(), Schema::String => self.resolve_string(), Schema::Fixed { size, .. } => self.resolve_fixed(size), - Schema::Union(ref inner) => self.resolve_union(inner, names), + Schema::Union(ref inner) => self.resolve_union(inner, names, enclosing_namespace), Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols), - Schema::Array(ref inner) => self.resolve_array(inner, names), - Schema::Map(ref inner) => self.resolve_map(inner, names), - Schema::Record { ref fields, .. } => self.resolve_record(fields, names), + Schema::Array(ref inner) => self.resolve_array(inner, names, enclosing_namespace), + Schema::Map(ref inner) => self.resolve_map(inner, names, enclosing_namespace), + Schema::Record { ref fields, .. } => { + self.resolve_record(fields, names, enclosing_namespace) + } Schema::Decimal { scale, precision, @@ -848,7 +857,12 @@ impl Value { } } - fn resolve_union(self, schema: &UnionSchema, names: &NamesRef) -> Result<Self, Error> { + fn resolve_union( + self, + schema: &UnionSchema, + names: &NamesRef, + enclosing_namespace: &Namespace, + ) -> Result<Self, Error> { let v = match self { // Both are unions case. Value::Union(_i, v) => *v, @@ -861,16 +875,21 @@ impl Value { let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?; Ok(Value::Union( i as u32, - Box::new(v.resolve_internal(inner, names)?), + Box::new(v.resolve_internal(inner, names, enclosing_namespace)?), )) } - fn resolve_array(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> { + fn resolve_array( + self, + schema: &Schema, + names: &NamesRef, + enclosing_namespace: &Namespace, + ) -> Result<Self, Error> { match self { Value::Array(items) => Ok(Value::Array( items .into_iter() - .map(|item| item.resolve_internal(schema, names)) + .map(|item| item.resolve_internal(schema, names, enclosing_namespace)) .collect::<Result<_, _>>()?, )), other => Err(Error::GetArray { @@ -880,14 +899,19 @@ impl Value { } } - fn resolve_map(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> { + fn resolve_map( + self, + schema: &Schema, + names: &NamesRef, + enclosing_namespace: &Namespace, + ) -> Result<Self, Error> { match self { Value::Map(items) => Ok(Value::Map( items .into_iter() .map(|(key, value)| { value - .resolve_internal(schema, names) + .resolve_internal(schema, names, enclosing_namespace) .map(|value| (key, value)) }) .collect::<Result<_, _>>()?, @@ -899,7 +923,12 @@ impl Value { } } - fn resolve_record(self, fields: &[RecordField], names: &NamesRef) -> Result<Self, Error> { + fn resolve_record( + self, + fields: &[RecordField], + names: &NamesRef, + enclosing_namespace: &Namespace, + ) -> Result<Self, Error> { let mut items = match self { Value::Map(items) => Ok(items), Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()), @@ -930,10 +959,11 @@ impl Value { Schema::Null => Value::Union(0, Box::new(Value::Null)), _ => Value::Union( 0, - Box::new( - Value::from(value.clone()) - .resolve_internal(first, names)?, - ), + Box::new(Value::from(value.clone()).resolve_internal( + first, + names, + enclosing_namespace, + )?), ), } } @@ -945,7 +975,7 @@ impl Value { }, }; value - .resolve_internal(&field.schema, names) + .resolve_internal(&field.schema, names, enclosing_namespace) .map(|value| (field.name.clone(), value)) }) .collect::<Result<Vec<_>, _>>()?; @@ -2481,7 +2511,10 @@ Field with name '"b"' is not a member of the map items"#, let test_value: Value = msg.serialize(&mut ser).unwrap(); assert!(test_value.validate(&schema), "test_value should validate"); // TODO (rikheijdens): I believe this should also resolve? - //assert!(test_value.resolve(&schema).is_ok(), "test_value should resolve"); + assert!( + test_value.resolve(&schema).is_ok(), + "test_value should resolve" + ); } #[test] diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 054cf8075..3bc5c5122 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -488,18 +488,19 @@ fn write_value_ref_resolved( value: &Value, buffer: &mut Vec<u8>, ) -> AvroResult<()> { + let root_schema = resolved_schema.get_root_schema(); if let Some(err) = value.validate_internal( - resolved_schema.get_root_schema(), + root_schema, resolved_schema.get_names(), - &None, + &root_schema.namespace(), ) { return Err(Error::ValidationWithReason(err)); } encode_internal( value, - resolved_schema.get_root_schema(), + root_schema, resolved_schema.get_names(), - &None, + &root_schema.namespace(), buffer, )?; Ok(()) @@ -510,18 +511,19 @@ fn write_value_ref_owned_resolved( value: &Value, buffer: &mut Vec<u8>, ) -> AvroResult<()> { + let root_schema = resolved_schema.get_root_schema(); if let Some(err) = value.validate_internal( - resolved_schema.get_root_schema(), + root_schema, resolved_schema.get_names(), - &None, + &root_schema.namespace(), ) { return Err(Error::ValidationWithReason(err)); } encode_internal( value, - resolved_schema.get_root_schema(), + root_schema, resolved_schema.get_names(), - &None, + &root_schema.namespace(), buffer, )?; Ok(())
