This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch avro-3609-rust-support-custom-attributes in repository https://gitbox.apache.org/repos/asf/avro.git
commit a9cedd0355efee6a7b97c3d34c9d3b057cd0714d Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Tue Aug 16 14:17:02 2022 +0300 AVRO-3609: [Rust] Add custom attributes field to Record, Enum and Fixed schemata Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --- lang/rust/avro/src/decode.rs | 2 ++ lang/rust/avro/src/schema.rs | 41 +++++++++++++++++++++++------- lang/rust/avro/src/schema_compatibility.rs | 2 ++ lang/rust/avro/src/types.rs | 21 ++++++++++----- lang/rust/avro/src/writer.rs | 2 ++ lang/rust/avro/tests/schema.rs | 1 + lang/rust/avro_derive/src/lib.rs | 2 ++ 7 files changed, 55 insertions(+), 16 deletions(-) diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs index cb32ebdb7..4f9e7e945 100644 --- a/lang/rust/avro/src/decode.rs +++ b/lang/rust/avro/src/decode.rs @@ -344,6 +344,7 @@ mod tests { doc: None, name: Name::new("decimal").unwrap(), aliases: None, + attributes: Default::default(), }); let schema = Schema::Decimal { inner, @@ -370,6 +371,7 @@ mod tests { name: Name::new("decimal").unwrap(), aliases: None, doc: None, + attributes: Default::default(), }); let schema = Schema::Decimal { inner, diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 0a9603fe7..04f9e62eb 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -107,6 +107,7 @@ pub enum Schema { doc: Documentation, fields: Vec<RecordField>, lookup: BTreeMap<String, usize>, + attributes: HashMap<String, Value>, }, /// An `enum` Avro schema. Enum { @@ -114,6 +115,7 @@ pub enum Schema { aliases: Aliases, doc: Documentation, symbols: Vec<String>, + attributes: HashMap<String, Value>, }, /// A `fixed` Avro schema. Fixed { @@ -121,6 +123,7 @@ pub enum Schema { aliases: Aliases, doc: Documentation, size: usize, + attributes: HashMap<String, Value>, }, /// Logical type which represents `Decimal` values. The underlying type is serialized and /// deserialized as `Schema::Bytes` or `Schema::Fixed`. @@ -340,7 +343,7 @@ impl<'de> Deserialize<'de> for Name { where D: serde::de::Deserializer<'de>, { - serde_json::Value::deserialize(deserializer).and_then(|value| { + Value::deserialize(deserializer).and_then(|value| { use serde::de::Error; if let Value::Object(json) = value { Name::parse(&json).map_err(D::Error::custom) @@ -1220,6 +1223,7 @@ impl Parser { doc: complex.doc(), fields, lookup, + attributes: Default::default(), }; self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); @@ -1276,6 +1280,7 @@ impl Parser { aliases: aliases.clone(), doc: complex.doc(), symbols, + attributes: Default::default(), }; self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); @@ -1357,6 +1362,7 @@ impl Parser { aliases: aliases.clone(), doc, size: size as usize, + attributes: Default::default(), }; self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); @@ -1556,6 +1562,7 @@ impl Serialize for Schema { aliases: None, doc: None, size: 12, + attributes: Default::default(), }; map.serialize_entry("type", &inner)?; map.serialize_entry("logicalType", "duration")?; @@ -1584,11 +1591,11 @@ impl Serialize for RecordField { /// Parses a **valid** avro schema into the Parsing Canonical Form. /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas -fn parsing_canonical_form(schema: &serde_json::Value) -> String { +fn parsing_canonical_form(schema: &Value) -> String { match schema { - serde_json::Value::Object(map) => pcf_map(map), - serde_json::Value::String(s) => pcf_string(s), - serde_json::Value::Array(v) => pcf_array(v), + Value::Object(map) => pcf_map(map), + Value::String(s) => pcf_string(s), + Value::Array(v) => pcf_array(v), json => panic!( "got invalid JSON value for canonical form of schema: {0}", json @@ -1596,7 +1603,7 @@ fn parsing_canonical_form(schema: &serde_json::Value) -> String { } } -fn pcf_map(schema: &Map<String, serde_json::Value>) -> String { +fn pcf_map(schema: &Map<String, Value>) -> String { // Look for the namespace variant up front. let ns = schema.get("namespace").and_then(|v| v.as_str()); let mut fields = Vec::new(); @@ -1604,7 +1611,7 @@ fn pcf_map(schema: &Map<String, serde_json::Value>) -> String { // Reduce primitive types to their simple form. ([PRIMITIVE] rule) if schema.len() == 1 && k == "type" { // Invariant: function is only callable from a valid schema, so this is acceptable. - if let serde_json::Value::String(s) = v { + if let Value::String(s) = v { return pcf_string(s); } } @@ -1656,7 +1663,7 @@ fn pcf_map(schema: &Map<String, serde_json::Value>) -> String { format!("{{{}}}", inter) } -fn pcf_array(arr: &[serde_json::Value]) -> String { +fn pcf_array(arr: &[Value]) -> String { let inter = arr .iter() .map(parsing_canonical_form) @@ -1764,7 +1771,7 @@ pub mod derive { T: AvroSchemaComponent, { fn get_schema() -> Schema { - T::get_schema_in_ctxt(&mut HashMap::default(), &Option::None) + T::get_schema_in_ctxt(&mut HashMap::default(), &None) } } @@ -2026,6 +2033,7 @@ mod tests { position: 0, }], lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]), + attributes: Default::default(), }; assert_eq!(schema_c, schema_c_expected); @@ -2113,6 +2121,7 @@ mod tests { position: 0, }], lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]), + attributes: Default::default(), }; assert_eq!(schema_option_a, schema_option_a_expected); @@ -2161,6 +2170,7 @@ mod tests { }, ], lookup, + attributes: Default::default(), }; assert_eq!(parsed, expected); @@ -2230,11 +2240,13 @@ mod tests { }, ], lookup: node_lookup, + attributes: Default::default(), }, order: RecordFieldOrder::Ascending, position: 0, }], lookup, + attributes: Default::default(), }; assert_eq!(schema, expected); @@ -2402,6 +2414,7 @@ mod tests { }, ], lookup, + attributes: Default::default(), }; assert_eq!(schema, expected); @@ -2462,6 +2475,7 @@ mod tests { }, ], lookup, + attributes: Default::default(), }; assert_eq!(schema, expected); @@ -2515,6 +2529,7 @@ mod tests { aliases: None, doc: None, symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()], + attributes: Default::default(), }, order: RecordFieldOrder::Ascending, position: 0, @@ -2531,12 +2546,14 @@ mod tests { aliases: None, doc: None, symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()], + attributes: Default::default(), }, order: RecordFieldOrder::Ascending, position: 1, }, ], lookup, + attributes: Default::default(), }; assert_eq!(schema, expected); @@ -2590,6 +2607,7 @@ mod tests { aliases: None, doc: None, size: 456, + attributes: Default::default(), }, order: RecordFieldOrder::Ascending, position: 0, @@ -2606,12 +2624,14 @@ mod tests { aliases: None, doc: None, size: 456, + attributes: Default::default(), }, order: RecordFieldOrder::Ascending, position: 1, }, ], lookup, + attributes: Default::default(), }; assert_eq!(schema, expected); @@ -2636,6 +2656,7 @@ mod tests { "clubs".to_owned(), "hearts".to_owned(), ], + attributes: Default::default(), }; assert_eq!(expected, schema); @@ -2668,6 +2689,7 @@ mod tests { aliases: None, doc: None, size: 16usize, + attributes: Default::default(), }; assert_eq!(expected, schema); @@ -2685,6 +2707,7 @@ mod tests { aliases: None, doc: Some(String::from("FixedSchema documentation")), size: 16usize, + attributes: Default::default(), }; assert_eq!(expected, schema); diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 43fbbbe8d..843a139df 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -235,6 +235,7 @@ impl SchemaCompatibility { aliases: _, doc: _w_doc, size: w_size, + attributes: _, } = writers_schema { if let Schema::Fixed { @@ -242,6 +243,7 @@ impl SchemaCompatibility { aliases: _, doc: _r_doc, size: r_size, + attributes: _, } = readers_schema { return w_name.fullname(None) == r_name.fullname(None) diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 042350b93..ac3ba21b9 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -1039,9 +1039,10 @@ mod tests { position: 0, }], lookup: Default::default(), + attributes: Default::default(), }, false, - "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: There is no schema field for field 'unknown_field_name'", + "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {}, attributes: {} }. Reason: There is no schema field for field 'unknown_field_name'", ), ( Value::Record(vec![("field_name".to_string(), Value::Null)]), @@ -1060,9 +1061,10 @@ mod tests { position: 0, }], lookup: [("field_name".to_string(), 0)].iter().cloned().collect(), + attributes: Default::default(), }, false, - "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {\"field_name\": 0} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []", + "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {\"field_name\": 0}, attributes: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []", ), ]; @@ -1088,6 +1090,7 @@ mod tests { name: Name::new("some_fixed").unwrap(), aliases: None, doc: None, + attributes: Default::default(), }; assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema)); @@ -1125,6 +1128,7 @@ mod tests { "diamonds".to_string(), "clubs".to_string(), ], + attributes: Default::default(), }; assert!(Value::Enum(0, "spades".to_string()).validate(&schema)); @@ -1170,6 +1174,7 @@ mod tests { "clubs".to_string(), "spades".to_string(), ], + attributes: Default::default(), }; let value = Value::Enum(0, "spades".to_string()); @@ -1218,6 +1223,7 @@ mod tests { .iter() .cloned() .collect(), + attributes: Default::default(), }; assert!(Value::Record(vec![ @@ -1237,7 +1243,7 @@ mod tests { ("b".to_string(), Value::String("foo".to_string())), ]); assert!(!value.validate(&schema)); - assert_logged("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1} }. Reason: Unsupported value-schema combination"); + assert_logged("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1}, attributes: {} }. Reason: Unsupported value-schema [...] let value = Value::Record(vec![ ("a".to_string(), Value::Long(42i64)), @@ -1245,7 +1251,7 @@ mod tests { ]); assert!(!value.validate(&schema)); assert_logged( - "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1} }. Reason: There is no schema field for field 'c'" + "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1}, attributes: {} }. Reason: There is no schema field for field 'c'" ); let value = Value::Record(vec![ @@ -1255,7 +1261,7 @@ mod tests { ]); assert!(!value.validate(&schema)); assert_logged( - r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1} }. Reason: The value's records length (3) is different than the sche [...] + r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1}, attributes: {} }. Reason: The value's records length (3) is differe [...] ); assert!(Value::Map( @@ -1275,7 +1281,7 @@ mod tests { ) .validate(&schema)); assert_logged( - r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1} }. Reason: Field with name '"a"' is not a member of the map items + r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1}, attributes: {} }. Reason: Field with name '"a"' is not a member of the map items Field with name '"b"' is not a member of the map items"#, ); @@ -1387,7 +1393,8 @@ Field with name '"b"' is not a member of the map items"#, name: Name::new("decimal").unwrap(), aliases: None, size: 20, - doc: None + doc: None, + attributes: Default::default(), }) }) .is_ok()); diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 2fc42cccf..98ceafe46 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -686,6 +686,7 @@ mod tests { aliases: None, doc: None, size, + attributes: Default::default(), }; let value = vec![0u8; size]; logical_type_test( @@ -725,6 +726,7 @@ mod tests { aliases: None, doc: None, size: 12, + attributes: Default::default(), }; let value = Value::Duration(Duration::new( Months::new(256), diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs index b4f59e553..f8bdd4a68 100644 --- a/lang/rust/avro/tests/schema.rs +++ b/lang/rust/avro/tests/schema.rs @@ -860,6 +860,7 @@ fn test_parse_reused_record_schema_by_fullname() { doc: _, ref fields, lookup: _, + attributes: _, } => { assert_eq!(name.fullname(None), "test.Weather", "Name does not match!"); diff --git a/lang/rust/avro_derive/src/lib.rs b/lang/rust/avro_derive/src/lib.rs index abba0e195..18d7ed775 100644 --- a/lang/rust/avro_derive/src/lib.rs +++ b/lang/rust/avro_derive/src/lib.rs @@ -179,6 +179,7 @@ fn get_data_struct_schema_def( doc: #record_doc, fields: schema_fields, lookup, + attributes: Default::default(), } }) } @@ -204,6 +205,7 @@ fn get_data_enum_schema_def( aliases: #enum_aliases, doc: #doc, symbols: vec![#(#symbols.to_owned()),*], + attributes: Default::default(), } }) } else {
