This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new 2bb7eb3  [AVRO-3448][rust] Rust name/namespace spec adherence work (#1602)
2bb7eb3 is described below

commit 2bb7eb3a42b98f846936b4d9bc1e4345f6de6e5b
Author: Jack Klamer <[email protected]>
AuthorDate: Fri Mar 18 01:21:43 2022 -0500

    [AVRO-3448][rust] Rust name/namespace spec adherence work (#1602)
    
    * resolved schema struct
    
    * matrix testing started
    
    * [AVRO-3448] encode, decode, parsing in alignment with name/namespace spec
    
    * [AVRO-3448] clippy
    
    * Update lang/rust/avro/src/util.rs
    
    Co-authored-by: Martin Grigorov <[email protected]>
    
    * Update lang/rust/avro/src/schema.rs
    
    Co-authored-by: Martin Grigorov <[email protected]>
    
    * [AVRO-3448] encode with Result return
    
    * [AVRO-3448] clean up types and tests
    
    * AVRO-3448: Fix a typo in an error variant
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    Co-authored-by: Martin Grigorov <[email protected]>
    Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
    (cherry picked from commit 128510c4e9c3a714279aa7cea22eda19080521f7)
---
 lang/rust/avro/src/decode.rs               | 764 +++++++++++++++++------
 lang/rust/avro/src/encode.rs               | 700 +++++++++++++++------
 lang/rust/avro/src/error.rs                |  27 +-
 lang/rust/avro/src/schema.rs               | 962 +++++++++++++++++++++++++----
 lang/rust/avro/src/schema_compatibility.rs |   2 +
 lang/rust/avro/src/types.rs                |   8 +-
 lang/rust/avro/src/util.rs                 |  22 +-
 lang/rust/avro/src/writer.rs               |  12 +-
 lang/rust/avro/tests/schema.rs             |   9 +-
 9 files changed, 1996 insertions(+), 510 deletions(-)

diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index 87337bb..7ddb9fc 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -18,7 +18,7 @@
 use crate::{
     decimal::Decimal,
     duration::Duration,
-    schema::Schema,
+    schema::{NamesRef, Namespace, ResolvedSchema, Schema},
     types::Value,
     util::{safe_len, zag_i32, zag_i64},
     AvroResult, Error,
@@ -68,222 +68,225 @@ fn decode_seq_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
 
 /// Decode a `Value` from avro format given its `Schema`.
 pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
-    fn decode0<R: Read>(
-        schema: &Schema,
-        reader: &mut R,
-        schemas_by_name: &mut HashMap<String, Schema>,
-    ) -> AvroResult<Value> {
-        match *schema {
-            Schema::Null => Ok(Value::Null),
-            Schema::Boolean => {
-                let mut buf = [0u8; 1];
-                match reader.read_exact(&mut buf[..]) {
-                    Ok(_) => match buf[0] {
-                        0u8 => Ok(Value::Boolean(false)),
-                        1u8 => Ok(Value::Boolean(true)),
-                        _ => Err(Error::BoolValue(buf[0])),
-                    },
-                    Err(io_err) => {
-                        if let ErrorKind::UnexpectedEof = io_err.kind() {
-                            Ok(Value::Null)
-                        } else {
-                            Err(Error::ReadBoolean(io_err))
-                        }
+    let rs = ResolvedSchema::try_from(schema)?;
+    decode_internal(schema, rs.get_names(), &None, reader)
+}
+
+fn decode_internal<R: Read>(
+    schema: &Schema,
+    names: &NamesRef,
+    enclosing_namespace: &Namespace,
+    reader: &mut R,
+) -> AvroResult<Value> {
+    match *schema {
+        Schema::Null => Ok(Value::Null),
+        Schema::Boolean => {
+            let mut buf = [0u8; 1];
+            match reader.read_exact(&mut buf[..]) {
+                Ok(_) => match buf[0] {
+                    0u8 => Ok(Value::Boolean(false)),
+                    1u8 => Ok(Value::Boolean(true)),
+                    _ => Err(Error::BoolValue(buf[0])),
+                },
+                Err(io_err) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Null)
+                    } else {
+                        Err(Error::ReadBoolean(io_err))
                     }
                 }
             }
-            Schema::Decimal { ref inner, .. } => match &**inner {
-                Schema::Fixed { .. } => match decode0(inner, reader, 
schemas_by_name)? {
+        }
+        Schema::Decimal { ref inner, .. } => match &**inner {
+            Schema::Fixed { .. } => {
+                match decode_internal(inner, names, enclosing_namespace, 
reader)? {
                     Value::Fixed(_, bytes) => 
Ok(Value::Decimal(Decimal::from(bytes))),
                     value => Err(Error::FixedValue(value.into())),
-                },
-                Schema::Bytes => match decode0(inner, reader, 
schemas_by_name)? {
-                    Value::Bytes(bytes) => 
Ok(Value::Decimal(Decimal::from(bytes))),
-                    value => Err(Error::BytesValue(value.into())),
-                },
-                schema => Err(Error::ResolveDecimalSchema(schema.into())),
+                }
+            }
+            Schema::Bytes => match decode_internal(inner, names, 
enclosing_namespace, reader)? {
+                Value::Bytes(bytes) => 
Ok(Value::Decimal(Decimal::from(bytes))),
+                value => Err(Error::BytesValue(value.into())),
             },
-            Schema::Uuid => Ok(Value::Uuid(
-                Uuid::from_str(match decode0(&Schema::String, reader, 
schemas_by_name)? {
+            schema => Err(Error::ResolveDecimalSchema(schema.into())),
+        },
+        Schema::Uuid => Ok(Value::Uuid(
+            Uuid::from_str(
+                match decode_internal(&Schema::String, names, 
enclosing_namespace, reader)? {
                     Value::String(ref s) => s,
                     value => return 
Err(Error::GetUuidFromStringValue(value.into())),
-                })
-                .map_err(Error::ConvertStrToUuid)?,
-            )),
-            Schema::Int => decode_int(reader),
-            Schema::Date => zag_i32(reader).map(Value::Date),
-            Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
-            Schema::Long => decode_long(reader),
-            Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
-            Schema::TimestampMillis => 
zag_i64(reader).map(Value::TimestampMillis),
-            Schema::TimestampMicros => 
zag_i64(reader).map(Value::TimestampMicros),
-            Schema::Duration => {
-                let mut buf = [0u8; 12];
-                reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
-                Ok(Value::Duration(Duration::from(buf)))
-            }
-            Schema::Float => {
-                let mut buf = [0u8; std::mem::size_of::<f32>()];
-                reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
-                Ok(Value::Float(f32::from_le_bytes(buf)))
-            }
-            Schema::Double => {
-                let mut buf = [0u8; std::mem::size_of::<f64>()];
-                reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
-                Ok(Value::Double(f64::from_le_bytes(buf)))
-            }
-            Schema::Bytes => {
-                let len = decode_len(reader)?;
-                let mut buf = vec![0u8; len];
-                reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
-                Ok(Value::Bytes(buf))
-            }
-            Schema::String => {
-                let len = decode_len(reader)?;
-                let mut buf = vec![0u8; len];
-                match reader.read_exact(&mut buf) {
-                    Ok(_) => Ok(Value::String(
-                        String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
-                    )),
-                    Err(io_err) => {
-                        if let ErrorKind::UnexpectedEof = io_err.kind() {
-                            Ok(Value::Null)
-                        } else {
-                            Err(Error::ReadString(io_err))
-                        }
+                },
+            )
+            .map_err(Error::ConvertStrToUuid)?,
+        )),
+        Schema::Int => decode_int(reader),
+        Schema::Date => zag_i32(reader).map(Value::Date),
+        Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
+        Schema::Long => decode_long(reader),
+        Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
+        Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
+        Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+        Schema::Duration => {
+            let mut buf = [0u8; 12];
+            reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
+            Ok(Value::Duration(Duration::from(buf)))
+        }
+        Schema::Float => {
+            let mut buf = [0u8; std::mem::size_of::<f32>()];
+            reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
+            Ok(Value::Float(f32::from_le_bytes(buf)))
+        }
+        Schema::Double => {
+            let mut buf = [0u8; std::mem::size_of::<f64>()];
+            reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
+            Ok(Value::Double(f64::from_le_bytes(buf)))
+        }
+        Schema::Bytes => {
+            let len = decode_len(reader)?;
+            let mut buf = vec![0u8; len];
+            reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
+            Ok(Value::Bytes(buf))
+        }
+        Schema::String => {
+            let len = decode_len(reader)?;
+            let mut buf = vec![0u8; len];
+            match reader.read_exact(&mut buf) {
+                Ok(_) => Ok(Value::String(
+                    String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+                )),
+                Err(io_err) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Null)
+                    } else {
+                        Err(Error::ReadString(io_err))
                     }
                 }
             }
-            Schema::Fixed { ref name, size, .. } => {
-                schemas_by_name.insert(name.name.clone(), schema.clone());
-                let mut buf = vec![0u8; size];
-                reader
-                    .read_exact(&mut buf)
-                    .map_err(|e| Error::ReadFixed(e, size))?;
-                Ok(Value::Fixed(size, buf))
-            }
-            Schema::Array(ref inner) => {
-                let mut items = Vec::new();
-
-                loop {
-                    let len = decode_seq_len(reader)?;
-                    if len == 0 {
-                        break;
-                    }
+        }
+        Schema::Fixed { size, .. } => {
+            let mut buf = vec![0u8; size];
+            reader
+                .read_exact(&mut buf)
+                .map_err(|e| Error::ReadFixed(e, size))?;
+            Ok(Value::Fixed(size, buf))
+        }
+        Schema::Array(ref inner) => {
+            let mut items = Vec::new();
 
-                    items.reserve(len);
-                    for _ in 0..len {
-                        items.push(decode0(inner, reader, schemas_by_name)?);
-                    }
+            loop {
+                let len = decode_seq_len(reader)?;
+                if len == 0 {
+                    break;
                 }
 
-                Ok(Value::Array(items))
+                items.reserve(len);
+                for _ in 0..len {
+                    items.push(decode_internal(inner, names, 
enclosing_namespace, reader)?);
+                }
             }
-            Schema::Map(ref inner) => {
-                let mut items = HashMap::new();
 
-                loop {
-                    let len = decode_seq_len(reader)?;
-                    if len == 0 {
-                        break;
-                    }
+            Ok(Value::Array(items))
+        }
+        Schema::Map(ref inner) => {
+            let mut items = HashMap::new();
 
-                    items.reserve(len);
-                    for _ in 0..len {
-                        match decode0(&Schema::String, reader, 
schemas_by_name)? {
-                            Value::String(key) => {
-                                let value = decode0(inner, reader, 
schemas_by_name)?;
-                                items.insert(key, value);
-                            }
-                            value => return 
Err(Error::MapKeyType(value.into())),
+            loop {
+                let len = decode_seq_len(reader)?;
+                if len == 0 {
+                    break;
+                }
+
+                items.reserve(len);
+                for _ in 0..len {
+                    match decode_internal(&Schema::String, names, 
enclosing_namespace, reader)? {
+                        Value::String(key) => {
+                            let value = decode_internal(inner, names, 
enclosing_namespace, reader)?;
+                            items.insert(key, value);
                         }
+                        value => return Err(Error::MapKeyType(value.into())),
                     }
                 }
+            }
 
-                Ok(Value::Map(items))
+            Ok(Value::Map(items))
+        }
+        Schema::Union(ref inner) => match zag_i64(reader) {
+            Ok(index) => {
+                let variants = inner.variants();
+                let variant = variants
+                    .get(usize::try_from(index).map_err(|e| 
Error::ConvertI64ToUsize(e, index))?)
+                    .ok_or(Error::GetUnionVariant {
+                        index,
+                        num_variants: variants.len(),
+                    })?;
+                let value = decode_internal(variant, names, 
enclosing_namespace, reader)?;
+                Ok(Value::Union(index as u32, Box::new(value)))
             }
-            Schema::Union(ref inner) => match zag_i64(reader) {
-                Ok(index) => {
-                    let variants = inner.variants();
-                    let variant = variants
-                        .get(
-                            usize::try_from(index)
-                                .map_err(|e| Error::ConvertI64ToUsize(e, 
index))?,
-                        )
-                        .ok_or(Error::GetUnionVariant {
-                            index,
-                            num_variants: variants.len(),
-                        })?;
-                    let value = decode0(variant, reader, schemas_by_name)?;
-                    Ok(Value::Union(index as u32, Box::new(value)))
-                }
-                Err(Error::ReadVariableIntegerBytes(io_err)) => {
-                    if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Union(0, Box::new(Value::Null)))
-                    } else {
-                        Err(Error::ReadVariableIntegerBytes(io_err))
-                    }
-                }
-                Err(io_err) => Err(io_err),
-            },
-            Schema::Record {
-                ref name,
-                ref fields,
-                ..
-            } => {
-                schemas_by_name.insert(name.name.clone(), schema.clone());
-                // Benchmarks indicate ~10% improvement using this method.
-                let mut items = Vec::with_capacity(fields.len());
-                for field in fields {
-                    // TODO: This clone is also expensive. See if we can do 
away with it...
-                    items.push((
-                        field.name.clone(),
-                        decode0(&field.schema, reader, schemas_by_name)?,
-                    ));
+            Err(Error::ReadVariableIntegerBytes(io_err)) => {
+                if let ErrorKind::UnexpectedEof = io_err.kind() {
+                    Ok(Value::Union(0, Box::new(Value::Null)))
+                } else {
+                    Err(Error::ReadVariableIntegerBytes(io_err))
                 }
-                Ok(Value::Record(items))
             }
-            Schema::Enum {
-                ref name,
-                ref symbols,
-                ..
-            } => {
-                schemas_by_name.insert(name.name.clone(), schema.clone());
-                Ok(if let Value::Int(raw_index) = decode_int(reader)? {
-                    let index = usize::try_from(raw_index)
-                        .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
-                    if (0..=symbols.len()).contains(&index) {
-                        let symbol = symbols[index].clone();
-                        Value::Enum(raw_index as u32, symbol)
-                    } else {
-                        return Err(Error::GetEnumValue {
-                            index,
-                            nsymbols: symbols.len(),
-                        });
-                    }
-                } else {
-                    return Err(Error::GetEnumSymbol);
-                })
+            Err(io_err) => Err(io_err),
+        },
+        Schema::Record {
+            ref name,
+            ref fields,
+            ..
+        } => {
+            let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+            // Benchmarks indicate ~10% improvement using this method.
+            let mut items = Vec::with_capacity(fields.len());
+            for field in fields {
+                // TODO: This clone is also expensive. See if we can do away 
with it...
+                items.push((
+                    field.name.clone(),
+                    decode_internal(
+                        &field.schema,
+                        names,
+                        &fully_qualified_name.namespace,
+                        reader,
+                    )?,
+                ));
             }
-            Schema::Ref { ref name } => {
-                let name = &name.name;
-                if let Some(resolved) = schemas_by_name.get(name.as_str()) {
-                    decode0(resolved, reader, &mut schemas_by_name.clone())
+            Ok(Value::Record(items))
+        }
+        Schema::Enum { ref symbols, .. } => {
+            Ok(if let Value::Int(raw_index) = decode_int(reader)? {
+                let index = usize::try_from(raw_index)
+                    .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
+                if (0..=symbols.len()).contains(&index) {
+                    let symbol = symbols[index].clone();
+                    Value::Enum(raw_index as u32, symbol)
                 } else {
-                    Err(Error::SchemaResolutionError(name.clone()))
+                    return Err(Error::GetEnumValue {
+                        index,
+                        nsymbols: symbols.len(),
+                    });
                 }
+            } else {
+                return Err(Error::GetEnumUnknownIndexValue);
+            })
+        }
+        Schema::Ref { ref name } => {
+            let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+            if let Some(resolved) = names.get(&fully_qualified_name) {
+                decode_internal(resolved, names, 
&fully_qualified_name.namespace, reader)
+            } else {
+                Err(Error::SchemaResolutionError(fully_qualified_name))
             }
         }
     }
-
-    let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
-    decode0(schema, reader, &mut schemas_by_name)
 }
 
 #[cfg(test)]
+#[allow(clippy::expect_fun_call)]
 mod tests {
     use crate::{
         decode::decode,
+        encode::{encode, tests::success},
         schema::Schema,
         types::{
             Value,
@@ -333,6 +336,7 @@ mod tests {
             size: 2,
             doc: None,
             name: Name::new("decimal").unwrap(),
+            aliases: None,
         });
         let schema = Schema::Decimal {
             inner,
@@ -343,7 +347,7 @@ mod tests {
         let value = Value::Decimal(Decimal::from(bigint.to_signed_bytes_be()));
 
         let mut buffer = Vec::new();
-        encode(&value, &schema, &mut buffer);
+        encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));
 
         let mut bytes = &buffer[..];
         let result = decode(&schema, &mut bytes).unwrap();
@@ -357,6 +361,7 @@ mod tests {
         let inner = Box::new(Schema::Fixed {
             size: 13,
             name: Name::new("decimal").unwrap(),
+            aliases: None,
             doc: None,
         });
         let schema = Schema::Decimal {
@@ -369,9 +374,402 @@ mod tests {
         ));
         let mut buffer = Vec::<u8>::new();
 
-        encode(&value, &schema, &mut buffer);
+        encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));
         let mut bytes: &[u8] = &buffer[..];
         let result = decode(&schema, &mut bytes).unwrap();
         assert_eq!(result, value);
     }
+
+    #[test]
+    fn test_avro_3448_recursive_definition_decode_union() {
+        // if encoding fails in this test check the corresponding test in 
encode
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":[ "null", {
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }]
+                },
+                {
+                    "name":"b",
+                    "type":"Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value1 = Value::Record(vec![
+            ("a".into(), Value::Union(1, Box::new(inner_value1))),
+            ("b".into(), inner_value2.clone()),
+        ]);
+        let mut buf = Vec::new();
+        encode(&outer_value1, &schema, &mut 
buf).expect(&success(&outer_value1, &schema));
+        assert!(!buf.is_empty());
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_value1,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to decode using recursive definitions with schema:\n 
{:?}\n",
+                &schema
+            ))
+        );
+
+        let mut buf = Vec::new();
+        let outer_value2 = Value::Record(vec![
+            ("a".into(), Value::Union(0, Box::new(Value::Null))),
+            ("b".into(), inner_value2),
+        ]);
+        encode(&outer_value2, &schema, &mut 
buf).expect(&success(&outer_value2, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_value2,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to decode using recursive definitions with schema:\n 
{:?}\n",
+                &schema
+            ))
+        );
+    }
+
+    #[test]
+    fn test_avro_3448_recursive_definition_decode_array() {
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"array",
+                        "items": {
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": "Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value = Value::Record(vec![
+            ("a".into(), Value::Array(vec![inner_value1])),
+            ("b".into(), inner_value2),
+        ]);
+        let mut buf = Vec::new();
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, 
&schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_value,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to decode using recursive definitions with schema:\n 
{:?}\n",
+                &schema
+            ))
+        )
+    }
+
+    #[test]
+    fn test_avro_3448_recursive_definition_decode_map() {
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"map",
+                        "values": {
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": "Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value = Value::Record(vec![
+            (
+                "a".into(),
+                Value::Map(vec![("akey".into(), 
inner_value1)].into_iter().collect()),
+            ),
+            ("b".into(), inner_value2),
+        ]);
+        let mut buf = Vec::new();
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, 
&schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_value,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to decode using recursive definitions with schema:\n 
{:?}\n",
+                &schema
+            ))
+        )
+    }
+
+    #[test]
+    fn test_avro_3448_proper_multi_level_decoding_middle_namespace() {
+        // if encoding fails in this test check the corresponding test in 
encode
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "middle_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), 
Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_1, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_1, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_record_variation_1,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to Decode with recursively defined namespace with 
schema:\n {:?}\n",
+                &schema
+            ))
+        );
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_2, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_2, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_record_variation_2,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to Decode with recursively defined namespace with 
schema:\n {:?}\n",
+                &schema
+            ))
+        );
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_3, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_3, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_record_variation_3,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to Decode with recursively defined namespace with 
schema:\n {:?}\n",
+                &schema
+            ))
+        );
+    }
+
+    #[test]
+    fn test_avro_3448_proper_multi_level_decoding_inner_namespace() {
+        // if encoding fails in this test check the corresponding test in 
encode
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "namespace":"inner_namespace",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_1, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_1, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_record_variation_1,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to Decode with recursively defined namespace with schema:\n {:?}\n",
+                &schema
+            ))
+        );
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_2, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_2, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_record_variation_2,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to Decode with recursively defined namespace with schema:\n {:?}\n",
+                &schema
+            ))
+        );
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_3, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_3, &schema));
+        let mut bytes = &buf[..];
+        assert_eq!(
+            outer_record_variation_3,
+            decode(&schema, &mut bytes).expect(&format!(
+                "Failed to Decode with recursively defined namespace with schema:\n {:?}\n",
+                &schema
+            ))
+        );
+    }
 }
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 664cd92..95d660d 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -16,24 +16,26 @@
 // under the License.
 
 use crate::{
-    schema::{Name, Schema},
-    types::Value,
+    schema::{NamesRef, Namespace, ResolvedSchema, Schema, SchemaKind},
+    types::{Value, ValueKind},
     util::{zig_i32, zig_i64},
+    AvroResult, Error,
 };
