This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 2bb7eb3 [AVRO-3448][rust] Rust name/namespace spec adherence work
(#1602)
2bb7eb3 is described below
commit 2bb7eb3a42b98f846936b4d9bc1e4345f6de6e5b
Author: Jack Klamer <[email protected]>
AuthorDate: Fri Mar 18 01:21:43 2022 -0500
[AVRO-3448][rust] Rust name/namespace spec adherence work (#1602)
* resolved schema struct
* matrix testing started
* [AVRO-3448] encode, decode, parsing in alignment with name/namespace spec
* [AVRO-3448] clippy
* Update lang/rust/avro/src/util.rs
Co-authored-by: Martin Grigorov <[email protected]>
* Update lang/rust/avro/src/schema.rs
Co-authored-by: Martin Grigorov <[email protected]>
* [AVRO-3448] encode with Result return
* [AVRO-3448] clean up types and tests
* AVRO-3448: Fix a typo in an error variant
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
(cherry picked from commit 128510c4e9c3a714279aa7cea22eda19080521f7)
---
lang/rust/avro/src/decode.rs | 764 +++++++++++++++++------
lang/rust/avro/src/encode.rs | 700 +++++++++++++++------
lang/rust/avro/src/error.rs | 27 +-
lang/rust/avro/src/schema.rs | 962 +++++++++++++++++++++++++----
lang/rust/avro/src/schema_compatibility.rs | 2 +
lang/rust/avro/src/types.rs | 8 +-
lang/rust/avro/src/util.rs | 22 +-
lang/rust/avro/src/writer.rs | 12 +-
lang/rust/avro/tests/schema.rs | 9 +-
9 files changed, 1996 insertions(+), 510 deletions(-)
diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index 87337bb..7ddb9fc 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -18,7 +18,7 @@
use crate::{
decimal::Decimal,
duration::Duration,
- schema::Schema,
+ schema::{NamesRef, Namespace, ResolvedSchema, Schema},
types::Value,
util::{safe_len, zag_i32, zag_i64},
AvroResult, Error,
@@ -68,222 +68,225 @@ fn decode_seq_len<R: Read>(reader: &mut R) ->
AvroResult<usize> {
/// Decode a `Value` from avro format given its `Schema`.
pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
- fn decode0<R: Read>(
- schema: &Schema,
- reader: &mut R,
- schemas_by_name: &mut HashMap<String, Schema>,
- ) -> AvroResult<Value> {
- match *schema {
- Schema::Null => Ok(Value::Null),
- Schema::Boolean => {
- let mut buf = [0u8; 1];
- match reader.read_exact(&mut buf[..]) {
- Ok(_) => match buf[0] {
- 0u8 => Ok(Value::Boolean(false)),
- 1u8 => Ok(Value::Boolean(true)),
- _ => Err(Error::BoolValue(buf[0])),
- },
- Err(io_err) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Null)
- } else {
- Err(Error::ReadBoolean(io_err))
- }
+ let rs = ResolvedSchema::try_from(schema)?;
+ decode_internal(schema, rs.get_names(), &None, reader)
+}
+
+fn decode_internal<R: Read>(
+ schema: &Schema,
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ reader: &mut R,
+) -> AvroResult<Value> {
+ match *schema {
+ Schema::Null => Ok(Value::Null),
+ Schema::Boolean => {
+ let mut buf = [0u8; 1];
+ match reader.read_exact(&mut buf[..]) {
+ Ok(_) => match buf[0] {
+ 0u8 => Ok(Value::Boolean(false)),
+ 1u8 => Ok(Value::Boolean(true)),
+ _ => Err(Error::BoolValue(buf[0])),
+ },
+ Err(io_err) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Null)
+ } else {
+ Err(Error::ReadBoolean(io_err))
}
}
}
- Schema::Decimal { ref inner, .. } => match &**inner {
- Schema::Fixed { .. } => match decode0(inner, reader,
schemas_by_name)? {
+ }
+ Schema::Decimal { ref inner, .. } => match &**inner {
+ Schema::Fixed { .. } => {
+ match decode_internal(inner, names, enclosing_namespace,
reader)? {
Value::Fixed(_, bytes) =>
Ok(Value::Decimal(Decimal::from(bytes))),
value => Err(Error::FixedValue(value.into())),
- },
- Schema::Bytes => match decode0(inner, reader,
schemas_by_name)? {
- Value::Bytes(bytes) =>
Ok(Value::Decimal(Decimal::from(bytes))),
- value => Err(Error::BytesValue(value.into())),
- },
- schema => Err(Error::ResolveDecimalSchema(schema.into())),
+ }
+ }
+ Schema::Bytes => match decode_internal(inner, names,
enclosing_namespace, reader)? {
+ Value::Bytes(bytes) =>
Ok(Value::Decimal(Decimal::from(bytes))),
+ value => Err(Error::BytesValue(value.into())),
},
- Schema::Uuid => Ok(Value::Uuid(
- Uuid::from_str(match decode0(&Schema::String, reader,
schemas_by_name)? {
+ schema => Err(Error::ResolveDecimalSchema(schema.into())),
+ },
+ Schema::Uuid => Ok(Value::Uuid(
+ Uuid::from_str(
+ match decode_internal(&Schema::String, names,
enclosing_namespace, reader)? {
Value::String(ref s) => s,
value => return
Err(Error::GetUuidFromStringValue(value.into())),
- })
- .map_err(Error::ConvertStrToUuid)?,
- )),
- Schema::Int => decode_int(reader),
- Schema::Date => zag_i32(reader).map(Value::Date),
- Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
- Schema::Long => decode_long(reader),
- Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
- Schema::TimestampMillis =>
zag_i64(reader).map(Value::TimestampMillis),
- Schema::TimestampMicros =>
zag_i64(reader).map(Value::TimestampMicros),
- Schema::Duration => {
- let mut buf = [0u8; 12];
- reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
- Ok(Value::Duration(Duration::from(buf)))
- }
- Schema::Float => {
- let mut buf = [0u8; std::mem::size_of::<f32>()];
- reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
- Ok(Value::Float(f32::from_le_bytes(buf)))
- }
- Schema::Double => {
- let mut buf = [0u8; std::mem::size_of::<f64>()];
- reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
- Ok(Value::Double(f64::from_le_bytes(buf)))
- }
- Schema::Bytes => {
- let len = decode_len(reader)?;
- let mut buf = vec![0u8; len];
- reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
- Ok(Value::Bytes(buf))
- }
- Schema::String => {
- let len = decode_len(reader)?;
- let mut buf = vec![0u8; len];
- match reader.read_exact(&mut buf) {
- Ok(_) => Ok(Value::String(
- String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
- )),
- Err(io_err) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Null)
- } else {
- Err(Error::ReadString(io_err))
- }
+ },
+ )
+ .map_err(Error::ConvertStrToUuid)?,
+ )),
+ Schema::Int => decode_int(reader),
+ Schema::Date => zag_i32(reader).map(Value::Date),
+ Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
+ Schema::Long => decode_long(reader),
+ Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
+ Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
+ Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+ Schema::Duration => {
+ let mut buf = [0u8; 12];
+ reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
+ Ok(Value::Duration(Duration::from(buf)))
+ }
+ Schema::Float => {
+ let mut buf = [0u8; std::mem::size_of::<f32>()];
+ reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
+ Ok(Value::Float(f32::from_le_bytes(buf)))
+ }
+ Schema::Double => {
+ let mut buf = [0u8; std::mem::size_of::<f64>()];
+ reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
+ Ok(Value::Double(f64::from_le_bytes(buf)))
+ }
+ Schema::Bytes => {
+ let len = decode_len(reader)?;
+ let mut buf = vec![0u8; len];
+ reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
+ Ok(Value::Bytes(buf))
+ }
+ Schema::String => {
+ let len = decode_len(reader)?;
+ let mut buf = vec![0u8; len];
+ match reader.read_exact(&mut buf) {
+ Ok(_) => Ok(Value::String(
+ String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+ )),
+ Err(io_err) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Null)
+ } else {
+ Err(Error::ReadString(io_err))
}
}
}
- Schema::Fixed { ref name, size, .. } => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
- let mut buf = vec![0u8; size];
- reader
- .read_exact(&mut buf)
- .map_err(|e| Error::ReadFixed(e, size))?;
- Ok(Value::Fixed(size, buf))
- }
- Schema::Array(ref inner) => {
- let mut items = Vec::new();
-
- loop {
- let len = decode_seq_len(reader)?;
- if len == 0 {
- break;
- }
+ }
+ Schema::Fixed { size, .. } => {
+ let mut buf = vec![0u8; size];
+ reader
+ .read_exact(&mut buf)
+ .map_err(|e| Error::ReadFixed(e, size))?;
+ Ok(Value::Fixed(size, buf))
+ }
+ Schema::Array(ref inner) => {
+ let mut items = Vec::new();
- items.reserve(len);
- for _ in 0..len {
- items.push(decode0(inner, reader, schemas_by_name)?);
- }
+ loop {
+ let len = decode_seq_len(reader)?;
+ if len == 0 {
+ break;
}
- Ok(Value::Array(items))
+ items.reserve(len);
+ for _ in 0..len {
+ items.push(decode_internal(inner, names,
enclosing_namespace, reader)?);
+ }
}
- Schema::Map(ref inner) => {
- let mut items = HashMap::new();
- loop {
- let len = decode_seq_len(reader)?;
- if len == 0 {
- break;
- }
+ Ok(Value::Array(items))
+ }
+ Schema::Map(ref inner) => {
+ let mut items = HashMap::new();
- items.reserve(len);
- for _ in 0..len {
- match decode0(&Schema::String, reader,
schemas_by_name)? {
- Value::String(key) => {
- let value = decode0(inner, reader,
schemas_by_name)?;
- items.insert(key, value);
- }
- value => return
Err(Error::MapKeyType(value.into())),
+ loop {
+ let len = decode_seq_len(reader)?;
+ if len == 0 {
+ break;
+ }
+
+ items.reserve(len);
+ for _ in 0..len {
+ match decode_internal(&Schema::String, names,
enclosing_namespace, reader)? {
+ Value::String(key) => {
+ let value = decode_internal(inner, names,
enclosing_namespace, reader)?;
+ items.insert(key, value);
}
+ value => return Err(Error::MapKeyType(value.into())),
}
}
+ }
- Ok(Value::Map(items))
+ Ok(Value::Map(items))
+ }
+ Schema::Union(ref inner) => match zag_i64(reader) {
+ Ok(index) => {
+ let variants = inner.variants();
+ let variant = variants
+ .get(usize::try_from(index).map_err(|e|
Error::ConvertI64ToUsize(e, index))?)
+ .ok_or(Error::GetUnionVariant {
+ index,
+ num_variants: variants.len(),
+ })?;
+ let value = decode_internal(variant, names,
enclosing_namespace, reader)?;
+ Ok(Value::Union(index as u32, Box::new(value)))
}
- Schema::Union(ref inner) => match zag_i64(reader) {
- Ok(index) => {
- let variants = inner.variants();
- let variant = variants
- .get(
- usize::try_from(index)
- .map_err(|e| Error::ConvertI64ToUsize(e,
index))?,
- )
- .ok_or(Error::GetUnionVariant {
- index,
- num_variants: variants.len(),
- })?;
- let value = decode0(variant, reader, schemas_by_name)?;
- Ok(Value::Union(index as u32, Box::new(value)))
- }
- Err(Error::ReadVariableIntegerBytes(io_err)) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Union(0, Box::new(Value::Null)))
- } else {
- Err(Error::ReadVariableIntegerBytes(io_err))
- }
- }
- Err(io_err) => Err(io_err),
- },
- Schema::Record {
- ref name,
- ref fields,
- ..
- } => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
- // Benchmarks indicate ~10% improvement using this method.
- let mut items = Vec::with_capacity(fields.len());
- for field in fields {
- // TODO: This clone is also expensive. See if we can do
away with it...
- items.push((
- field.name.clone(),
- decode0(&field.schema, reader, schemas_by_name)?,
- ));
+ Err(Error::ReadVariableIntegerBytes(io_err)) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Union(0, Box::new(Value::Null)))
+ } else {
+ Err(Error::ReadVariableIntegerBytes(io_err))
}
- Ok(Value::Record(items))
}
- Schema::Enum {
- ref name,
- ref symbols,
- ..
- } => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
- Ok(if let Value::Int(raw_index) = decode_int(reader)? {
- let index = usize::try_from(raw_index)
- .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
- if (0..=symbols.len()).contains(&index) {
- let symbol = symbols[index].clone();
- Value::Enum(raw_index as u32, symbol)
- } else {
- return Err(Error::GetEnumValue {
- index,
- nsymbols: symbols.len(),
- });
- }
- } else {
- return Err(Error::GetEnumSymbol);
- })
+ Err(io_err) => Err(io_err),
+ },
+ Schema::Record {
+ ref name,
+ ref fields,
+ ..
+ } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ // Benchmarks indicate ~10% improvement using this method.
+ let mut items = Vec::with_capacity(fields.len());
+ for field in fields {
+ // TODO: This clone is also expensive. See if we can do away
with it...
+ items.push((
+ field.name.clone(),
+ decode_internal(
+ &field.schema,
+ names,
+ &fully_qualified_name.namespace,
+ reader,
+ )?,
+ ));
}
- Schema::Ref { ref name } => {
- let name = &name.name;
- if let Some(resolved) = schemas_by_name.get(name.as_str()) {
- decode0(resolved, reader, &mut schemas_by_name.clone())
+ Ok(Value::Record(items))
+ }
+ Schema::Enum { ref symbols, .. } => {
+ Ok(if let Value::Int(raw_index) = decode_int(reader)? {
+ let index = usize::try_from(raw_index)
+ .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
+ if (0..=symbols.len()).contains(&index) {
+ let symbol = symbols[index].clone();
+ Value::Enum(raw_index as u32, symbol)
} else {
- Err(Error::SchemaResolutionError(name.clone()))
+ return Err(Error::GetEnumValue {
+ index,
+ nsymbols: symbols.len(),
+ });
}
+ } else {
+ return Err(Error::GetEnumUnknownIndexValue);
+ })
+ }
+ Schema::Ref { ref name } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if let Some(resolved) = names.get(&fully_qualified_name) {
+ decode_internal(resolved, names,
&fully_qualified_name.namespace, reader)
+ } else {
+ Err(Error::SchemaResolutionError(fully_qualified_name))
}
}
}
-
- let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
- decode0(schema, reader, &mut schemas_by_name)
}
#[cfg(test)]
+#[allow(clippy::expect_fun_call)]
mod tests {
use crate::{
decode::decode,
+ encode::{encode, tests::success},
schema::Schema,
types::{
Value,
@@ -333,6 +336,7 @@ mod tests {
size: 2,
doc: None,
name: Name::new("decimal").unwrap(),
+ aliases: None,
});
let schema = Schema::Decimal {
inner,
@@ -343,7 +347,7 @@ mod tests {
let value = Value::Decimal(Decimal::from(bigint.to_signed_bytes_be()));
let mut buffer = Vec::new();
- encode(&value, &schema, &mut buffer);
+ encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));
let mut bytes = &buffer[..];
let result = decode(&schema, &mut bytes).unwrap();
@@ -357,6 +361,7 @@ mod tests {
let inner = Box::new(Schema::Fixed {
size: 13,
name: Name::new("decimal").unwrap(),
+ aliases: None,
doc: None,
});
let schema = Schema::Decimal {
@@ -369,9 +374,402 @@ mod tests {
));
let mut buffer = Vec::<u8>::new();
- encode(&value, &schema, &mut buffer);
+ encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));
let mut bytes: &[u8] = &buffer[..];
let result = decode(&schema, &mut bytes).unwrap();
assert_eq!(result, value);
}
+
+ #[test]
+ fn test_avro_3448_recursive_definition_decode_union() {
+ // if encoding fails in this test check the corresponding test in
encode
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":[ "null", {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }]
+ },
+ {
+ "name":"b",
+ "type":"Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value1 = Value::Record(vec![
+ ("a".into(), Value::Union(1, Box::new(inner_value1))),
+ ("b".into(), inner_value2.clone()),
+ ]);
+ let mut buf = Vec::new();
+ encode(&outer_value1, &schema, &mut
buf).expect(&success(&outer_value1, &schema));
+ assert!(!buf.is_empty());
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_value1,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to decode using recursive definitions with schema:\n
{:?}\n",
+ &schema
+ ))
+ );
+
+ let mut buf = Vec::new();
+ let outer_value2 = Value::Record(vec![
+ ("a".into(), Value::Union(0, Box::new(Value::Null))),
+ ("b".into(), inner_value2),
+ ]);
+ encode(&outer_value2, &schema, &mut
buf).expect(&success(&outer_value2, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_value2,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to decode using recursive definitions with schema:\n
{:?}\n",
+ &schema
+ ))
+ );
+ }
+
+ #[test]
+ fn test_avro_3448_recursive_definition_decode_array() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"array",
+ "items": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": "Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ ("a".into(), Value::Array(vec![inner_value1])),
+ ("b".into(), inner_value2),
+ ]);
+ let mut buf = Vec::new();
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_value,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to decode using recursive definitions with schema:\n
{:?}\n",
+ &schema
+ ))
+ )
+ }
+
+ #[test]
+ fn test_avro_3448_recursive_definition_decode_map() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"map",
+ "values": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": "Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ (
+ "a".into(),
+ Value::Map(vec![("akey".into(),
inner_value1)].into_iter().collect()),
+ ),
+ ("b".into(), inner_value2),
+ ]);
+ let mut buf = Vec::new();
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_value,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to decode using recursive definitions with schema:\n
{:?}\n",
+ &schema
+ ))
+ )
+ }
+
+ #[test]
+ fn test_avro_3448_proper_multi_level_decoding_middle_namespace() {
+ // if encoding fails in this test check the corresponding test in
encode
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "middle_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(),
Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_1, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_1, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_record_variation_1,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to Decode with recursively defined namespace with
schema:\n {:?}\n",
+ &schema
+ ))
+ );
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_2, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_2, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_record_variation_2,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to Decode with recursively defined namespace with
schema:\n {:?}\n",
+ &schema
+ ))
+ );
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_3, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_3, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_record_variation_3,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to Decode with recursively defined namespace with
schema:\n {:?}\n",
+ &schema
+ ))
+ );
+ }
+
+ #[test]
+ fn test_avro_3448_proper_multi_level_decoding_inner_namespace() {
+ // if encoding fails in this test check the corresponding test in
encode
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "namespace":"inner_namespace",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(),
Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_1, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_1, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_record_variation_1,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to Decode with recursively defined namespace with
schema:\n {:?}\n",
+ &schema
+ ))
+ );
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_2, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_2, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_record_variation_2,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to Decode with recursively defined namespace with
schema:\n {:?}\n",
+ &schema
+ ))
+ );
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_3, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_3, &schema));
+ let mut bytes = &buf[..];
+ assert_eq!(
+ outer_record_variation_3,
+ decode(&schema, &mut bytes).expect(&format!(
+ "Failed to Decode with recursively defined namespace with
schema:\n {:?}\n",
+ &schema
+ ))
+ );
+ }
}
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 664cd92..95d660d 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -16,24 +16,26 @@
// under the License.
use crate::{
- schema::{Name, Schema},
- types::Value,
+ schema::{NamesRef, Namespace, ResolvedSchema, Schema, SchemaKind},
+ types::{Value, ValueKind},
util::{zig_i32, zig_i64},
+ AvroResult, Error,
};
-use std::{collections::HashMap, convert::TryInto};
+use std::convert::{TryFrom, TryInto};
/// Encode a `Value` into avro format.
///
/// **NOTE** This will not perform schema validation. The value is assumed to
/// be valid with regards to the schema. Schema are needed only to guide the
/// encoding for complex type values.
-pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- encode_ref(value, schema, buffer)
+pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) ->
AvroResult<()> {
+ let rs = ResolvedSchema::try_from(schema)?;
+ encode_internal(value, schema, rs.get_names(), &None, buffer)
}
fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
let bytes = s.as_ref();
- encode(&Value::Long(bytes.len() as i64), &Schema::Long, buffer);
+ encode_long(bytes.len() as i64, buffer);
buffer.extend_from_slice(bytes);
}
@@ -45,171 +47,203 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
zig_i32(i, buffer)
}
-/// Encode a `Value` into avro format.
-///
-/// **NOTE** This will not perform schema validation. The value is assumed to
-/// be valid with regards to the schema. Schema are needed only to guide the
-/// encoding for complex type values.
-pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- fn encode_ref0(
- value: &Value,
- schema: &Schema,
- buffer: &mut Vec<u8>,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) {
- match &schema {
- Schema::Ref { ref name } => {
- let resolved = schemas_by_name.get(name).unwrap();
- return encode_ref0(value, resolved, buffer, &mut
schemas_by_name.clone());
- }
- Schema::Record { ref name, .. }
- | Schema::Enum { ref name, .. }
- | Schema::Fixed { ref name, .. } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- }
- _ => (),
- }
+fn encode_internal(
+ value: &Value,
+ schema: &Schema,
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+ if let Schema::Ref { ref name } = schema {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ let resolved = *names
+ .get(&fully_qualified_name)
+ .ok_or(Error::SchemaResolutionError(fully_qualified_name))?;
+ return encode_internal(value, resolved, names, enclosing_namespace,
buffer);
+ }
- match value {
- Value::Null => (),
- Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
- // Pattern | Pattern here to signify that these _must_ have the
same encoding.
- Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) =>
encode_int(*i, buffer),
- Value::Long(i)
- | Value::TimestampMillis(i)
- | Value::TimestampMicros(i)
- | Value::TimeMicros(i) => encode_long(*i, buffer),
- Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Decimal(decimal) => match schema {
- Schema::Decimal { inner, .. } => match *inner.clone() {
- Schema::Fixed { size, .. } => {
- let bytes =
decimal.to_sign_extended_bytes_with_len(size).unwrap();
- let num_bytes = bytes.len();
- if num_bytes != size {
- panic!(
- "signed decimal bytes length {} not equal to
fixed schema size {}",
- num_bytes, size
- );
- }
- encode(&Value::Fixed(size, bytes), inner, buffer)
- }
- Schema::Bytes => {
- encode(&Value::Bytes(decimal.try_into().unwrap()),
inner, buffer)
+ match value {
+ Value::Null => (),
+ Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+ // Pattern | Pattern here to signify that these _must_ have the same
encoding.
+ Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) =>
encode_int(*i, buffer),
+ Value::Long(i)
+ | Value::TimestampMillis(i)
+ | Value::TimestampMicros(i)
+ | Value::TimeMicros(i) => encode_long(*i, buffer),
+ Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Decimal(decimal) => match schema {
+ Schema::Decimal { inner, .. } => match *inner.clone() {
+ Schema::Fixed { size, .. } => {
+ let bytes =
decimal.to_sign_extended_bytes_with_len(size).unwrap();
+ let num_bytes = bytes.len();
+ if num_bytes != size {
+ return Err(Error::EncodeDecimalAsFixedError(num_bytes,
size));
}
- _ => panic!("invalid inner type for decimal: {:?}", inner),
- },
- _ => panic!("invalid schema type for decimal: {:?}", schema),
- },
- &Value::Duration(duration) => {
- let slice: [u8; 12] = duration.into();
- buffer.extend_from_slice(&slice);
- }
- Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
- Value::Bytes(bytes) => match *schema {
- Schema::Bytes => encode_bytes(bytes, buffer),
- Schema::Fixed { .. } => buffer.extend(bytes),
- _ => error!("invalid schema type for bytes: {:?}", schema),
- },
- Value::String(s) => match *schema {
- Schema::String => {
- encode_bytes(s, buffer);
+ encode(&Value::Fixed(size, bytes), inner, buffer)?
}
- Schema::Enum { ref symbols, .. } => {
- if let Some(index) = symbols.iter().position(|item| item
== s) {
- encode_int(index as i32, buffer);
- }
+ Schema::Bytes => encode(&Value::Bytes(decimal.try_into()?),
inner, buffer)?,
+ _ => {
+ return Err(Error::ResolveDecimalSchema(SchemaKind::from(
+ *inner.clone(),
+ )));
}
- _ => error!("invalid schema type for String: {:?}", schema),
},
- Value::Fixed(_, bytes) => buffer.extend(bytes),
- Value::Enum(i, _) => encode_int(*i as i32, buffer),
- Value::Union(idx, item) => {
- if let Schema::Union(ref inner) = *schema {
- inner.schemas.iter().for_each(|s| match s {
- Schema::Record { name, .. }
- | Schema::Enum { name, .. }
- | Schema::Fixed { name, .. } => {
- schemas_by_name.insert(name.clone(), s.clone());
- }
- _ => (),
- });
-
- let inner_schema = inner
- .schemas
- .get(*idx as usize)
- .expect("Invalid Union validation occurred");
- encode_long(*idx as i64, buffer);
- encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Decimal,
+ supported_schema: vec![SchemaKind::Decimal],
+ });
+ }
+ },
+ &Value::Duration(duration) => {
+ let slice: [u8; 12] = duration.into();
+ buffer.extend_from_slice(&slice);
+ }
+ Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+ Value::Bytes(bytes) => match *schema {
+ Schema::Bytes => encode_bytes(bytes, buffer),
+ Schema::Fixed { .. } => buffer.extend(bytes),
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Bytes,
+ supported_schema: vec![SchemaKind::Bytes,
SchemaKind::Fixed],
+ });
+ }
+ },
+ Value::String(s) => match *schema {
+ Schema::String => {
+ encode_bytes(s, buffer);
+ }
+ Schema::Enum { ref symbols, .. } => {
+ if let Some(index) = symbols.iter().position(|item| item == s)
{
+ encode_int(index as i32, buffer);
} else {
- error!("invalid schema type for Union: {:?}", schema);
+ error!("Invalid symbol string {:?}.", &s[..]);
+ return Err(Error::GetEnumSymbol(s.clone()));
}
}
- Value::Array(items) => {
- if let Schema::Array(ref inner) = *schema {
- if !items.is_empty() {
- encode_long(items.len() as i64, buffer);
- for item in items.iter() {
- encode_ref0(item, inner, buffer, schemas_by_name);
- }
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::String,
+ supported_schema: vec![SchemaKind::String,
SchemaKind::Enum],
+ });
+ }
+ },
+ Value::Fixed(_, bytes) => buffer.extend(bytes),
+ Value::Enum(i, _) => encode_int(*i as i32, buffer),
+ Value::Union(idx, item) => {
+ if let Schema::Union(ref inner) = *schema {
+ let inner_schema = inner
+ .schemas
+ .get(*idx as usize)
+ .expect("Invalid Union validation occurred");
+ encode_long(*idx as i64, buffer);
+ encode_internal(&*item, inner_schema, names,
enclosing_namespace, buffer)?;
+ } else {
+ error!("invalid schema type for Union: {:?}", schema);
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Union,
+ supported_schema: vec![SchemaKind::Union],
+ });
+ }
+ }
+ Value::Array(items) => {
+ if let Schema::Array(ref inner) = *schema {
+ if !items.is_empty() {
+ encode_long(items.len() as i64, buffer);
+ for item in items.iter() {
+ encode_internal(item, inner, names,
enclosing_namespace, buffer)?;
}
- buffer.push(0u8);
- } else {
- error!("invalid schema type for Array: {:?}", schema);
}
+ buffer.push(0u8);
+ } else {
+ error!("invalid schema type for Array: {:?}", schema);
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Array,
+ supported_schema: vec![SchemaKind::Array],
+ });
}
- Value::Map(items) => {
- if let Schema::Map(ref inner) = *schema {
- if !items.is_empty() {
- encode_long(items.len() as i64, buffer);
- for (key, value) in items {
- encode_bytes(key, buffer);
- encode_ref0(value, inner, buffer, schemas_by_name);
- }
+ }
+ Value::Map(items) => {
+ if let Schema::Map(ref inner) = *schema {
+ if !items.is_empty() {
+ encode_long(items.len() as i64, buffer);
+ for (key, value) in items {
+ encode_bytes(key, buffer);
+ encode_internal(value, inner, names,
enclosing_namespace, buffer)?;
}
- buffer.push(0u8);
- } else {
- error!("invalid schema type for Map: {:?}", schema);
}
+ buffer.push(0u8);
+ } else {
+ error!("invalid schema type for Map: {:?}", schema);
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Map,
+ supported_schema: vec![SchemaKind::Map],
+ });
}
- Value::Record(fields) => {
- if let Schema::Record {
- fields: ref schema_fields,
- ..
- } = *schema
- {
- for (i, &(_, ref value)) in fields.iter().enumerate() {
- encode_ref0(value, &schema_fields[i].schema, buffer,
schemas_by_name);
- }
+ }
+ Value::Record(fields) => {
+ if let Schema::Record {
+ ref name,
+ fields: ref schema_fields,
+ ..
+ } = *schema
+ {
+ let record_namespace =
name.fully_qualified_name(enclosing_namespace).namespace;
+ for (i, &(_, ref value)) in fields.iter().enumerate() {
+ encode_internal(
+ value,
+ &schema_fields[i].schema,
+ names,
+ &record_namespace,
+ buffer,
+ )?;
}
+ } else {
+ error!("invalid schema type for Record: {:?}", schema);
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Record,
+ supported_schema: vec![SchemaKind::Record],
+ });
}
}
- }
-
- let mut schemas_by_name = HashMap::new();
- encode_ref0(value, schema, buffer, &mut schemas_by_name)
+ };
+ Ok(())
}
-pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
+pub fn encode_to_vec(value: &Value, schema: &Schema) -> AvroResult<Vec<u8>> {
let mut buffer = Vec::new();
- encode(value, schema, &mut buffer);
- buffer
+ encode(value, schema, &mut buffer)?;
+ Ok(buffer)
}
#[cfg(test)]
-mod tests {
+#[allow(clippy::expect_fun_call)]
+pub(crate) mod tests {
use super::*;
use std::collections::HashMap;
+ pub(crate) fn success(value: &Value, schema: &Schema) -> String {
+ format!(
+ "Value: {:?}\n should encode with schema:\n{:?}",
+ &value, &schema
+ )
+ }
#[test]
fn test_encode_empty_array() {
let mut buf = Vec::new();
let empty: Vec<Value> = Vec::new();
encode(
- &Value::Array(empty),
+ &Value::Array(empty.clone()),
&Schema::Array(Box::new(Schema::Int)),
&mut buf,
- );
+ )
+ .expect(&success(
+ &Value::Array(empty),
+ &Schema::Array(Box::new(Schema::Int)),
+ ));
assert_eq!(vec![0u8], buf);
}
@@ -218,10 +252,14 @@ mod tests {
let mut buf = Vec::new();
let empty: HashMap<String, Value> = HashMap::new();
encode(
- &Value::Map(empty),
+ &Value::Map(empty.clone()),
&Schema::Map(Box::new(Schema::Int)),
&mut buf,
- );
+ )
+ .expect(&success(
+ &Value::Map(empty),
+ &Schema::Map(Box::new(Schema::Int)),
+ ));
assert_eq!(vec![0u8], buf);
}
@@ -230,27 +268,27 @@ mod tests {
let mut buf = Vec::new();
let schema = Schema::parse_str(
r#"
- {
- "type":"record",
- "name":"TestStruct",
- "fields": [
- {
- "name":"a",
- "type":{
- "type":"record",
- "name": "Inner",
- "fields": [ {
- "name":"z",
- "type":"int"
- }]
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type":"Inner"
}
- },
- {
- "name":"b",
- "type":"Inner"
- }
- ]
- }"#,
+ ]
+ }"#,
)
.unwrap();
@@ -258,7 +296,7 @@ mod tests {
let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
let outer_value =
Value::Record(vec![("a".into(), inner_value1), ("b".into(),
inner_value2)]);
- encode(&outer_value, &schema, &mut buf);
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
assert!(!buf.is_empty());
}
@@ -267,33 +305,33 @@ mod tests {
let mut buf = Vec::new();
let schema = Schema::parse_str(
r#"
- {
- "type":"record",
- "name":"TestStruct",
- "fields": [
- {
- "name":"a",
- "type":{
- "type":"array",
- "items": {
- "type":"record",
- "name": "Inner",
- "fields": [ {
- "name":"z",
- "type":"int"
- }]
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"array",
+ "items": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"map",
+ "values":"Inner"
}
}
- },
- {
- "name":"b",
- "type": {
- "type":"map",
- "values":"Inner"
- }
- }
- ]
- }"#,
+ ]
+ }"#,
)
.unwrap();
@@ -306,7 +344,7 @@ mod tests {
Value::Map(vec![("akey".into(),
inner_value2)].into_iter().collect()),
),
]);
- encode(&outer_value, &schema, &mut buf);
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
assert!(!buf.is_empty());
}
@@ -315,10 +353,10 @@ mod tests {
let mut buf = Vec::new();
let schema = Schema::parse_str(
r#"
- {
- "type":"record",
- "name":"TestStruct",
- "fields": [
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
{
"name":"a",
"type":{
@@ -351,7 +389,7 @@ mod tests {
Value::Map(vec![("akey".into(),
inner_value2)].into_iter().collect()),
),
]);
- encode(&outer_value, &schema, &mut buf);
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
assert!(!buf.is_empty());
}
@@ -398,7 +436,7 @@ mod tests {
)]);
let outer_value =
Value::Record(vec![("a".into(), inner_value1), ("b".into(),
inner_value2)]);
- encode(&outer_value, &schema, &mut buf);
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
assert!(!buf.is_empty());
}
@@ -446,7 +484,7 @@ mod tests {
),
("b".into(), Value::Array(vec![inner_value1])),
]);
- encode(&outer_value, &schema, &mut buf);
+ encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value,
&schema));
assert!(!buf.is_empty());
}
@@ -485,7 +523,7 @@ mod tests {
("a".into(), Value::Union(1, Box::new(inner_value1))),
("b".into(), inner_value2.clone()),
]);
- encode(&outer_value1, &schema, &mut buf);
+ encode(&outer_value1, &schema, &mut
buf).expect(&success(&outer_value1, &schema));
assert!(!buf.is_empty());
buf.drain(..);
@@ -493,7 +531,277 @@ mod tests {
("a".into(), Value::Union(0, Box::new(Value::Null))),
("b".into(), inner_value2),
]);
- encode(&outer_value2, &schema, &mut buf);
+ encode(&outer_value2, &schema, &mut
buf).expect(&success(&outer_value1, &schema));
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3448_proper_multi_level_encoding_outer_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "space.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(),
Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_1, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_1, &schema));
+ assert!(!buf.is_empty());
+ buf.drain(..);
+ encode(&outer_record_variation_2, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_2, &schema));
+ assert!(!buf.is_empty());
+ buf.drain(..);
+ encode(&outer_record_variation_3, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_3, &schema));
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3448_proper_multi_level_encoding_middle_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "middle_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(),
Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_1, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_1, &schema));
+ assert!(!buf.is_empty());
+ buf.drain(..);
+ encode(&outer_record_variation_2, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_2, &schema));
+ assert!(!buf.is_empty());
+ buf.drain(..);
+ encode(&outer_record_variation_3, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_3, &schema));
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3448_proper_multi_level_encoding_inner_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "namespace":"inner_namespace",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(),
Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ let mut buf = Vec::new();
+ encode(&outer_record_variation_1, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_1, &schema));
+ assert!(!buf.is_empty());
+ buf.drain(..);
+ encode(&outer_record_variation_2, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_2, &schema));
+ assert!(!buf.is_empty());
+ buf.drain(..);
+ encode(&outer_record_variation_3, &schema, &mut buf)
+ .expect(&success(&outer_record_variation_3, &schema));
assert!(!buf.is_empty());
}
}
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index b8c5939..43aef5f 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{schema::SchemaKind, types::ValueKind};
+use crate::{
+ schema::{Name, SchemaKind},
+ types::ValueKind,
+};
use std::fmt;
#[derive(thiserror::Error, Debug)]
@@ -88,8 +91,11 @@ pub enum Error {
#[error("Enum symbol index out of bounds: {num_variants}")]
EnumSymbolIndex { index: usize, num_variants: usize },
- #[error("Enum symbol not found")]
- GetEnumSymbol,
+ #[error("Enum symbol not found {0}")]
+ GetEnumSymbol(String),
+
+ #[error("Unable to decode enum index")]
+ GetEnumUnknownIndexValue,
#[error("Scale {scale} is greater than precision {precision}")]
GetScaleAndPrecision { scale: usize, precision: usize },
@@ -378,13 +384,26 @@ pub enum Error {
/// Error while resolving Schema::Ref
#[error("Unresolved schema reference: {0}")]
- SchemaResolutionError(String),
+ SchemaResolutionError(Name),
#[error("The file metadata is already flushed.")]
FileHeaderAlreadyWritten,
#[error("Metadata keys starting with 'avro.' are reserved for internal
usage: {0}.")]
InvalidMetadataKey(String),
+
+ /// Error when two named schemas have the same fully qualified name
+ #[error("Two named schema defined for same fullname: {0}.")]
+ AmbiguousSchemaDefinition(Name),
+
+ #[error("Signed decimal bytes length {0} not equal to fixed schema size
{1}.")]
+ EncodeDecimalAsFixedError(usize, usize),
+
+ #[error("Can only encode value type {value_kind:?} as one of
{supported_schema:?}")]
+ EncodeValueAsSchemaError {
+ value_kind: ValueKind,
+ supported_schema: Vec<SchemaKind>,
+ },
}
impl serde::ser::Error for Error {
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 1a2f47e..81b7499 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -28,9 +28,9 @@ use serde_json::{Map, Value};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
- convert::TryInto,
+ convert::{TryFrom, TryInto},
fmt,
- hash::{Hash, Hasher},
+ hash::Hash,
str::FromStr,
};
use strum_macros::{EnumDiscriminants, EnumString};
@@ -103,6 +103,7 @@ pub enum Schema {
/// of `fields`.
Record {
name: Name,
+ aliases: Aliases,
doc: Documentation,
fields: Vec<RecordField>,
lookup: HashMap<String, usize>,
@@ -110,12 +111,14 @@ pub enum Schema {
/// An `enum` Avro schema.
Enum {
name: Name,
+ aliases: Aliases,
doc: Documentation,
symbols: Vec<String>,
},
/// A `fixed` Avro schema.
Fixed {
name: Name,
+ aliases: Aliases,
doc: Documentation,
size: usize,
},
@@ -225,15 +228,22 @@ impl<'a> From<&'a types::Value> for SchemaKind {
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Hash, PartialEq, Eq)]
pub struct Name {
pub name: String,
- pub namespace: Option<String>,
- pub aliases: Option<Vec<String>>,
+ pub namespace: Namespace,
}
/// Represents documentation for complex Avro schemas.
pub type Documentation = Option<String>;
+/// Represents the aliases for Named Schema
+pub type Aliases = Option<Vec<String>>;
+/// Represents Schema lookup within a schema env
+pub(crate) type Names = HashMap<Name, Schema>;
+/// Represents Schema lookup within a schema
+pub(crate) type NamesRef<'a> = HashMap<Name, &'a Schema>;
+/// Represents the namespace for Named Schema
+pub type Namespace = Option<String>;
impl Name {
/// Create a new `Name`.
@@ -241,14 +251,10 @@ impl Name {
/// `aliases` will not be defined.
pub fn new(name: &str) -> AvroResult<Name> {
let (name, namespace) = Name::get_name_and_namespace(name)?;
- Ok(Name {
- name,
- namespace,
- aliases: None,
- })
+ Ok(Name { name, namespace })
}
- fn get_name_and_namespace(name: &str) -> AvroResult<(String,
Option<String>)> {
+ fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
let caps = SCHEMA_NAME_R
.captures(name)
.ok_or_else(|| Error::InvalidSchemaName(name.to_string(),
SCHEMA_NAME_R.as_str()))?;
@@ -264,28 +270,15 @@ impl Name {
.name()
.map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
.ok_or(Error::GetNameField)?;
-
// FIXME Reading name from the type is wrong ! The name there is just
a metadata (AVRO-3430)
let type_name = match complex.get("type") {
Some(Value::Object(complex_type)) => complex_type.name().or(None),
_ => None,
};
- let aliases: Option<Vec<String>> = complex
- .get("aliases")
- .and_then(|aliases| aliases.as_array())
- .and_then(|aliases| {
- aliases
- .iter()
- .map(|alias| alias.as_str())
- .map(|alias| alias.map(|a| a.to_string()))
- .collect::<Option<_>>()
- });
-
Ok(Name {
name: type_name.unwrap_or(name),
namespace: namespace_from_name.or_else(||
complex.string("namespace")),
- aliases,
})
}
@@ -293,15 +286,11 @@ impl Name {
///
/// More information about fullnames can be found in the
/// [Avro
specification](https://avro.apache.org/docs/current/spec.html#names)
- pub fn fullname(&self, default_namespace: Option<&str>) -> String {
+ pub fn fullname(&self, default_namespace: Namespace) -> String {
if self.name.contains('.') {
self.name.clone()
} else {
- let namespace = self
- .namespace
- .as_ref()
- .map(|s| s.as_ref())
- .or(default_namespace);
+ let namespace = self.namespace.clone().or(default_namespace);
match namespace {
Some(ref namespace) => format!("{}.{}", namespace, self.name),
@@ -309,6 +298,29 @@ impl Name {
}
}
}
+
+ /// Return the fully qualified name needed for indexing or searching for
the schema within a schema/schema env context. Puts the enclosing namespace
into the name's namespace for clarity in schema/schema env parsing
+ /// ```ignore
+ /// use apache_avro::schema::Name;
+ ///
+ /// assert_eq!(
+ ///
Name::new("some_name").unwrap().fully_qualified_name(&Some("some_namespace".into())),
+ /// Name::new("some_namespace.some_name").unwrap()
+ /// );
+ /// assert_eq!(
+ ///
Name::new("some_namespace.some_name").unwrap().fully_qualified_name(&Some("other_namespace".into())),
+ /// Name::new("some_namespace.some_name").unwrap()
+ /// );
+ /// ```
+ pub(crate) fn fully_qualified_name(&self, enclosing_namespace: &Namespace)
-> Name {
+ Name {
+ name: self.name.clone(),
+ namespace: self
+ .namespace
+ .clone()
+ .or_else(|| enclosing_namespace.clone()),
+ }
+ }
}
impl From<&str> for Name {
@@ -317,17 +329,86 @@ impl From<&str> for Name {
}
}
-impl Hash for Name {
- fn hash<H: Hasher>(&self, state: &mut H) {
- self.fullname(None).hash(state);
+impl fmt::Display for Name {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str(&self.fullname(None)[..])
+ }
+}
+
+pub(crate) struct ResolvedSchema<'s> {
+ names_ref: NamesRef<'s>,
+ root_schema: &'s Schema,
+}
+
+impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
+ type Error = Error;
+
+ fn try_from(schema: &'s Schema) -> AvroResult<Self> {
+ let names = HashMap::new();
+ let mut rs = ResolvedSchema {
+ names_ref: names,
+ root_schema: schema,
+ };
+ Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?;
+ Ok(rs)
}
}
-impl Eq for Name {}
+impl<'s> ResolvedSchema<'s> {
+ pub fn get_names(&self) -> &NamesRef<'s> {
+ &self.names_ref
+ }
-impl PartialEq for Name {
- fn eq(&self, other: &Name) -> bool {
- self.fullname(None).eq(&other.fullname(None))
+ fn from_internal(
+ schema: &'s Schema,
+ names_ref: &mut NamesRef<'s>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<()> {
+ match schema {
+ Schema::Array(schema) | Schema::Map(schema) => {
+ Self::from_internal(schema, names_ref, enclosing_namespace)
+ }
+ Schema::Union(UnionSchema { schemas, .. }) => {
+ for schema in schemas {
+ Self::from_internal(schema, names_ref,
enclosing_namespace)?
+ }
+ Ok(())
+ }
+ Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if names_ref
+ .insert(fully_qualified_name.clone(), schema)
+ .is_some()
+ {
+ Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ } else {
+ Ok(())
+ }
+ }
+ Schema::Record { name, fields, .. } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if names_ref
+ .insert(fully_qualified_name.clone(), schema)
+ .is_some()
+ {
+ Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ } else {
+ let record_namespace = fully_qualified_name.namespace;
+ for field in fields {
+ Self::from_internal(&field.schema, names_ref,
&record_namespace)?
+ }
+ Ok(())
+ }
+ }
+ Schema::Ref { name } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ names_ref
+ .get(&fully_qualified_name)
+ .map(|_| ())
+ .ok_or(Error::SchemaResolutionError(fully_qualified_name))
+ }
+ _ => Ok(()),
+ }
}
}
@@ -363,11 +444,16 @@ pub enum RecordFieldOrder {
impl RecordField {
/// Parse a `serde_json::Value` into a `RecordField`.
- fn parse(field: &Map<String, Value>, position: usize, parser: &mut Parser)
-> AvroResult<Self> {
+ fn parse(
+ field: &Map<String, Value>,
+ position: usize,
+ parser: &mut Parser,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Self> {
let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
// TODO: "type" = "<record name>"
- let schema = parser.parse_complex(field)?;
+ let schema = parser.parse_complex(field, enclosing_namespace)?;
let default = field.get("default").cloned();
@@ -478,11 +564,11 @@ struct Parser {
// A map of name -> Schema::Ref
// Used to resolve cyclic references, i.e. when a
// field's type is a reference to its record's type
- resolving_schemas: HashMap<Name, Schema>,
+ resolving_schemas: Names,
input_order: Vec<Name>,
// A map of name -> fully parsed Schema
// Used to avoid parsing the same schema twice
- parsed_schemas: HashMap<Name, Schema>,
+ parsed_schemas: Names,
}
impl Schema {
@@ -550,7 +636,7 @@ impl Schema {
pub fn parse(value: &Value) -> AvroResult<Schema> {
let mut parser = Parser::default();
- parser.parse(value)
+ parser.parse(value, &None)
}
}
@@ -559,7 +645,7 @@ impl Parser {
fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
// TODO: (#82) this should be a ParseSchemaError wrapping the JSON
error
let value =
serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
- self.parse(&value)
+ self.parse(&value, &None)
}
/// Create an array of `Schema`'s from an iterator of JSON Avro schemas.
It is allowed that
@@ -576,7 +662,7 @@ impl Parser {
.input_schemas
.remove_entry(&next_name)
.expect("Key unexpectedly missing");
- let parsed = self.parse(&value)?;
+ let parsed = self.parse(&value, &None)?;
self.parsed_schemas
.insert(get_schema_type_name(name, value), parsed);
}
@@ -594,11 +680,11 @@ impl Parser {
/// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
/// schema.
- fn parse(&mut self, value: &Value) -> AvroResult<Schema> {
+ fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) ->
AvroResult<Schema> {
match *value {
- Value::String(ref t) => self.parse_known_schema(t.as_str()),
- Value::Object(ref data) => self.parse_complex(data),
- Value::Array(ref data) => self.parse_union(data),
+ Value::String(ref t) => self.parse_known_schema(t.as_str(),
enclosing_namespace),
+ Value::Object(ref data) => self.parse_complex(data,
enclosing_namespace),
+ Value::Array(ref data) => self.parse_union(data,
enclosing_namespace),
_ => Err(Error::ParseSchemaFromValidJson),
}
}
@@ -606,7 +692,11 @@ impl Parser {
/// Parse a `serde_json::Value` representing an Avro type whose Schema is
known into a
/// `Schema`. A Schema for a `serde_json::Value` is known if it is
primitive or has
/// been parsed previously by the parser and stored in its map of
parsed_schemas.
- fn parse_known_schema(&mut self, name: &str) -> AvroResult<Schema> {
+ fn parse_known_schema(
+ &mut self,
+ name: &str,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
match name {
"null" => Ok(Schema::Null),
"boolean" => Ok(Schema::Boolean),
@@ -616,7 +706,7 @@ impl Parser {
"float" => Ok(Schema::Float),
"bytes" => Ok(Schema::Bytes),
"string" => Ok(Schema::String),
- _ => self.fetch_schema_ref(name),
+ _ => self.fetch_schema_ref(name, enclosing_namespace),
}
}
@@ -629,7 +719,11 @@ impl Parser {
///
/// This method allows schemas definitions that depend on other types to
/// parse their dependencies (or look them up if already parsed).
- fn fetch_schema_ref(&mut self, name: &str) -> AvroResult<Schema> {
+ fn fetch_schema_ref(
+ &mut self,
+ name: &str,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
fn get_schema_ref(parsed: &Schema) -> Schema {
match &parsed {
Schema::Record { ref name, .. }
@@ -640,20 +734,23 @@ impl Parser {
}
let name = Name::new(name)?;
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
- if let Some(parsed) = self.parsed_schemas.get(&name) {
- return Ok(get_schema_ref(parsed));
+ if self.parsed_schemas.get(&fully_qualified_name).is_some() {
+ return Ok(Schema::Ref { name });
}
- if let Some(resolving_schema) = self.resolving_schemas.get(&name) {
+ if let Some(resolving_schema) =
self.resolving_schemas.get(&fully_qualified_name) {
return Ok(resolving_schema.clone());
}
let value = self
.input_schemas
- .remove(&name)
- .ok_or_else(|| Error::ParsePrimitive(name.fullname(None)))?;
+ .remove(&fully_qualified_name)
+ // TODO make a better descriptive error message here that conveys
that a named schema cannot be found
+ .ok_or_else(||
Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;
- let parsed = self.parse(&value)?;
+ // parsing a full schema from inside another schema. Other full schema
will not inherit namespace
+ let parsed = self.parse(&value, &None)?;
self.parsed_schemas
.insert(get_schema_type_name(name, value), parsed.clone());
@@ -686,15 +783,20 @@ impl Parser {
///
/// Avro supports "recursive" definition of types.
/// e.g: {"type": {"type": "string"}}
- fn parse_complex(&mut self, complex: &Map<String, Value>) ->
AvroResult<Schema> {
+ fn parse_complex(
+ &mut self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
fn logical_verify_type(
complex: &Map<String, Value>,
kinds: &[SchemaKind],
parser: &mut Parser,
+ enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
match complex.get("type") {
Some(value) => {
- let ty = parser.parse(value)?;
+ let ty = parser.parse(value, enclosing_namespace)?;
if kinds
.iter()
@@ -730,13 +832,14 @@ impl Parser {
kinds: &[SchemaKind],
ok_schema: Schema,
parser: &mut Parser,
+ enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
- match logical_verify_type(complex, kinds, parser) {
+ match logical_verify_type(complex, kinds, parser,
enclosing_namespace) {
// type and logicalType match!
Ok(_) => Ok(ok_schema),
// the logicalType is not expected for this type!
Err(Error::GetLogicalTypeVariant(json_value)) => match
json_value {
- Value::String(_) => match parser.parse(&json_value) {
+ Value::String(_) => match parser.parse(&json_value,
enclosing_namespace) {
Ok(schema) => {
warn!(
"Ignoring invalid logical type '{}' for schema
of type: {:?}!",
@@ -759,6 +862,7 @@ impl Parser {
complex,
&[SchemaKind::Fixed, SchemaKind::Bytes],
self,
+ enclosing_namespace,
)?);
let (precision, scale) =
Self::parse_precision_and_scale(complex)?;
@@ -770,7 +874,7 @@ impl Parser {
});
}
"uuid" => {
- logical_verify_type(complex, &[SchemaKind::String], self)?;
+ logical_verify_type(complex, &[SchemaKind::String], self,
enclosing_namespace)?;
return Ok(Schema::Uuid);
}
"date" => {
@@ -780,6 +884,7 @@ impl Parser {
&[SchemaKind::Int],
Schema::Date,
self,
+ enclosing_namespace,
);
}
"time-millis" => {
@@ -789,6 +894,7 @@ impl Parser {
&[SchemaKind::Int],
Schema::TimeMillis,
self,
+ enclosing_namespace,
);
}
"time-micros" => {
@@ -798,6 +904,7 @@ impl Parser {
&[SchemaKind::Long],
Schema::TimeMicros,
self,
+ enclosing_namespace,
);
}
"timestamp-millis" => {
@@ -807,6 +914,7 @@ impl Parser {
&[SchemaKind::Long],
Schema::TimestampMillis,
self,
+ enclosing_namespace,
);
}
"timestamp-micros" => {
@@ -816,10 +924,11 @@ impl Parser {
&[SchemaKind::Long],
Schema::TimestampMicros,
self,
+ enclosing_namespace,
);
}
"duration" => {
- logical_verify_type(complex, &[SchemaKind::Fixed], self)?;
+ logical_verify_type(complex, &[SchemaKind::Fixed], self,
enclosing_namespace)?;
return Ok(Schema::Duration);
}
// In this case, of an unknown logical type, we just pass
through to the underlying
@@ -834,28 +943,28 @@ impl Parser {
}
match complex.get("type") {
Some(&Value::String(ref t)) => match t.as_str() {
- "record" => self.parse_record(complex),
- "enum" => self.parse_enum(complex),
- "array" => self.parse_array(complex),
- "map" => self.parse_map(complex),
- "fixed" => self.parse_fixed(complex),
- other => self.parse_known_schema(other),
+ "record" => self.parse_record(complex, enclosing_namespace),
+ "enum" => self.parse_enum(complex, enclosing_namespace),
+ "array" => self.parse_array(complex, enclosing_namespace),
+ "map" => self.parse_map(complex, enclosing_namespace),
+ "fixed" => self.parse_fixed(complex, enclosing_namespace),
+ other => self.parse_known_schema(other, enclosing_namespace),
},
- Some(&Value::Object(ref data)) => self.parse_complex(data),
- Some(&Value::Array(ref variants)) => self.parse_union(variants),
+ Some(&Value::Object(ref data)) => self.parse_complex(data,
enclosing_namespace),
+ Some(&Value::Array(ref variants)) => self.parse_union(variants,
enclosing_namespace),
Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
None => Err(Error::GetComplexTypeField),
}
}
- fn register_resolving_schema(&mut self, name: &Name) {
+ fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
let resolving_schema = Schema::Ref { name: name.clone() };
self.resolving_schemas
.insert(name.clone(), resolving_schema.clone());
let namespace = &name.namespace;
- if let Some(ref aliases) = name.aliases {
+ if let Some(ref aliases) = aliases {
aliases.iter().for_each(|alias| {
let alias_fullname = match namespace {
Some(ref ns) => format!("{}.{}", ns, alias),
@@ -869,13 +978,20 @@ impl Parser {
}
}
- fn register_parsed_schema(&mut self, name: &Name, schema: &Schema) {
- self.parsed_schemas.insert(name.clone(), schema.clone());
- self.resolving_schemas.remove(name);
+ fn register_parsed_schema(
+ &mut self,
+ fully_qualified_name: &Name,
+ schema: &Schema,
+ aliases: &Aliases,
+ ) {
+ // FIXME, this should be globally aware, so if there is something
overwriting something else then there is an ambiguous schema definition. An
appropriate error should be thrown
+ self.parsed_schemas
+ .insert(fully_qualified_name.clone(), schema.clone());
+ self.resolving_schemas.remove(fully_qualified_name);
- let namespace = &name.namespace;
+ let namespace = &fully_qualified_name.namespace;
- if let Some(ref aliases) = name.aliases {
+ if let Some(ref aliases) = aliases {
aliases.iter().for_each(|alias| {
let alias_fullname = match namespace {
Some(ref ns) => format!("{}.{}", ns, alias),
@@ -889,10 +1005,16 @@ impl Parser {
}
/// Returns already parsed schema or a schema that is currently being
resolved.
- fn get_already_seen_schema(&self, complex: &Map<String, Value>) ->
Option<&Schema> {
+ fn get_already_seen_schema(
+ &self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> Option<&Schema> {
match complex.get("type") {
Some(Value::String(ref typ)) => {
- let name = Name::new(typ.as_str()).unwrap();
+ let name = Name::new(typ.as_str())
+ .unwrap()
+ .fully_qualified_name(enclosing_namespace);
self.resolving_schemas
.get(&name)
.or_else(|| self.parsed_schemas.get(&name))
@@ -903,20 +1025,24 @@ impl Parser {
/// Parse a `serde_json::Value` representing an Avro record type into a
/// `Schema`.
- fn parse_record(&mut self, complex: &Map<String, Value>) ->
AvroResult<Schema> {
+ fn parse_record(
+ &mut self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
let name = Name::parse(complex)?;
-
+ let aliases = complex.aliases();
let fields_opt = complex.get("fields");
if fields_opt.is_none() {
- if let Some(seen) = self.get_already_seen_schema(complex) {
+ if let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace) {
return Ok(seen.clone());
}
}
let mut lookup = HashMap::new();
-
- self.register_resolving_schema(&name);
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ self.register_resolving_schema(&fully_qualified_name, &aliases);
let fields: Vec<RecordField> = fields_opt
.and_then(|fields| fields.as_array())
@@ -926,7 +1052,9 @@ impl Parser {
.iter()
.filter_map(|field| field.as_object())
.enumerate()
- .map(|(position, field)| RecordField::parse(field,
position, self))
+ .map(|(position, field)| {
+ RecordField::parse(field, position, self,
&fully_qualified_name.namespace)
+ })
.collect::<Result<_, _>>()
})?;
@@ -935,25 +1063,31 @@ impl Parser {
}
let schema = Schema::Record {
- name: name.clone(),
+ name,
+ aliases: aliases.clone(),
doc: complex.doc(),
fields,
lookup,
};
- self.register_parsed_schema(&name, &schema);
+ self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
Ok(schema)
}
/// Parse a `serde_json::Value` representing a Avro enum type into a
/// `Schema`.
- fn parse_enum(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
+ fn parse_enum(
+ &mut self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
let name = Name::parse(complex)?;
-
+ let aliases = complex.aliases();
+ let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let symbols_opt = complex.get("symbols");
if symbols_opt.is_none() {
- if let Some(seen) = self.get_already_seen_schema(complex) {
+ if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
return Ok(seen.clone());
}
}
@@ -985,54 +1119,72 @@ impl Parser {
}
let schema = Schema::Enum {
- name: name.clone(),
+ name,
+ aliases: aliases.clone(),
doc: complex.doc(),
symbols,
};
- self.register_parsed_schema(&name, &schema);
+ self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
Ok(schema)
}
/// Parse a `serde_json::Value` representing a Avro array type into a
/// `Schema`.
- fn parse_array(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
+ fn parse_array(
+ &mut self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
complex
.get("items")
.ok_or(Error::GetArrayItemsField)
- .and_then(|items| self.parse(items))
+ .and_then(|items| self.parse(items, enclosing_namespace))
.map(|schema| Schema::Array(Box::new(schema)))
}
/// Parse a `serde_json::Value` representing a Avro map type into a
/// `Schema`.
- fn parse_map(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
+ fn parse_map(
+ &mut self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
complex
.get("values")
.ok_or(Error::GetMapValuesField)
- .and_then(|items| self.parse(items))
+ .and_then(|items| self.parse(items, enclosing_namespace))
.map(|schema| Schema::Map(Box::new(schema)))
}
/// Parse a `serde_json::Value` representing a Avro union type into a
/// `Schema`.
- fn parse_union(&mut self, items: &[Value]) -> AvroResult<Schema> {
+ fn parse_union(
+ &mut self,
+ items: &[Value],
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
items
.iter()
- .map(|v| self.parse(v))
+ .map(|v| self.parse(v, enclosing_namespace))
.collect::<Result<Vec<_>, _>>()
.and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?)))
}
/// Parse a `serde_json::Value` representing a Avro fixed type into a
/// `Schema`.
- fn parse_fixed(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
+ fn parse_fixed(
+ &mut self,
+ complex: &Map<String, Value>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
let name = Name::parse(complex)?;
-
+ let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+ let aliases = complex.aliases();
let size_opt = complex.get("size");
if size_opt.is_none() {
- if let Some(seen) = self.get_already_seen_schema(complex) {
+ if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
return Ok(seen.clone());
}
}
@@ -1047,12 +1199,13 @@ impl Parser {
.ok_or(Error::GetFixedSizeField)?;
let schema = Schema::Fixed {
- name: name.clone(),
+ name,
+ aliases: aliases.clone(),
doc,
size: size as usize,
};
- self.register_parsed_schema(&name, &schema);
+ self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
Ok(schema)
}
@@ -1105,6 +1258,7 @@ impl Serialize for Schema {
}
Schema::Record {
ref name,
+ ref aliases,
ref doc,
ref fields,
..
@@ -1118,7 +1272,7 @@ impl Serialize for Schema {
if let Some(ref docstr) = doc {
map.serialize_entry("doc", docstr)?;
}
- if let Some(ref aliases) = name.aliases {
+ if let Some(ref aliases) = aliases {
map.serialize_entry("aliases", aliases)?;
}
map.serialize_entry("fields", fields)?;
@@ -1139,6 +1293,7 @@ impl Serialize for Schema {
ref name,
ref doc,
ref size,
+ ..
} => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "fixed")?;
@@ -1204,6 +1359,7 @@ impl Serialize for Schema {
// duration should be or typically is.
let inner = Schema::Fixed {
name: Name::new("duration").unwrap(),
+ aliases: None,
doc: None,
size: 12,
};
@@ -1453,6 +1609,7 @@ mod tests {
let schema_c_expected = Schema::Record {
name: Name::new("C").unwrap(),
+ aliases: None,
doc: None,
fields: vec![RecordField {
name: "field_one".to_string(),
@@ -1508,6 +1665,7 @@ mod tests {
let schema_option_a_expected = Schema::Record {
name: Name::new("OptionA").unwrap(),
+ aliases: None,
doc: None,
fields: vec![RecordField {
name: "field_one".to_string(),
@@ -1553,6 +1711,7 @@ mod tests {
let expected = Schema::Record {
name: Name::new("test").unwrap(),
+ aliases: None,
doc: None,
fields: vec![
RecordField {
@@ -1611,6 +1770,7 @@ mod tests {
let expected = Schema::Record {
name: Name::new("test").unwrap(),
+ aliases: None,
doc: None,
fields: vec![RecordField {
name: "recordField".to_string(),
@@ -1618,6 +1778,7 @@ mod tests {
default: None,
schema: Schema::Record {
name: Name::new("Node").unwrap(),
+ aliases: None,
doc: None,
fields: vec![
RecordField {
@@ -1779,8 +1940,8 @@ mod tests {
name: Name {
name: "LongList".to_owned(),
namespace: None,
- aliases: Some(vec!["LinkedLongs".to_owned()]),
},
+ aliases: Some(vec!["LinkedLongs".to_owned()]),
doc: None,
fields: vec![
RecordField {
@@ -1802,7 +1963,6 @@ mod tests {
name: Name {
name: "LongList".to_owned(),
namespace: None,
- aliases: Some(vec!["LinkedLongs".to_owned()]),
},
},
])
@@ -1846,8 +2006,8 @@ mod tests {
name: Name {
name: "record".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
fields: vec![
RecordField {
@@ -1866,7 +2026,6 @@ mod tests {
name: Name {
name: "record".to_owned(),
namespace: None,
- aliases: None,
},
},
order: RecordFieldOrder::Ascending,
@@ -1911,8 +2070,8 @@ mod tests {
name: Name {
name: "record".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
fields: vec![
RecordField {
@@ -1923,8 +2082,8 @@ mod tests {
name: Name {
name: "enum".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
},
@@ -1939,8 +2098,8 @@ mod tests {
name: Name {
name: "enum".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
},
@@ -1986,8 +2145,8 @@ mod tests {
name: Name {
name: "record".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
fields: vec![
RecordField {
@@ -1998,8 +2157,8 @@ mod tests {
name: Name {
name: "fixed".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
size: 456,
},
@@ -2014,8 +2173,8 @@ mod tests {
name: Name {
name: "fixed".to_owned(),
namespace: None,
- aliases: None,
},
+ aliases: None,
doc: None,
size: 456,
},
@@ -2040,6 +2199,7 @@ mod tests {
let expected = Schema::Enum {
name: Name::new("Suit").unwrap(),
+ aliases: None,
doc: None,
symbols: vec![
"diamonds".to_owned(),
@@ -2076,6 +2236,7 @@ mod tests {
let expected = Schema::Fixed {
name: Name::new("test").unwrap(),
+ aliases: None,
doc: None,
size: 16usize,
};
@@ -2092,6 +2253,7 @@ mod tests {
let expected = Schema::Fixed {
name: Name::new("test").unwrap(),
+ aliases: None,
doc: Some(String::from("FixedSchema documentation")),
size: 16usize,
};
@@ -2365,4 +2527,574 @@ mod tests {
_ => panic!("Expected an Error::InvalidSchemaName!"),
}
}
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.inner_record_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "space.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.inner_record_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"enum",
+ "name":"inner_enum_name",
+ "symbols":["Extensive","Testing"]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_enum_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.inner_enum_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"enum",
+ "name":"inner_enum_name",
+ "symbols":["Extensive","Testing"]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "space.inner_enum_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.inner_enum_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"fixed",
+ "name":"inner_fixed_name",
+ "size": 16
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_fixed_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.inner_fixed_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"fixed",
+ "name":"inner_fixed_name",
+ "size": 16
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "space.inner_fixed_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.inner_fixed_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_record_inner_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "namespace":"inner_space",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_space.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "inner_space.inner_record_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"enum",
+ "name":"inner_enum_name",
+ "namespace": "inner_space",
+ "symbols":["Extensive","Testing"]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_space.inner_enum_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "inner_space.inner_enum_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"fixed",
+ "name":"inner_fixed_name",
+ "namespace": "inner_space",
+ "size": 16
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_space.inner_fixed_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "inner_space.inner_fixed_name"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"record",
+ "name":"middle_record_name",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "space.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 3);
+ for s in &[
+ "space.record_name",
+ "space.middle_record_name",
+ "space.inner_record_name",
+ ] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"record",
+ "name":"middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "middle_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 3);
+ for s in &[
+ "space.record_name",
+ "middle_namespace.middle_record_name",
+ "middle_namespace.inner_record_name",
+ ] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type":"record",
+ "name":"middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "namespace":"inner_namespace",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 3);
+ for s in &[
+ "space.record_name",
+ "middle_namespace.middle_record_name",
+ "inner_namespace.inner_record_name",
+ ] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_in_array_resolution_inherited_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": {
+ "type":"array",
+ "items":{
+ "type":"record",
+ "name":"in_array_record",
+ "fields": [
+ {
+ "name":"array_record_field",
+ "type":"string"
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name":"outer_field_2",
+ "type":"in_array_record"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.in_array_record"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
+
+ #[test]
+ fn avro_3448_test_proper_in_map_resolution_inherited_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": {
+ "type":"map",
+ "values":{
+ "type":"record",
+ "name":"in_map_record",
+ "fields": [
+ {
+ "name":"map_record_field",
+ "type":"string"
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name":"outer_field_2",
+ "type":"in_map_record"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
+ assert!(rs.get_names().len() == 2);
+ for s in &["space.record_name", "space.in_map_record"] {
+ assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+ }
+ }
}
diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs
index b3fb4db..43fbbbe 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -232,12 +232,14 @@ impl SchemaCompatibility {
SchemaKind::Fixed => {
if let Schema::Fixed {
name: w_name,
+ aliases: _,
doc: _w_doc,
size: w_size,
} = writers_schema
{
if let Schema::Fixed {
name: r_name,
+ aliases: _,
doc: _r_doc,
size: r_size,
} = readers_schema
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index e2f6962..322d21b 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -409,6 +409,7 @@ impl Value {
/// in the Avro specification for the full set of rules of schema
/// resolution.
pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
+ // FIXME transition to using resolved Schema
let mut schemas_by_name: HashMap<Name, Schema> = HashMap::new();
self.resolve_internal(schema, &mut schemas_by_name)
}
@@ -440,7 +441,7 @@ impl Value {
if let Some(resolved) = schemas_by_name.get(name) {
resolve0(value, resolved, &mut schemas_by_name.clone())
} else {
- Err(Error::SchemaResolutionError(name.fullname(None)))
+ Err(Error::SchemaResolutionError(name.clone()))
}
}
Schema::Null => val.resolve_null(),
@@ -934,6 +935,7 @@ mod tests {
let schema = Schema::Fixed {
size: 4,
name: Name::new("some_fixed").unwrap(),
+ aliases: None,
doc: None,
};
@@ -947,6 +949,7 @@ mod tests {
fn validate_enum() {
let schema = Schema::Enum {
name: Name::new("some_enum").unwrap(),
+ aliases: None,
doc: None,
symbols: vec![
"spades".to_string(),
@@ -964,6 +967,7 @@ mod tests {
let other_schema = Schema::Enum {
name: Name::new("some_other_enum").unwrap(),
+ aliases: None,
doc: None,
symbols: vec![
"hearts".to_string(),
@@ -988,6 +992,7 @@ mod tests {
// }
let schema = Schema::Record {
name: Name::new("some_record").unwrap(),
+ aliases: None,
doc: None,
fields: vec![
RecordField {
@@ -1139,6 +1144,7 @@ mod tests {
scale: 1,
inner: Box::new(Schema::Fixed {
name: Name::new("decimal").unwrap(),
+ aliases: None,
size: 20,
doc: None
})
diff --git a/lang/rust/avro/src/util.rs b/lang/rust/avro/src/util.rs
index e2b353b..232dc64 100644
--- a/lang/rust/avro/src/util.rs
+++ b/lang/rust/avro/src/util.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{AvroResult, Error};
+use crate::{
+ schema::{Aliases, Documentation},
+ AvroResult, Error,
+};
use serde_json::{Map, Value};
use std::{convert::TryFrom, i64, io::Read, sync::Once};
@@ -33,9 +36,11 @@ pub trait MapHelper {
self.string("name")
}
- fn doc(&self) -> Option<String> {
+ fn doc(&self) -> Documentation {
self.string("doc")
}
+
+ fn aliases(&self) -> Aliases;
}
impl MapHelper for Map<String, Value> {
@@ -44,6 +49,19 @@ impl MapHelper for Map<String, Value> {
.and_then(|v| v.as_str())
.map(|v| v.to_string())
}
+
+ fn aliases(&self) -> Aliases {
+ // FIXME no warning when aliases aren't a json array of json strings
+ self.get("aliases")
+ .and_then(|aliases| aliases.as_array())
+ .and_then(|aliases| {
+ aliases
+ .iter()
+ .map(|alias| alias.as_str())
+ .map(|alias| alias.map(|a| a.to_string()))
+ .collect::<Option<_>>()
+ })
+ }
}
pub fn read_long<R: Read>(reader: &mut R) -> AvroResult<i64> {
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index e9518e3..e257916 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -17,7 +17,7 @@
//! Logic handling writing in Avro format at user level.
use crate::{
- encode::{encode, encode_ref, encode_to_vec},
+ encode::{encode, encode_to_vec},
schema::Schema,
ser::Serializer,
types::Value,
@@ -266,7 +266,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// Append a raw Avro Value to the payload avoiding to encode it again.
fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
- self.append_bytes(encode_to_vec(value, schema).as_ref())
+ self.append_bytes(encode_to_vec(value, schema)?.as_ref())
}
/// Append pure bytes to the payload.
@@ -309,7 +309,7 @@ impl<'a, W: Write> Writer<'a, W> {
&metadata.into(),
&Schema::Map(Box::new(Schema::Bytes)),
&mut header,
- );
+ )?;
header.extend_from_slice(&self.marker);
Ok(header)
@@ -341,7 +341,7 @@ fn write_avro_datum<T: Into<Value>>(
if !avro.validate(schema) {
return Err(Error::Validation);
}
- encode(&avro, schema, buffer);
+ encode(&avro, schema, buffer)?;
Ok(())
}
@@ -349,7 +349,7 @@ fn write_value_ref(schema: &Schema, value: &Value, buffer: &mut Vec<u8>) -> Avro
if !value.validate(schema) {
return Err(Error::Validation);
}
- encode_ref(value, schema, buffer);
+ encode(value, schema, buffer)?;
Ok(())
}
@@ -521,6 +521,7 @@ mod tests {
let size = 30;
let inner = Schema::Fixed {
name: Name::new("decimal").unwrap(),
+ aliases: None,
doc: None,
size,
};
@@ -559,6 +560,7 @@ mod tests {
fn duration() -> TestResult<()> {
let inner = Schema::Fixed {
name: Name::new("duration").unwrap(),
+ aliases: None,
doc: None,
size: 12,
};
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index 24ae247..c404452 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -845,6 +845,7 @@ fn test_parse_reused_record_schema_by_fullname() {
match schema.unwrap() {
Schema::Record {
ref name,
+ aliases: _,
doc: _,
ref fields,
lookup: _,
@@ -1088,7 +1089,7 @@ fn test_fullname_name_and_default_namespace_specified() {
init();
let name: Name =
serde_json::from_str(r#"{"name": "a", "namespace": null, "aliases": null}"#).unwrap();
- let fullname = name.fullname(Some("b.c.d"));
+ let fullname = name.fullname(Some("b.c.d".into()));
assert_eq!("b.c.d.a", fullname);
}
@@ -1097,7 +1098,7 @@ fn test_fullname_fullname_and_default_namespace_specified() {
init();
let name: Name =
serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": null, "aliases": null}"#).unwrap();
- let fullname = name.fullname(Some("o.a.h"));
+ let fullname = name.fullname(Some("o.a.h".into()));
assert_eq!("a.b.c.d", fullname);
}
@@ -1107,7 +1108,7 @@ fn test_fullname_fullname_namespace_and_default_namespace_specified() {
let name: Name =
serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": "o.a.a", "aliases": null}"#)
.unwrap();
- let fullname = name.fullname(Some("o.a.h"));
+ let fullname = name.fullname(Some("o.a.h".into()));
assert_eq!("a.b.c.d", fullname);
}
@@ -1116,7 +1117,7 @@ fn test_fullname_name_namespace_and_default_namespace_specified() {
init();
let name: Name =
serde_json::from_str(r#"{"name": "a", "namespace": "o.a.a", "aliases": null}"#).unwrap();
- let fullname = name.fullname(Some("o.a.h"));
+ let fullname = name.fullname(Some("o.a.h".into()));
assert_eq!("o.a.a.a", fullname);
}