-use std::{collections::HashMap, convert::TryInto};
+use std::convert::{TryFrom, TryInto};
 
 /// Encode a `Value` into avro format.
 ///
 /// **NOTE** This will not perform schema validation. The value is assumed to
 /// be valid with regards to the schema. Schema are needed only to guide the
 /// encoding for complex type values.
-pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
-    encode_ref(value, schema, buffer)
+pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResult<()> {
+    let rs = ResolvedSchema::try_from(schema)?;
+    encode_internal(value, schema, rs.get_names(), &None, buffer)
 }
 
 fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
     let bytes = s.as_ref();
-    encode(&Value::Long(bytes.len() as i64), &Schema::Long, buffer);
+    encode_long(bytes.len() as i64, buffer);
     buffer.extend_from_slice(bytes);
 }
 
@@ -45,171 +47,203 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
     zig_i32(i, buffer)
 }
 
-/// Encode a `Value` into avro format.
-///
-/// **NOTE** This will not perform schema validation. The value is assumed to
-/// be valid with regards to the schema. Schema are needed only to guide the
-/// encoding for complex type values.
-pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
-    fn encode_ref0(
-        value: &Value,
-        schema: &Schema,
-        buffer: &mut Vec<u8>,
-        schemas_by_name: &mut HashMap<Name, Schema>,
-    ) {
-        match &schema {
-            Schema::Ref { ref name } => {
-                let resolved = schemas_by_name.get(name).unwrap();
-                return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone());
-            }
-            Schema::Record { ref name, .. }
-            | Schema::Enum { ref name, .. }
-            | Schema::Fixed { ref name, .. } => {
-                schemas_by_name.insert(name.clone(), schema.clone());
-            }
-            _ => (),
-        }
+fn encode_internal(
+    value: &Value,
+    schema: &Schema,
+    names: &NamesRef,
+    enclosing_namespace: &Namespace,
+    buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+    if let Schema::Ref { ref name } = schema {
+        let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+        let resolved = *names
+            .get(&fully_qualified_name)
+            .ok_or(Error::SchemaResolutionError(fully_qualified_name))?;
+        return encode_internal(value, resolved, names, enclosing_namespace, buffer);
+    }
 
-        match value {
-            Value::Null => (),
-            Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
-            // Pattern | Pattern here to signify that these _must_ have the same encoding.
-            Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
-            Value::Long(i)
-            | Value::TimestampMillis(i)
-            | Value::TimestampMicros(i)
-            | Value::TimeMicros(i) => encode_long(*i, buffer),
-            Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
-            Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
-            Value::Decimal(decimal) => match schema {
-                Schema::Decimal { inner, .. } => match *inner.clone() {
-                    Schema::Fixed { size, .. } => {
-                        let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
-                        let num_bytes = bytes.len();
-                        if num_bytes != size {
-                            panic!(
-                                "signed decimal bytes length {} not equal to fixed schema size {}",
-                                num_bytes, size
-                            );
-                        }
-                        encode(&Value::Fixed(size, bytes), inner, buffer)
-                    }
-                    Schema::Bytes => {
-                        encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer)
+    match value {
+        Value::Null => (),
+        Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+        // Pattern | Pattern here to signify that these _must_ have the same encoding.
+        Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
+        Value::Long(i)
+        | Value::TimestampMillis(i)
+        | Value::TimestampMicros(i)
+        | Value::TimeMicros(i) => encode_long(*i, buffer),
+        Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+        Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+        Value::Decimal(decimal) => match schema {
+            Schema::Decimal { inner, .. } => match *inner.clone() {
+                Schema::Fixed { size, .. } => {
+                    let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
+                    let num_bytes = bytes.len();
+                    if num_bytes != size {
+                        return Err(Error::EncodeDecimalAsFixedError(num_bytes, size));
                     }
-                    _ => panic!("invalid inner type for decimal: {:?}", inner),
-                },
-                _ => panic!("invalid schema type for decimal: {:?}", schema),
-            },
-            &Value::Duration(duration) => {
-                let slice: [u8; 12] = duration.into();
-                buffer.extend_from_slice(&slice);
-            }
-            Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
-            Value::Bytes(bytes) => match *schema {
-                Schema::Bytes => encode_bytes(bytes, buffer),
-                Schema::Fixed { .. } => buffer.extend(bytes),
-                _ => error!("invalid schema type for bytes: {:?}", schema),
-            },
-            Value::String(s) => match *schema {
-                Schema::String => {
-                    encode_bytes(s, buffer);
+                    encode(&Value::Fixed(size, bytes), inner, buffer)?
                 }
-                Schema::Enum { ref symbols, .. } => {
-                    if let Some(index) = symbols.iter().position(|item| item == s) {
-                        encode_int(index as i32, buffer);
-                    }
+                Schema::Bytes => encode(&Value::Bytes(decimal.try_into()?), inner, buffer)?,
+                _ => {
+                    return Err(Error::ResolveDecimalSchema(SchemaKind::from(
+                        *inner.clone(),
+                    )));
                 }
-                _ => error!("invalid schema type for String: {:?}", schema),
             },
-            Value::Fixed(_, bytes) => buffer.extend(bytes),
-            Value::Enum(i, _) => encode_int(*i as i32, buffer),
-            Value::Union(idx, item) => {
-                if let Schema::Union(ref inner) = *schema {
-                    inner.schemas.iter().for_each(|s| match s {
-                        Schema::Record { name, .. }
-                        | Schema::Enum { name, .. }
-                        | Schema::Fixed { name, .. } => {
-                            schemas_by_name.insert(name.clone(), s.clone());
-                        }
-                        _ => (),
-                    });
-
-                    let inner_schema = inner
-                        .schemas
-                        .get(*idx as usize)
-                        .expect("Invalid Union validation occurred");
-                    encode_long(*idx as i64, buffer);
-                    encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+            _ => {
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::Decimal,
+                    supported_schema: vec![SchemaKind::Decimal],
+                });
+            }
+        },
+        &Value::Duration(duration) => {
+            let slice: [u8; 12] = duration.into();
+            buffer.extend_from_slice(&slice);
+        }
+        Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+        Value::Bytes(bytes) => match *schema {
+            Schema::Bytes => encode_bytes(bytes, buffer),
+            Schema::Fixed { .. } => buffer.extend(bytes),
+            _ => {
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::Bytes,
+                    supported_schema: vec![SchemaKind::Bytes, SchemaKind::Fixed],
+                });
+            }
+        },
+        Value::String(s) => match *schema {
+            Schema::String => {
+                encode_bytes(s, buffer);
+            }
+            Schema::Enum { ref symbols, .. } => {
+                if let Some(index) = symbols.iter().position(|item| item == s) {
+                    encode_int(index as i32, buffer);
                 } else {
-                    error!("invalid schema type for Union: {:?}", schema);
+                    error!("Invalid symbol string {:?}.", &s[..]);
+                    return Err(Error::GetEnumSymbol(s.clone()));
                 }
             }
-            Value::Array(items) => {
-                if let Schema::Array(ref inner) = *schema {
-                    if !items.is_empty() {
-                        encode_long(items.len() as i64, buffer);
-                        for item in items.iter() {
-                            encode_ref0(item, inner, buffer, schemas_by_name);
-                        }
+            _ => {
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::String,
+                    supported_schema: vec![SchemaKind::String, SchemaKind::Enum],
+                });
+            }
+        },
+        Value::Fixed(_, bytes) => buffer.extend(bytes),
+        Value::Enum(i, _) => encode_int(*i as i32, buffer),
+        Value::Union(idx, item) => {
+            if let Schema::Union(ref inner) = *schema {
+                let inner_schema = inner
+                    .schemas
+                    .get(*idx as usize)
+                    .expect("Invalid Union validation occurred");
+                encode_long(*idx as i64, buffer);
+                encode_internal(&*item, inner_schema, names, enclosing_namespace, buffer)?;
+            } else {
+                error!("invalid schema type for Union: {:?}", schema);
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::Union,
+                    supported_schema: vec![SchemaKind::Union],
+                });
+            }
+        }
+        Value::Array(items) => {
+            if let Schema::Array(ref inner) = *schema {
+                if !items.is_empty() {
+                    encode_long(items.len() as i64, buffer);
+                    for item in items.iter() {
+                        encode_internal(item, inner, names, enclosing_namespace, buffer)?;
                     }
-                    buffer.push(0u8);
-                } else {
-                    error!("invalid schema type for Array: {:?}", schema);
                 }
+                buffer.push(0u8);
+            } else {
+                error!("invalid schema type for Array: {:?}", schema);
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::Array,
+                    supported_schema: vec![SchemaKind::Array],
+                });
             }
-            Value::Map(items) => {
-                if let Schema::Map(ref inner) = *schema {
-                    if !items.is_empty() {
-                        encode_long(items.len() as i64, buffer);
-                        for (key, value) in items {
-                            encode_bytes(key, buffer);
-                            encode_ref0(value, inner, buffer, schemas_by_name);
-                        }
+        }
+        Value::Map(items) => {
+            if let Schema::Map(ref inner) = *schema {
+                if !items.is_empty() {
+                    encode_long(items.len() as i64, buffer);
+                    for (key, value) in items {
+                        encode_bytes(key, buffer);
+                        encode_internal(value, inner, names, enclosing_namespace, buffer)?;
                     }
-                    buffer.push(0u8);
-                } else {
-                    error!("invalid schema type for Map: {:?}", schema);
                 }
+                buffer.push(0u8);
+            } else {
+                error!("invalid schema type for Map: {:?}", schema);
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::Map,
+                    supported_schema: vec![SchemaKind::Map],
+                });
             }
-            Value::Record(fields) => {
-                if let Schema::Record {
-                    fields: ref schema_fields,
-                    ..
-                } = *schema
-                {
-                    for (i, &(_, ref value)) in fields.iter().enumerate() {
-                        encode_ref0(value, &schema_fields[i].schema, buffer, schemas_by_name);
-                    }
+        }
+        Value::Record(fields) => {
+            if let Schema::Record {
+                ref name,
+                fields: ref schema_fields,
+                ..
+            } = *schema
+            {
+                let record_namespace = name.fully_qualified_name(enclosing_namespace).namespace;
+                for (i, &(_, ref value)) in fields.iter().enumerate() {
+                    encode_internal(
+                        value,
+                        &schema_fields[i].schema,
+                        names,
+                        &record_namespace,
+                        buffer,
+                    )?;
                 }
+            } else {
+                error!("invalid schema type for Record: {:?}", schema);
+                return Err(Error::EncodeValueAsSchemaError {
+                    value_kind: ValueKind::Record,
+                    supported_schema: vec![SchemaKind::Record],
+                });
             }
         }
-    }
-
-    let mut schemas_by_name = HashMap::new();
-    encode_ref0(value, schema, buffer, &mut schemas_by_name)
+    };
+    Ok(())
 }
 
-pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
+pub fn encode_to_vec(value: &Value, schema: &Schema) -> AvroResult<Vec<u8>> {
     let mut buffer = Vec::new();
-    encode(value, schema, &mut buffer);
-    buffer
+    encode(value, schema, &mut buffer)?;
+    Ok(buffer)
 }
 
 #[cfg(test)]
-mod tests {
+#[allow(clippy::expect_fun_call)]
+pub(crate) mod tests {
     use super::*;
     use std::collections::HashMap;
+    pub(crate) fn success(value: &Value, schema: &Schema) -> String {
+        format!(
+            "Value: {:?}\n should encode with schema:\n{:?}",
+            &value, &schema
+        )
+    }
 
     #[test]
     fn test_encode_empty_array() {
         let mut buf = Vec::new();
         let empty: Vec<Value> = Vec::new();
         encode(
-            &Value::Array(empty),
+            &Value::Array(empty.clone()),
             &Schema::Array(Box::new(Schema::Int)),
             &mut buf,
-        );
+        )
+        .expect(&success(
+            &Value::Array(empty),
+            &Schema::Array(Box::new(Schema::Int)),
+        ));
         assert_eq!(vec![0u8], buf);
     }
 
@@ -218,10 +252,14 @@ mod tests {
         let mut buf = Vec::new();
         let empty: HashMap<String, Value> = HashMap::new();
         encode(
-            &Value::Map(empty),
+            &Value::Map(empty.clone()),
             &Schema::Map(Box::new(Schema::Int)),
             &mut buf,
-        );
+        )
+        .expect(&success(
+            &Value::Map(empty),
+            &Schema::Map(Box::new(Schema::Int)),
+        ));
         assert_eq!(vec![0u8], buf);
     }
 
@@ -230,27 +268,27 @@ mod tests {
         let mut buf = Vec::new();
         let schema = Schema::parse_str(
             r#"
-        {
-            "type":"record",
-            "name":"TestStruct",
-            "fields": [
-                {
-                    "name":"a",
-                    "type":{
-                        "type":"record",
-                        "name": "Inner",
-                        "fields": [ {
-                            "name":"z",
-                            "type":"int"
-                        }]
+            {
+                "type":"record",
+                "name":"TestStruct",
+                "fields": [
+                    {
+                        "name":"a",
+                        "type":{
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    },
+                    {
+                        "name":"b",
+                        "type":"Inner"
                     }
-                },
-                {
-                    "name":"b",
-                    "type":"Inner"
-                }
-            ]
-        }"#,
+                ]
+            }"#,
         )
         .unwrap();
 
@@ -258,7 +296,7 @@ mod tests {
         let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
         let outer_value =
             Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
-        encode(&outer_value, &schema, &mut buf);
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
         assert!(!buf.is_empty());
     }
 
@@ -267,33 +305,33 @@ mod tests {
         let mut buf = Vec::new();
         let schema = Schema::parse_str(
             r#"
-        {
-            "type":"record",
-            "name":"TestStruct",
-            "fields": [
-                {
-                    "name":"a",
-                    "type":{
-                        "type":"array",
-                        "items": {
-                            "type":"record",
-                            "name": "Inner",
-                            "fields": [ {
-                                "name":"z",
-                                "type":"int"
-                            }]
+            {
+                "type":"record",
+                "name":"TestStruct",
+                "fields": [
+                    {
+                        "name":"a",
+                        "type":{
+                            "type":"array",
+                            "items": {
+                                "type":"record",
+                                "name": "Inner",
+                                "fields": [ {
+                                    "name":"z",
+                                    "type":"int"
+                                }]
+                            }
+                        }
+                    },
+                    {
+                        "name":"b",
+                        "type": {
+                            "type":"map",
+                            "values":"Inner"
                         }
                     }
-                },
-                {
-                    "name":"b",
-                    "type": {
-                        "type":"map",
-                        "values":"Inner"
-                    }
-                }
-            ]
-        }"#,
+                ]
+            }"#,
         )
         .unwrap();
 
@@ -306,7 +344,7 @@ mod tests {
                 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
             ),
         ]);
-        encode(&outer_value, &schema, &mut buf);
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
         assert!(!buf.is_empty());
     }
 
@@ -315,10 +353,10 @@ mod tests {
         let mut buf = Vec::new();
         let schema = Schema::parse_str(
             r#"
-        {
-            "type":"record",
-            "name":"TestStruct",
-            "fields": [
+            {
+                "type":"record",
+                "name":"TestStruct",
+                "fields": [
                 {
                     "name":"a",
                     "type":{
@@ -351,7 +389,7 @@ mod tests {
                 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
             ),
         ]);
-        encode(&outer_value, &schema, &mut buf);
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
         assert!(!buf.is_empty());
     }
 
@@ -398,7 +436,7 @@ mod tests {
         )]);
         let outer_value =
             Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
-        encode(&outer_value, &schema, &mut buf);
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
         assert!(!buf.is_empty());
     }
 
@@ -446,7 +484,7 @@ mod tests {
             ),
             ("b".into(), Value::Array(vec![inner_value1])),
         ]);
-        encode(&outer_value, &schema, &mut buf);
+        encode(&outer_value, &schema, &mut buf).expect(&success(&outer_value, &schema));
         assert!(!buf.is_empty());
     }
 
@@ -485,7 +523,7 @@ mod tests {
             ("a".into(), Value::Union(1, Box::new(inner_value1))),
             ("b".into(), inner_value2.clone()),
         ]);
-        encode(&outer_value1, &schema, &mut buf);
+        encode(&outer_value1, &schema, &mut buf).expect(&success(&outer_value1, &schema));
         assert!(!buf.is_empty());
 
         buf.drain(..);
@@ -493,7 +531,277 @@ mod tests {
             ("a".into(), Value::Union(0, Box::new(Value::Null))),
             ("b".into(), inner_value2),
         ]);
-        encode(&outer_value2, &schema, &mut buf);
+        encode(&outer_value2, &schema, &mut buf).expect(&success(&outer_value1, &schema));
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3448_proper_multi_level_encoding_outer_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "space.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_1, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_1, &schema));
+        assert!(!buf.is_empty());
+        buf.drain(..);
+        encode(&outer_record_variation_2, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_2, &schema));
+        assert!(!buf.is_empty());
+        buf.drain(..);
+        encode(&outer_record_variation_3, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_3, &schema));
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3448_proper_multi_level_encoding_middle_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "middle_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_1, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_1, &schema));
+        assert!(!buf.is_empty());
+        buf.drain(..);
+        encode(&outer_record_variation_2, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_2, &schema));
+        assert!(!buf.is_empty());
+        buf.drain(..);
+        encode(&outer_record_variation_3, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_3, &schema));
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3448_proper_multi_level_encoding_inner_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "namespace":"inner_namespace",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), 
Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        let mut buf = Vec::new();
+        encode(&outer_record_variation_1, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_1, &schema));
+        assert!(!buf.is_empty());
+        buf.drain(..);
+        encode(&outer_record_variation_2, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_2, &schema));
+        assert!(!buf.is_empty());
+        buf.drain(..);
+        encode(&outer_record_variation_3, &schema, &mut buf)
+            .expect(&success(&outer_record_variation_3, &schema));
         assert!(!buf.is_empty());
     }
 }
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index b8c5939..43aef5f 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{schema::SchemaKind, types::ValueKind};
+use crate::{
+    schema::{Name, SchemaKind},
+    types::ValueKind,
+};
 use std::fmt;
 
 #[derive(thiserror::Error, Debug)]
@@ -88,8 +91,11 @@ pub enum Error {
     #[error("Enum symbol index out of bounds: {num_variants}")]
     EnumSymbolIndex { index: usize, num_variants: usize },
 
-    #[error("Enum symbol not found")]
-    GetEnumSymbol,
+    #[error("Enum symbol not found {0}")]
+    GetEnumSymbol(String),
+
+    #[error("Unable to decode enum index")]
+    GetEnumUnknownIndexValue,
 
     #[error("Scale {scale} is greater than precision {precision}")]
     GetScaleAndPrecision { scale: usize, precision: usize },
@@ -378,13 +384,26 @@ pub enum Error {
 
     /// Error while resolving Schema::Ref
     #[error("Unresolved schema reference: {0}")]
-    SchemaResolutionError(String),
+    SchemaResolutionError(Name),
 
     #[error("The file metadata is already flushed.")]
     FileHeaderAlreadyWritten,
 
     #[error("Metadata keys starting with 'avro.' are reserved for internal 
usage: {0}.")]
     InvalidMetadataKey(String),
+
+    /// Error when two named schema have the same fully qualified name
+    #[error("Two named schema defined for same fullname: {0}.")]
+    AmbiguousSchemaDefinition(Name),
+
+    #[error("Signed decimal bytes length {0} not equal to fixed schema size 
{1}.")]
+    EncodeDecimalAsFixedError(usize, usize),
+
+    #[error("Can only encode value type {value_kind:?} as one of 
{supported_schema:?}")]
+    EncodeValueAsSchemaError {
+        value_kind: ValueKind,
+        supported_schema: Vec<SchemaKind>,
+    },
 }
 
 impl serde::ser::Error for Error {
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 1a2f47e..81b7499 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -28,9 +28,9 @@ use serde_json::{Map, Value};
 use std::{
     borrow::Cow,
     collections::{HashMap, HashSet},
-    convert::TryInto,
+    convert::{TryFrom, TryInto},
     fmt,
-    hash::{Hash, Hasher},
+    hash::Hash,
     str::FromStr,
 };
 use strum_macros::{EnumDiscriminants, EnumString};
@@ -103,6 +103,7 @@ pub enum Schema {
     /// of `fields`.
     Record {
         name: Name,
+        aliases: Aliases,
         doc: Documentation,
         fields: Vec<RecordField>,
         lookup: HashMap<String, usize>,
@@ -110,12 +111,14 @@ pub enum Schema {
     /// An `enum` Avro schema.
     Enum {
         name: Name,
+        aliases: Aliases,
         doc: Documentation,
         symbols: Vec<String>,
     },
     /// A `fixed` Avro schema.
     Fixed {
         name: Name,
+        aliases: Aliases,
         doc: Documentation,
         size: usize,
     },
@@ -225,15 +228,22 @@ impl<'a> From<&'a types::Value> for SchemaKind {
 ///
 /// More information about schema names can be found in the
 /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Hash, PartialEq, Eq)]
 pub struct Name {
     pub name: String,
-    pub namespace: Option<String>,
-    pub aliases: Option<Vec<String>>,
+    pub namespace: Namespace,
 }
 
 /// Represents documentation for complex Avro schemas.
 pub type Documentation = Option<String>;
+/// Represents the aliases for Named Schema
+pub type Aliases = Option<Vec<String>>;
+/// Represents Schema lookup within a schema env
+pub(crate) type Names = HashMap<Name, Schema>;
+/// Represents Schema lookup within a schema
+pub(crate) type NamesRef<'a> = HashMap<Name, &'a Schema>;
+/// Represents the namespace for Named Schema
+pub type Namespace = Option<String>;
 
 impl Name {
     /// Create a new `Name`.
@@ -241,14 +251,10 @@ impl Name {
     /// `aliases` will not be defined.
     pub fn new(name: &str) -> AvroResult<Name> {
         let (name, namespace) = Name::get_name_and_namespace(name)?;
-        Ok(Name {
-            name,
-            namespace,
-            aliases: None,
-        })
+        Ok(Name { name, namespace })
     }
 
-    fn get_name_and_namespace(name: &str) -> AvroResult<(String, 
Option<String>)> {
+    fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
         let caps = SCHEMA_NAME_R
             .captures(name)
             .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), 
SCHEMA_NAME_R.as_str()))?;
@@ -264,28 +270,15 @@ impl Name {
             .name()
             .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
             .ok_or(Error::GetNameField)?;
-
         // FIXME Reading name from the type is wrong ! The name there is just a metadata (AVRO-3430)
         let type_name = match complex.get("type") {
             Some(Value::Object(complex_type)) => complex_type.name().or(None),
             _ => None,
         };
 
-        let aliases: Option<Vec<String>> = complex
-            .get("aliases")
-            .and_then(|aliases| aliases.as_array())
-            .and_then(|aliases| {
-                aliases
-                    .iter()
-                    .map(|alias| alias.as_str())
-                    .map(|alias| alias.map(|a| a.to_string()))
-                    .collect::<Option<_>>()
-            });
-
         Ok(Name {
             name: type_name.unwrap_or(name),
             namespace: namespace_from_name.or_else(|| 
complex.string("namespace")),
-            aliases,
         })
     }
 
@@ -293,15 +286,11 @@ impl Name {
     ///
     /// More information about fullnames can be found in the
     /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
-    pub fn fullname(&self, default_namespace: Option<&str>) -> String {
+    pub fn fullname(&self, default_namespace: Namespace) -> String {
         if self.name.contains('.') {
             self.name.clone()
         } else {
-            let namespace = self
-                .namespace
-                .as_ref()
-                .map(|s| s.as_ref())
-                .or(default_namespace);
+            let namespace = self.namespace.clone().or(default_namespace);
 
             match namespace {
                 Some(ref namespace) => format!("{}.{}", namespace, self.name),
@@ -309,6 +298,29 @@ impl Name {
             }
         }
     }
+
+    /// Return the fully qualified name needed for indexing or searching for the schema within a schema/schema env context. Puts the enclosing namespace into the name's namespace for clarity in schema/schema env parsing.
+    /// ```ignore
+    /// use apache_avro::schema::Name;
+    ///
+    /// assert_eq!(
+    /// Name::new("some_name").unwrap().fully_qualified_name(&Some("some_namespace".into())),
+    /// Name::new("some_namespace.some_name").unwrap()
+    /// );
+    /// assert_eq!(
+    /// Name::new("some_namespace.some_name").unwrap().fully_qualified_name(&Some("other_namespace".into())),
+    /// Name::new("some_namespace.some_name").unwrap()
+    /// );
+    /// ```
+    pub(crate) fn fully_qualified_name(&self, enclosing_namespace: &Namespace) 
-> Name {
+        Name {
+            name: self.name.clone(),
+            namespace: self
+                .namespace
+                .clone()
+                .or_else(|| enclosing_namespace.clone()),
+        }
+    }
 }
 
 impl From<&str> for Name {
@@ -317,17 +329,86 @@ impl From<&str> for Name {
     }
 }
 
-impl Hash for Name {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.fullname(None).hash(state);
+impl fmt::Display for Name {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str(&self.fullname(None)[..])
+    }
+}
+
+pub(crate) struct ResolvedSchema<'s> {
+    names_ref: NamesRef<'s>,
+    root_schema: &'s Schema,
+}
+
+impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
+    type Error = Error;
+
+    fn try_from(schema: &'s Schema) -> AvroResult<Self> {
+        let names = HashMap::new();
+        let mut rs = ResolvedSchema {
+            names_ref: names,
+            root_schema: schema,
+        };
+        Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?;
+        Ok(rs)
     }
 }
 
-impl Eq for Name {}
+impl<'s> ResolvedSchema<'s> {
+    pub fn get_names(&self) -> &NamesRef<'s> {
+        &self.names_ref
+    }
 
-impl PartialEq for Name {
-    fn eq(&self, other: &Name) -> bool {
-        self.fullname(None).eq(&other.fullname(None))
+    fn from_internal(
+        schema: &'s Schema,
+        names_ref: &mut NamesRef<'s>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<()> {
+        match schema {
+            Schema::Array(schema) | Schema::Map(schema) => {
+                Self::from_internal(schema, names_ref, enclosing_namespace)
+            }
+            Schema::Union(UnionSchema { schemas, .. }) => {
+                for schema in schemas {
+                    Self::from_internal(schema, names_ref, 
enclosing_namespace)?
+                }
+                Ok(())
+            }
+            Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
+                let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+                if names_ref
+                    .insert(fully_qualified_name.clone(), schema)
+                    .is_some()
+                {
+                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+                } else {
+                    Ok(())
+                }
+            }
+            Schema::Record { name, fields, .. } => {
+                let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+                if names_ref
+                    .insert(fully_qualified_name.clone(), schema)
+                    .is_some()
+                {
+                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+                } else {
+                    let record_namespace = fully_qualified_name.namespace;
+                    for field in fields {
+                        Self::from_internal(&field.schema, names_ref, 
&record_namespace)?
+                    }
+                    Ok(())
+                }
+            }
+            Schema::Ref { name } => {
+                let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+                names_ref
+                    .get(&fully_qualified_name)
+                    .map(|_| ())
+                    .ok_or(Error::SchemaResolutionError(fully_qualified_name))
+            }
+            _ => Ok(()),
+        }
     }
 }
 
@@ -363,11 +444,16 @@ pub enum RecordFieldOrder {
 
 impl RecordField {
     /// Parse a `serde_json::Value` into a `RecordField`.
-    fn parse(field: &Map<String, Value>, position: usize, parser: &mut Parser) 
-> AvroResult<Self> {
+    fn parse(
+        field: &Map<String, Value>,
+        position: usize,
+        parser: &mut Parser,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Self> {
         let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
 
         // TODO: "type" = "<record name>"
-        let schema = parser.parse_complex(field)?;
+        let schema = parser.parse_complex(field, enclosing_namespace)?;
 
         let default = field.get("default").cloned();
 
@@ -478,11 +564,11 @@ struct Parser {
     // A map of name -> Schema::Ref
     // Used to resolve cyclic references, i.e. when a
     // field's type is a reference to its record's type
-    resolving_schemas: HashMap<Name, Schema>,
+    resolving_schemas: Names,
     input_order: Vec<Name>,
     // A map of name -> fully parsed Schema
     // Used to avoid parsing the same schema twice
-    parsed_schemas: HashMap<Name, Schema>,
+    parsed_schemas: Names,
 }
 
 impl Schema {
@@ -550,7 +636,7 @@ impl Schema {
 
     pub fn parse(value: &Value) -> AvroResult<Schema> {
         let mut parser = Parser::default();
-        parser.parse(value)
+        parser.parse(value, &None)
     }
 }
 
@@ -559,7 +645,7 @@ impl Parser {
     fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
         // TODO: (#82) this should be a ParseSchemaError wrapping the JSON error
         let value = 
serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
-        self.parse(&value)
+        self.parse(&value, &None)
     }
 
     /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. It is allowed that
@@ -576,7 +662,7 @@ impl Parser {
                 .input_schemas
                 .remove_entry(&next_name)
                 .expect("Key unexpectedly missing");
-            let parsed = self.parse(&value)?;
+            let parsed = self.parse(&value, &None)?;
             self.parsed_schemas
                 .insert(get_schema_type_name(name, value), parsed);
         }
@@ -594,11 +680,11 @@ impl Parser {
 
     /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
     /// schema.
-    fn parse(&mut self, value: &Value) -> AvroResult<Schema> {
+    fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> 
AvroResult<Schema> {
         match *value {
-            Value::String(ref t) => self.parse_known_schema(t.as_str()),
-            Value::Object(ref data) => self.parse_complex(data),
-            Value::Array(ref data) => self.parse_union(data),
+            Value::String(ref t) => self.parse_known_schema(t.as_str(), 
enclosing_namespace),
+            Value::Object(ref data) => self.parse_complex(data, 
enclosing_namespace),
+            Value::Array(ref data) => self.parse_union(data, 
enclosing_namespace),
             _ => Err(Error::ParseSchemaFromValidJson),
         }
     }
@@ -606,7 +692,11 @@ impl Parser {
     /// Parse a `serde_json::Value` representing an Avro type whose Schema is known into a
     /// `Schema`. A Schema for a `serde_json::Value` is known if it is primitive or has
     /// been parsed previously by the parsed and stored in its map of parsed_schemas.
-    fn parse_known_schema(&mut self, name: &str) -> AvroResult<Schema> {
+    fn parse_known_schema(
+        &mut self,
+        name: &str,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         match name {
             "null" => Ok(Schema::Null),
             "boolean" => Ok(Schema::Boolean),
@@ -616,7 +706,7 @@ impl Parser {
             "float" => Ok(Schema::Float),
             "bytes" => Ok(Schema::Bytes),
             "string" => Ok(Schema::String),
-            _ => self.fetch_schema_ref(name),
+            _ => self.fetch_schema_ref(name, enclosing_namespace),
         }
     }
 
@@ -629,7 +719,11 @@ impl Parser {
     ///
     /// This method allows schemas definitions that depend on other types to
     /// parse their dependencies (or look them up if already parsed).
-    fn fetch_schema_ref(&mut self, name: &str) -> AvroResult<Schema> {
+    fn fetch_schema_ref(
+        &mut self,
+        name: &str,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         fn get_schema_ref(parsed: &Schema) -> Schema {
             match &parsed {
                 Schema::Record { ref name, .. }
@@ -640,20 +734,23 @@ impl Parser {
         }
 
         let name = Name::new(name)?;
+        let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
 
-        if let Some(parsed) = self.parsed_schemas.get(&name) {
-            return Ok(get_schema_ref(parsed));
+        if self.parsed_schemas.get(&fully_qualified_name).is_some() {
+            return Ok(Schema::Ref { name });
         }
-        if let Some(resolving_schema) = self.resolving_schemas.get(&name) {
+        if let Some(resolving_schema) = 
self.resolving_schemas.get(&fully_qualified_name) {
             return Ok(resolving_schema.clone());
         }
 
         let value = self
             .input_schemas
-            .remove(&name)
-            .ok_or_else(|| Error::ParsePrimitive(name.fullname(None)))?;
+            .remove(&fully_qualified_name)
+            // TODO make a better descriptive error message here that conveys that a named schema cannot be found
+            .ok_or_else(|| 
Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;
 
-        let parsed = self.parse(&value)?;
+        // parsing a full schema from inside another schema. Other full schema will not inherit namespace
+        let parsed = self.parse(&value, &None)?;
         self.parsed_schemas
             .insert(get_schema_type_name(name, value), parsed.clone());
 
@@ -686,15 +783,20 @@ impl Parser {
     ///
     /// Avro supports "recursive" definition of types.
     /// e.g: {"type": {"type": "string"}}
-    fn parse_complex(&mut self, complex: &Map<String, Value>) -> 
AvroResult<Schema> {
+    fn parse_complex(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         fn logical_verify_type(
             complex: &Map<String, Value>,
             kinds: &[SchemaKind],
             parser: &mut Parser,
+            enclosing_namespace: &Namespace,
         ) -> AvroResult<Schema> {
             match complex.get("type") {
                 Some(value) => {
-                    let ty = parser.parse(value)?;
+                    let ty = parser.parse(value, enclosing_namespace)?;
 
                     if kinds
                         .iter()
@@ -730,13 +832,14 @@ impl Parser {
             kinds: &[SchemaKind],
             ok_schema: Schema,
             parser: &mut Parser,
+            enclosing_namespace: &Namespace,
         ) -> AvroResult<Schema> {
-            match logical_verify_type(complex, kinds, parser) {
+            match logical_verify_type(complex, kinds, parser, 
enclosing_namespace) {
                 // type and logicalType match!
                 Ok(_) => Ok(ok_schema),
                 // the logicalType is not expected for this type!
                 Err(Error::GetLogicalTypeVariant(json_value)) => match 
json_value {
-                    Value::String(_) => match parser.parse(&json_value) {
+                    Value::String(_) => match parser.parse(&json_value, 
enclosing_namespace) {
                         Ok(schema) => {
                             warn!(
                                 "Ignoring invalid logical type '{}' for schema 
of type: {:?}!",
@@ -759,6 +862,7 @@ impl Parser {
                         complex,
                         &[SchemaKind::Fixed, SchemaKind::Bytes],
                         self,
+                        enclosing_namespace,
                     )?);
 
                     let (precision, scale) = 
Self::parse_precision_and_scale(complex)?;
@@ -770,7 +874,7 @@ impl Parser {
                     });
                 }
                 "uuid" => {
-                    logical_verify_type(complex, &[SchemaKind::String], self)?;
+                    logical_verify_type(complex, &[SchemaKind::String], self, 
enclosing_namespace)?;
                     return Ok(Schema::Uuid);
                 }
                 "date" => {
@@ -780,6 +884,7 @@ impl Parser {
                         &[SchemaKind::Int],
                         Schema::Date,
                         self,
+                        enclosing_namespace,
                     );
                 }
                 "time-millis" => {
@@ -789,6 +894,7 @@ impl Parser {
                         &[SchemaKind::Int],
                         Schema::TimeMillis,
                         self,
+                        enclosing_namespace,
                     );
                 }
                 "time-micros" => {
@@ -798,6 +904,7 @@ impl Parser {
                         &[SchemaKind::Long],
                         Schema::TimeMicros,
                         self,
+                        enclosing_namespace,
                     );
                 }
                 "timestamp-millis" => {
@@ -807,6 +914,7 @@ impl Parser {
                         &[SchemaKind::Long],
                         Schema::TimestampMillis,
                         self,
+                        enclosing_namespace,
                     );
                 }
                 "timestamp-micros" => {
@@ -816,10 +924,11 @@ impl Parser {
                         &[SchemaKind::Long],
                         Schema::TimestampMicros,
                         self,
+                        enclosing_namespace,
                     );
                 }
                 "duration" => {
-                    logical_verify_type(complex, &[SchemaKind::Fixed], self)?;
+                    logical_verify_type(complex, &[SchemaKind::Fixed], self, 
enclosing_namespace)?;
                     return Ok(Schema::Duration);
                 }
                 // In this case, of an unknown logical type, we just pass through to the underlying
@@ -834,28 +943,28 @@ impl Parser {
         }
         match complex.get("type") {
             Some(&Value::String(ref t)) => match t.as_str() {
-                "record" => self.parse_record(complex),
-                "enum" => self.parse_enum(complex),
-                "array" => self.parse_array(complex),
-                "map" => self.parse_map(complex),
-                "fixed" => self.parse_fixed(complex),
-                other => self.parse_known_schema(other),
+                "record" => self.parse_record(complex, enclosing_namespace),
+                "enum" => self.parse_enum(complex, enclosing_namespace),
+                "array" => self.parse_array(complex, enclosing_namespace),
+                "map" => self.parse_map(complex, enclosing_namespace),
+                "fixed" => self.parse_fixed(complex, enclosing_namespace),
+                other => self.parse_known_schema(other, enclosing_namespace),
             },
-            Some(&Value::Object(ref data)) => self.parse_complex(data),
-            Some(&Value::Array(ref variants)) => self.parse_union(variants),
+            Some(&Value::Object(ref data)) => self.parse_complex(data, 
enclosing_namespace),
+            Some(&Value::Array(ref variants)) => self.parse_union(variants, 
enclosing_namespace),
             Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
             None => Err(Error::GetComplexTypeField),
         }
     }
 
-    fn register_resolving_schema(&mut self, name: &Name) {
+    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
         let resolving_schema = Schema::Ref { name: name.clone() };
         self.resolving_schemas
             .insert(name.clone(), resolving_schema.clone());
 
         let namespace = &name.namespace;
 
-        if let Some(ref aliases) = name.aliases {
+        if let Some(ref aliases) = aliases {
             aliases.iter().for_each(|alias| {
                 let alias_fullname = match namespace {
                     Some(ref ns) => format!("{}.{}", ns, alias),
@@ -869,13 +978,20 @@ impl Parser {
         }
     }
 
-    fn register_parsed_schema(&mut self, name: &Name, schema: &Schema) {
-        self.parsed_schemas.insert(name.clone(), schema.clone());
-        self.resolving_schemas.remove(name);
+    fn register_parsed_schema(
+        &mut self,
+        fully_qualified_name: &Name,
+        schema: &Schema,
+        aliases: &Aliases,
+    ) {
+        // FIXME, this should be globally aware, so if there is something overwriting something else then there is an ambiguous schema definition. An appropriate error should be thrown
+        self.parsed_schemas
+            .insert(fully_qualified_name.clone(), schema.clone());
+        self.resolving_schemas.remove(fully_qualified_name);
 
-        let namespace = &name.namespace;
+        let namespace = &fully_qualified_name.namespace;
 
-        if let Some(ref aliases) = name.aliases {
+        if let Some(ref aliases) = aliases {
             aliases.iter().for_each(|alias| {
                 let alias_fullname = match namespace {
                     Some(ref ns) => format!("{}.{}", ns, alias),
@@ -889,10 +1005,16 @@ impl Parser {
     }
 
     /// Returns already parsed schema or a schema that is currently being resolved.
-    fn get_already_seen_schema(&self, complex: &Map<String, Value>) -> 
Option<&Schema> {
+    fn get_already_seen_schema(
+        &self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> Option<&Schema> {
         match complex.get("type") {
             Some(Value::String(ref typ)) => {
-                let name = Name::new(typ.as_str()).unwrap();
+                let name = Name::new(typ.as_str())
+                    .unwrap()
+                    .fully_qualified_name(enclosing_namespace);
                 self.resolving_schemas
                     .get(&name)
                     .or_else(|| self.parsed_schemas.get(&name))
@@ -903,20 +1025,24 @@ impl Parser {
 
     /// Parse a `serde_json::Value` representing a Avro record type into a
     /// `Schema`.
-    fn parse_record(&mut self, complex: &Map<String, Value>) -> 
AvroResult<Schema> {
+    fn parse_record(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         let name = Name::parse(complex)?;
-
+        let aliases = complex.aliases();
         let fields_opt = complex.get("fields");
 
         if fields_opt.is_none() {
-            if let Some(seen) = self.get_already_seen_schema(complex) {
+            if let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace) {
                 return Ok(seen.clone());
             }
         }
 
         let mut lookup = HashMap::new();
-
-        self.register_resolving_schema(&name);
+        let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+        self.register_resolving_schema(&fully_qualified_name, &aliases);
 
         let fields: Vec<RecordField> = fields_opt
             .and_then(|fields| fields.as_array())
@@ -926,7 +1052,9 @@ impl Parser {
                     .iter()
                     .filter_map(|field| field.as_object())
                     .enumerate()
-                    .map(|(position, field)| RecordField::parse(field, 
position, self))
+                    .map(|(position, field)| {
+                        RecordField::parse(field, position, self, 
&fully_qualified_name.namespace)
+                    })
                     .collect::<Result<_, _>>()
             })?;
 
@@ -935,25 +1063,31 @@ impl Parser {
         }
 
         let schema = Schema::Record {
-            name: name.clone(),
+            name,
+            aliases: aliases.clone(),
             doc: complex.doc(),
             fields,
             lookup,
         };
 
-        self.register_parsed_schema(&name, &schema);
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
         Ok(schema)
     }
 
     /// Parse a `serde_json::Value` representing a Avro enum type into a
     /// `Schema`.
-    fn parse_enum(&mut self, complex: &Map<String, Value>) -> 
AvroResult<Schema> {
+    fn parse_enum(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         let name = Name::parse(complex)?;
-
+        let aliases = complex.aliases();
+        let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
         let symbols_opt = complex.get("symbols");
 
         if symbols_opt.is_none() {
-            if let Some(seen) = self.get_already_seen_schema(complex) {
+            if let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace) {
                 return Ok(seen.clone());
             }
         }
@@ -985,54 +1119,72 @@ impl Parser {
         }
 
         let schema = Schema::Enum {
-            name: name.clone(),
+            name,
+            aliases: aliases.clone(),
             doc: complex.doc(),
             symbols,
         };
 
-        self.register_parsed_schema(&name, &schema);
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
 
         Ok(schema)
     }
 
     /// Parse a `serde_json::Value` representing a Avro array type into a
     /// `Schema`.
-    fn parse_array(&mut self, complex: &Map<String, Value>) -> 
AvroResult<Schema> {
+    fn parse_array(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         complex
             .get("items")
             .ok_or(Error::GetArrayItemsField)
-            .and_then(|items| self.parse(items))
+            .and_then(|items| self.parse(items, enclosing_namespace))
             .map(|schema| Schema::Array(Box::new(schema)))
     }
 
     /// Parse a `serde_json::Value` representing a Avro map type into a
     /// `Schema`.
-    fn parse_map(&mut self, complex: &Map<String, Value>) -> 
AvroResult<Schema> {
+    fn parse_map(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         complex
             .get("values")
             .ok_or(Error::GetMapValuesField)
-            .and_then(|items| self.parse(items))
+            .and_then(|items| self.parse(items, enclosing_namespace))
             .map(|schema| Schema::Map(Box::new(schema)))
     }
 
     /// Parse a `serde_json::Value` representing a Avro union type into a
     /// `Schema`.
-    fn parse_union(&mut self, items: &[Value]) -> AvroResult<Schema> {
+    fn parse_union(
+        &mut self,
+        items: &[Value],
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         items
             .iter()
-            .map(|v| self.parse(v))
+            .map(|v| self.parse(v, enclosing_namespace))
             .collect::<Result<Vec<_>, _>>()
             .and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?)))
     }
 
     /// Parse a `serde_json::Value` representing a Avro fixed type into a
     /// `Schema`.
-    fn parse_fixed(&mut self, complex: &Map<String, Value>) -> 
AvroResult<Schema> {
+    fn parse_fixed(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
         let name = Name::parse(complex)?;
-
+        let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+        let aliases = complex.aliases();
         let size_opt = complex.get("size");
         if size_opt.is_none() {
-            if let Some(seen) = self.get_already_seen_schema(complex) {
+            if let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace) {
                 return Ok(seen.clone());
             }
         }
@@ -1047,12 +1199,13 @@ impl Parser {
             .ok_or(Error::GetFixedSizeField)?;
 
         let schema = Schema::Fixed {
-            name: name.clone(),
+            name,
+            aliases: aliases.clone(),
             doc,
             size: size as usize,
         };
 
-        self.register_parsed_schema(&name, &schema);
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
 
         Ok(schema)
     }
@@ -1105,6 +1258,7 @@ impl Serialize for Schema {
             }
             Schema::Record {
                 ref name,
+                ref aliases,
                 ref doc,
                 ref fields,
                 ..
@@ -1118,7 +1272,7 @@ impl Serialize for Schema {
                 if let Some(ref docstr) = doc {
                     map.serialize_entry("doc", docstr)?;
                 }
-                if let Some(ref aliases) = name.aliases {
+                if let Some(ref aliases) = aliases {
                     map.serialize_entry("aliases", aliases)?;
                 }
                 map.serialize_entry("fields", fields)?;
@@ -1139,6 +1293,7 @@ impl Serialize for Schema {
                 ref name,
                 ref doc,
                 ref size,
+                ..
             } => {
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "fixed")?;
@@ -1204,6 +1359,7 @@ impl Serialize for Schema {
                 // duration should be or typically is.
                 let inner = Schema::Fixed {
                     name: Name::new("duration").unwrap(),
+                    aliases: None,
                     doc: None,
                     size: 12,
                 };
@@ -1453,6 +1609,7 @@ mod tests {
 
         let schema_c_expected = Schema::Record {
             name: Name::new("C").unwrap(),
+            aliases: None,
             doc: None,
             fields: vec![RecordField {
                 name: "field_one".to_string(),
@@ -1508,6 +1665,7 @@ mod tests {
 
         let schema_option_a_expected = Schema::Record {
             name: Name::new("OptionA").unwrap(),
+            aliases: None,
             doc: None,
             fields: vec![RecordField {
                 name: "field_one".to_string(),
@@ -1553,6 +1711,7 @@ mod tests {
 
         let expected = Schema::Record {
             name: Name::new("test").unwrap(),
+            aliases: None,
             doc: None,
             fields: vec![
                 RecordField {
@@ -1611,6 +1770,7 @@ mod tests {
 
         let expected = Schema::Record {
             name: Name::new("test").unwrap(),
+            aliases: None,
             doc: None,
             fields: vec![RecordField {
                 name: "recordField".to_string(),
@@ -1618,6 +1778,7 @@ mod tests {
                 default: None,
                 schema: Schema::Record {
                     name: Name::new("Node").unwrap(),
+                    aliases: None,
                     doc: None,
                     fields: vec![
                         RecordField {
@@ -1779,8 +1940,8 @@ mod tests {
             name: Name {
                 name: "LongList".to_owned(),
                 namespace: None,
-                aliases: Some(vec!["LinkedLongs".to_owned()]),
             },
+            aliases: Some(vec!["LinkedLongs".to_owned()]),
             doc: None,
             fields: vec![
                 RecordField {
@@ -1802,7 +1963,6 @@ mod tests {
                                 name: Name {
                                     name: "LongList".to_owned(),
                                     namespace: None,
-                                    aliases: 
Some(vec!["LinkedLongs".to_owned()]),
                                 },
                             },
                         ])
@@ -1846,8 +2006,8 @@ mod tests {
             name: Name {
                 name: "record".to_owned(),
                 namespace: None,
-                aliases: None,
             },
+            aliases: None,
             doc: None,
             fields: vec![
                 RecordField {
@@ -1866,7 +2026,6 @@ mod tests {
                         name: Name {
                             name: "record".to_owned(),
                             namespace: None,
-                            aliases: None,
                         },
                     },
                     order: RecordFieldOrder::Ascending,
@@ -1911,8 +2070,8 @@ mod tests {
             name: Name {
                 name: "record".to_owned(),
                 namespace: None,
-                aliases: None,
             },
+            aliases: None,
             doc: None,
             fields: vec![
                 RecordField {
@@ -1923,8 +2082,8 @@ mod tests {
                         name: Name {
                             name: "enum".to_owned(),
                             namespace: None,
-                            aliases: None,
                         },
+                        aliases: None,
                         doc: None,
                         symbols: vec!["one".to_string(), "two".to_string(), 
"three".to_string()],
                     },
@@ -1939,8 +2098,8 @@ mod tests {
                         name: Name {
                             name: "enum".to_owned(),
                             namespace: None,
-                            aliases: None,
                         },
+                        aliases: None,
                         doc: None,
                         symbols: vec!["one".to_string(), "two".to_string(), 
"three".to_string()],
                     },
@@ -1986,8 +2145,8 @@ mod tests {
             name: Name {
                 name: "record".to_owned(),
                 namespace: None,
-                aliases: None,
             },
+            aliases: None,
             doc: None,
             fields: vec![
                 RecordField {
@@ -1998,8 +2157,8 @@ mod tests {
                         name: Name {
                             name: "fixed".to_owned(),
                             namespace: None,
-                            aliases: None,
                         },
+                        aliases: None,
                         doc: None,
                         size: 456,
                     },
@@ -2014,8 +2173,8 @@ mod tests {
                         name: Name {
                             name: "fixed".to_owned(),
                             namespace: None,
-                            aliases: None,
                         },
+                        aliases: None,
                         doc: None,
                         size: 456,
                     },
@@ -2040,6 +2199,7 @@ mod tests {
 
         let expected = Schema::Enum {
             name: Name::new("Suit").unwrap(),
+            aliases: None,
             doc: None,
             symbols: vec![
                 "diamonds".to_owned(),
@@ -2076,6 +2236,7 @@ mod tests {
 
         let expected = Schema::Fixed {
             name: Name::new("test").unwrap(),
+            aliases: None,
             doc: None,
             size: 16usize,
         };
@@ -2092,6 +2253,7 @@ mod tests {
 
         let expected = Schema::Fixed {
             name: Name::new("test").unwrap(),
+            aliases: None,
             doc: Some(String::from("FixedSchema documentation")),
             size: 16usize,
         };
@@ -2365,4 +2527,574 @@ mod tests {
             _ => panic!("Expected an Error::InvalidSchemaName!"),
         }
     }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"record",
+                            "name":"inner_record_name",
+                            "fields":[
+                                {
+                                    "name":"inner_field_1",
+                                    "type":"double"
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.inner_record_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"record",
+                            "name":"inner_record_name",
+                            "fields":[
+                                {
+                                    "name":"inner_field_1",
+                                    "type":"double"
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "space.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.inner_record_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"enum",
+                            "name":"inner_enum_name",
+                            "symbols":["Extensive","Testing"]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_enum_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.inner_enum_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"enum",
+                            "name":"inner_enum_name",
+                            "symbols":["Extensive","Testing"]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "space.inner_enum_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.inner_enum_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"fixed",
+                            "name":"inner_fixed_name",
+                            "size": 16
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_fixed_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.inner_fixed_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"fixed",
+                            "name":"inner_fixed_name",
+                            "size": 16
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "space.inner_fixed_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.inner_fixed_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_record_inner_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"record",
+                            "name":"inner_record_name",
+                            "namespace":"inner_space",
+                            "fields":[
+                                {
+                                    "name":"inner_field_1",
+                                    "type":"double"
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_space.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "inner_space.inner_record_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"enum",
+                            "name":"inner_enum_name",
+                            "namespace": "inner_space",
+                            "symbols":["Extensive","Testing"]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_space.inner_enum_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "inner_space.inner_enum_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"fixed",
+                            "name":"inner_fixed_name",
+                            "namespace": "inner_space",
+                            "size": 16
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_space.inner_fixed_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "inner_space.inner_fixed_name"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn 
avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"record",
+                            "name":"middle_record_name",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "space.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 3);
+        for s in &[
+            "space.record_name",
+            "space.middle_record_name",
+            "space.inner_record_name",
+        ] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn 
avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"record",
+                            "name":"middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "middle_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 3);
+        for s in &[
+            "space.record_name",
+            "middle_namespace.middle_record_name",
+            "middle_namespace.inner_record_name",
+        ] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn 
avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type":"record",
+                            "name":"middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "namespace":"inner_namespace",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 3);
+        for s in &[
+            "space.record_name",
+            "middle_namespace.middle_record_name",
+            "inner_namespace.inner_record_name",
+        ] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_in_array_resolution_inherited_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": {
+                  "type":"array",
+                  "items":{
+                      "type":"record",
+                      "name":"in_array_record",
+                      "fields": [
+                          {
+                              "name":"array_record_field",
+                              "type":"string"
+                          }
+                      ]
+                  }
+              }
+            },
+            {
+                "name":"outer_field_2",
+                "type":"in_array_record"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.in_array_record"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
+
+    #[test]
+    fn avro_3448_test_proper_in_map_resolution_inherited_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": {
+                  "type":"map",
+                  "values":{
+                      "type":"record",
+                      "name":"in_map_record",
+                      "fields": [
+                          {
+                              "name":"map_record_field",
+                              "type":"string"
+                          }
+                      ]
+                  }
+              }
+            },
+            {
+                "name":"outer_field_2",
+                "type":"in_map_record"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't 
successfully parse");
+        assert!(rs.get_names().len() == 2);
+        for s in &["space.record_name", "space.in_map_record"] {
+            assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
+        }
+    }
 }
diff --git a/lang/rust/avro/src/schema_compatibility.rs 
b/lang/rust/avro/src/schema_compatibility.rs
index b3fb4db..43fbbbe 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -232,12 +232,14 @@ impl SchemaCompatibility {
                 SchemaKind::Fixed => {
                     if let Schema::Fixed {
                         name: w_name,
+                        aliases: _,
                         doc: _w_doc,
                         size: w_size,
                     } = writers_schema
                     {
                         if let Schema::Fixed {
                             name: r_name,
+                            aliases: _,
                             doc: _r_doc,
                             size: r_size,
                         } = readers_schema
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index e2f6962..322d21b 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -409,6 +409,7 @@ impl Value {
     /// in the Avro specification for the full set of rules of schema
     /// resolution.
     pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
+        // FIXME transition to using resolved Schema
         let mut schemas_by_name: HashMap<Name, Schema> = HashMap::new();
         self.resolve_internal(schema, &mut schemas_by_name)
     }
@@ -440,7 +441,7 @@ impl Value {
                     if let Some(resolved) = schemas_by_name.get(name) {
                         resolve0(value, resolved, &mut schemas_by_name.clone())
                     } else {
-                        Err(Error::SchemaResolutionError(name.fullname(None)))
+                        Err(Error::SchemaResolutionError(name.clone()))
                     }
                 }
                 Schema::Null => val.resolve_null(),
@@ -934,6 +935,7 @@ mod tests {
         let schema = Schema::Fixed {
             size: 4,
             name: Name::new("some_fixed").unwrap(),
+            aliases: None,
             doc: None,
         };
 
@@ -947,6 +949,7 @@ mod tests {
     fn validate_enum() {
         let schema = Schema::Enum {
             name: Name::new("some_enum").unwrap(),
+            aliases: None,
             doc: None,
             symbols: vec![
                 "spades".to_string(),
@@ -964,6 +967,7 @@ mod tests {
 
         let other_schema = Schema::Enum {
             name: Name::new("some_other_enum").unwrap(),
+            aliases: None,
             doc: None,
             symbols: vec![
                 "hearts".to_string(),
@@ -988,6 +992,7 @@ mod tests {
         // }
         let schema = Schema::Record {
             name: Name::new("some_record").unwrap(),
+            aliases: None,
             doc: None,
             fields: vec![
                 RecordField {
@@ -1139,6 +1144,7 @@ mod tests {
                 scale: 1,
                 inner: Box::new(Schema::Fixed {
                     name: Name::new("decimal").unwrap(),
+                    aliases: None,
                     size: 20,
                     doc: None
                 })
diff --git a/lang/rust/avro/src/util.rs b/lang/rust/avro/src/util.rs
index e2b353b..232dc64 100644
--- a/lang/rust/avro/src/util.rs
+++ b/lang/rust/avro/src/util.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{AvroResult, Error};
+use crate::{
+    schema::{Aliases, Documentation},
+    AvroResult, Error,
+};
 use serde_json::{Map, Value};
 use std::{convert::TryFrom, i64, io::Read, sync::Once};
 
@@ -33,9 +36,11 @@ pub trait MapHelper {
         self.string("name")
     }
 
-    fn doc(&self) -> Option<String> {
+    fn doc(&self) -> Documentation {
         self.string("doc")
     }
+
+    fn aliases(&self) -> Aliases;
 }
 
 impl MapHelper for Map<String, Value> {
@@ -44,6 +49,19 @@ impl MapHelper for Map<String, Value> {
             .and_then(|v| v.as_str())
             .map(|v| v.to_string())
     }
+
+    fn aliases(&self) -> Aliases { // reads the "aliases" entry of this JSON object, if any
+        // FIXME no warning when aliases aren't a json array of json strings; such input silently yields None, same as a missing key
+        self.get("aliases")
+            .and_then(|aliases| aliases.as_array()) // None unless the value is a JSON array
+            .and_then(|aliases| {
+                aliases
+                    .iter()
+                    .map(|alias| alias.as_str())
+                    .map(|alias| alias.map(|a| a.to_string()))
+                    .collect::<Option<_>>() // a single non-string element turns the whole result into None
+            })
+    }
 }
 
 pub fn read_long<R: Read>(reader: &mut R) -> AvroResult<i64> {
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index e9518e3..e257916 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -17,7 +17,7 @@
 
 //! Logic handling writing in Avro format at user level.
 use crate::{
-    encode::{encode, encode_ref, encode_to_vec},
+    encode::{encode, encode_to_vec},
     schema::Schema,
     ser::Serializer,
     types::Value,
@@ -266,7 +266,7 @@ impl<'a, W: Write> Writer<'a, W> {
 
     /// Append a raw Avro Value to the payload avoiding to encode it again.
     fn append_raw(&mut self, value: &Value, schema: &Schema) -> 
AvroResult<usize> {
-        self.append_bytes(encode_to_vec(value, schema).as_ref())
+        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
     }
 
     /// Append pure bytes to the payload.
@@ -309,7 +309,7 @@ impl<'a, W: Write> Writer<'a, W> {
             &metadata.into(),
             &Schema::Map(Box::new(Schema::Bytes)),
             &mut header,
-        );
+        )?;
         header.extend_from_slice(&self.marker);
 
         Ok(header)
@@ -341,7 +341,7 @@ fn write_avro_datum<T: Into<Value>>(
     if !avro.validate(schema) {
         return Err(Error::Validation);
     }
-    encode(&avro, schema, buffer);
+    encode(&avro, schema, buffer)?;
     Ok(())
 }
 
@@ -349,7 +349,7 @@ fn write_value_ref(schema: &Schema, value: &Value, buffer: 
&mut Vec<u8>) -> Avro
     if !value.validate(schema) {
         return Err(Error::Validation);
     }
-    encode_ref(value, schema, buffer);
+    encode(value, schema, buffer)?;
     Ok(())
 }
 
@@ -521,6 +521,7 @@ mod tests {
         let size = 30;
         let inner = Schema::Fixed {
             name: Name::new("decimal").unwrap(),
+            aliases: None,
             doc: None,
             size,
         };
@@ -559,6 +560,7 @@ mod tests {
     fn duration() -> TestResult<()> {
         let inner = Schema::Fixed {
             name: Name::new("duration").unwrap(),
+            aliases: None,
             doc: None,
             size: 12,
         };
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index 24ae247..c404452 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -845,6 +845,7 @@ fn test_parse_reused_record_schema_by_fullname() {
     match schema.unwrap() {
         Schema::Record {
             ref name,
+            aliases: _,
             doc: _,
             ref fields,
             lookup: _,
@@ -1088,7 +1089,7 @@ fn test_fullname_name_and_default_namespace_specified() {
     init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a", "namespace": null, "aliases": 
null}"#).unwrap();
-    let fullname = name.fullname(Some("b.c.d"));
+    let fullname = name.fullname(Some("b.c.d".into()));
     assert_eq!("b.c.d.a", fullname);
 }
 
@@ -1097,7 +1098,7 @@ fn 
test_fullname_fullname_and_default_namespace_specified() {
     init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": null, 
"aliases": null}"#).unwrap();
-    let fullname = name.fullname(Some("o.a.h"));
+    let fullname = name.fullname(Some("o.a.h".into()));
     assert_eq!("a.b.c.d", fullname);
 }
 
@@ -1107,7 +1108,7 @@ fn 
test_fullname_fullname_namespace_and_default_namespace_specified() {
     let name: Name =
         serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": "o.a.a", 
"aliases": null}"#)
             .unwrap();
-    let fullname = name.fullname(Some("o.a.h"));
+    let fullname = name.fullname(Some("o.a.h".into()));
     assert_eq!("a.b.c.d", fullname);
 }
 
@@ -1116,7 +1117,7 @@ fn 
test_fullname_name_namespace_and_default_namespace_specified() {
     init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a", "namespace": "o.a.a", "aliases": 
null}"#).unwrap();
-    let fullname = name.fullname(Some("o.a.h"));
+    let fullname = name.fullname(Some("o.a.h".into()));
     assert_eq!("o.a.a.a", fullname);
 }
 

Reply via email to