This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch avro-3904-schema-compatibility-improvements in repository https://gitbox.apache.org/repos/asf/avro.git
commit 19832f2f0e198d14f6cad913663fed349cd08c75 Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Fri Dec 1 11:32:48 2023 +0200 AVRO-3904: [Rust] Minor improvements to the new schema compatibility changes Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --- lang/rust/avro/src/error.rs | 41 +++--- lang/rust/avro/src/schema.rs | 40 +----- lang/rust/avro/src/schema_compatibility.rs | 211 ++++++++++++++--------------- 3 files changed, 127 insertions(+), 165 deletions(-) diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 810c5687a..13cec65c7 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -362,7 +362,7 @@ pub enum Error { DeflateCompress(#[source] std::io::Error), #[error("Failed to finish flate compressor")] - DeflateCompressFinish(std::io::Error), + DeflateCompressFinish(#[source] std::io::Error), #[error("Failed to decompress with flate")] DeflateDecompress(#[source] std::io::Error), @@ -480,45 +480,44 @@ pub enum Error { BadCodecMetadata, } -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, PartialEq)] pub enum CompatibilityError { - #[error("Schemas are not compatible. Writer schema is {writer_schema_type}, but reader schema is {reader_schema_type}")] + #[error("Incompatible schema types! Writer schema is '{writer_schema_type}', but reader schema is '{reader_schema_type}'")] WrongType { writer_schema_type: String, reader_schema_type: String, }, - #[error("Schemas are not compatible. The {schema_type} should have been {expected_type}")] + #[error("Incompatible schema types! The {schema_type} should have been {expected_type:?}")] TypeExpected { schema_type: String, - expected_type: String, + expected_type: &'static [SchemaKind], }, - #[error("Schemas are not compatible. Field '{0}' in reader schema does not match the type in the writer schema")] - FieldTypeMismatch(String), + #[error("Incompatible schemata! Field '{0}' in reader schema does not match the type in the writer schema")] + FieldTypeMismatch(String, #[source] Box<CompatibilityError>), - #[error("Schemas are not compatible. Schemas mismatch")] - SchemaMismatch, - - #[error("Schemas are not compatible. Field '{0}' in reader schema must have a default value")] + #[error("Incompatible schemata! Field '{0}' in reader schema must have a default value")] MissingDefaultValue(String), - #[error("Schemas are not compatible. Reader's symbols must contain all writer's symbols")] + #[error("Incompatible schemata! Reader's symbols must contain all writer's symbols")] MissingSymbols, - #[error("Schemas are not compatible. All elements in union must match for both schemas")] + #[error("Incompatible schemata! All elements in union must match for both schemas")] MissingUnionElements, - #[error("Schemas are not compatible. Name and size don't match for fixed")] + #[error("Incompatible schemata! Name and size don't match for fixed")] FixedMismatch, - #[error("Schemas are not compatible. The name must be the same for both schemas. Writer's name {writer_name} and reader's name {reader_name}")] + #[error("Incompatible schemata! The name must be the same for both schemas. Writer's name {writer_name} and reader's name {reader_name}")] NameMismatch { writer_name: String, reader_name: String, }, - #[error("Schemas are not compatible. Unknown type for '{0}'. Make sure that the type is a valid one")] + #[error( + "Incompatible schemata! Unknown type for '{0}'. Make sure that the type is a valid one" + )] Inconclusive(String), } @@ -543,3 +542,13 @@ impl fmt::Debug for Error { write!(f, "{}", msg) } } + +impl fmt::Debug for CompatibilityError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut msg = self.to_string(); + if let Some(e) = self.source() { + msg.extend([": ", &e.to_string()]); + } + write!(f, "{}", msg) + } +} diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 181e3fcff..23320509e 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -35,7 +35,7 @@ use std::{ io::Read, str::FromStr, }; -use strum_macros::{EnumDiscriminants, EnumString}; +use strum_macros::{Display, EnumDiscriminants, EnumString}; lazy_static! { static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap(); @@ -73,7 +73,7 @@ impl fmt::Display for SchemaFingerprint { /// Represents any valid Avro schema /// More information about Avro schemas can be found in the /// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas) -#[derive(Clone, Debug, EnumDiscriminants)] +#[derive(Clone, Debug, Display, EnumDiscriminants)] #[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))] pub enum Schema { /// A `null` Avro schema. @@ -206,42 +206,6 @@ impl From<&types::Value> for SchemaKind { } } -// Implement `Display` for `SchemaKind`. -impl fmt::Display for Schema { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Schema::Null => write!(f, "Null"), - Schema::Boolean => write!(f, "Boolean"), - Schema::Int => write!(f, "Int"), - Schema::Long => write!(f, "Long"), - Schema::Float => write!(f, "Float"), - Schema::Double => write!(f, "Double"), - Schema::Bytes => write!(f, "Bytes"), - Schema::String => write!(f, "String"), - Schema::Array(..) => write!(f, "Array"), - Schema::Map(..) => write!(f, "Map"), - Schema::Union(..) => write!(f, "Union"), - Schema::Record(..) => write!(f, "Record"), - Schema::Enum(..) => write!(f, "Enum"), - Schema::Fixed(..) => write!(f, "Fixed"), - Schema::Decimal(..) => write!(f, "Decimal"), - Schema::BigDecimal => write!(f, "BigDecimal"), - Schema::Uuid => write!(f, "Uuid"), - Schema::Date => write!(f, "Date"), - Schema::TimeMillis => write!(f, "TimeMillis"), - Schema::TimeMicros => write!(f, "TimeMicros"), - Schema::TimestampMillis => write!(f, "TimestampMillis"), - Schema::TimestampMicros => write!(f, "TimestampMicros"), - Schema::LocalTimestampMillis => write!(f, "LocalTimestampMillis"), - Schema::LocalTimestampMicros => write!(f, "LocalTimestampMicros"), - Schema::Duration => write!(f, "Duration"), - Schema::Ref { name } => { - write!(f, "{}", name.name) - } - } - } -} - /// Represents names for `record`, `enum` and `fixed` Avro schemas. /// /// Each of these `Schema`s have a `fullname` composed of two parts: diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 1f2f29e34..0cf713bb4 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -78,9 +78,9 @@ impl Checker { }), } } else { - Err(CompatibilityError::WrongType { - writer_schema_type: format!("{}", writers_schema), - reader_schema_type: format!("{}", readers_schema), + Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::Record], }) } } @@ -94,9 +94,9 @@ impl Checker { }), } } else { - Err(CompatibilityError::WrongType { - writer_schema_type: format!("{}", writers_schema), - reader_schema_type: format!("{}", readers_schema), + Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::Array], }) } } @@ -143,7 +143,7 @@ impl Checker { if w_type == SchemaKind::Union { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: String::from("record"), + expected_type: &[SchemaKind::Record], }); } @@ -159,11 +159,13 @@ impl Checker { { for field in r_fields.iter() { if let Some(pos) = w_lookup.get(&field.name) { - if self - .full_match_schemas(&w_fields[*pos].schema, &field.schema) - .is_err() + if let Err(err) = + self.full_match_schemas(&w_fields[*pos].schema, &field.schema) { - return Err(CompatibilityError::FieldTypeMismatch(field.name.clone())); + return Err(CompatibilityError::FieldTypeMismatch( + field.name.clone(), + Box::new(err), + )); } } else if field.default.is_none() { return Err(CompatibilityError::MissingDefaultValue(field.name.clone())); @@ -179,29 +181,18 @@ impl Checker { writers_schema: &Schema, readers_schema: &Schema, ) -> Result<(), CompatibilityError> { - // Do not need to check the SchemaKind of reader as this function - // is only called when the readers_schema is Union - let w_type = SchemaKind::from(writers_schema); - - if w_type == SchemaKind::Union { - if let Schema::Union(u) = writers_schema { - if u.schemas - .iter() - .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok()) - { - return Ok(()); - } else { - return Err(CompatibilityError::MissingUnionElements); - } + if let Schema::Union(u) = writers_schema { + if u.schemas + .iter() + .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok()) + { + return Ok(()); + } else { + return Err(CompatibilityError::MissingUnionElements); } - // } else { - // return Err(Error::CompatibilityError(String::from( - // "writers_schema should have been Schema::Union", - // ))); - // } } else if let Schema::Union(u) = readers_schema { - // This check is nneded because the writer_schema can be a not union - // but the type can be contain in the union of the reeader schema + // This check is needed because the writer_schema can be not union + // but the type can be contain in the union of the reader schema // e.g. writer_schema is string and reader_schema is [string, int] if u.schemas .iter() @@ -210,7 +201,7 @@ impl Checker { return Ok(()); } } - Err(CompatibilityError::SchemaMismatch) + Err(CompatibilityError::MissingUnionElements) } fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool { @@ -242,9 +233,12 @@ impl SchemaCompatibility { /// `mutual_read` performs a full, recursive check that a datum written using either /// the writers_schema or the readers_schema can be read using the other schema. - pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> bool { - SchemaCompatibility::can_read(writers_schema, readers_schema).is_ok() - && SchemaCompatibility::can_read(readers_schema, writers_schema).is_ok() + pub fn mutual_read( + writers_schema: &Schema, + readers_schema: &Schema, + ) -> Result<(), CompatibilityError> { + SchemaCompatibility::can_read(writers_schema, readers_schema)?; + SchemaCompatibility::can_read(readers_schema, writers_schema) } /// `match_schemas` performs a basic check that a datum written with the @@ -283,13 +277,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("record"), + expected_type: &[SchemaKind::Record], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: String::from("record"), + expected_type: &[SchemaKind::Record], }); } } @@ -316,7 +310,7 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: String::from("fFixed"), + expected_type: &[SchemaKind::Fixed], }); } } @@ -335,13 +329,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("enum"), + expected_type: &[SchemaKind::Enum], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: String::from("enum"), + expected_type: &[SchemaKind::Enum], }); } } @@ -352,13 +346,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("map"), + expected_type: &[SchemaKind::Map], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: String::from("map"), + expected_type: &[SchemaKind::Map], }); } } @@ -369,13 +363,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("array"), + expected_type: &[SchemaKind::Array], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: String::from("array"), + expected_type: &[SchemaKind::Array], }); } } @@ -398,7 +392,7 @@ impl SchemaCompatibility { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("long, float or double"), + expected_type: &[SchemaKind::Long, SchemaKind::Float, SchemaKind::Double], }) } } @@ -411,7 +405,7 @@ impl SchemaCompatibility { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("float or double"), + expected_type: &[SchemaKind::Float, SchemaKind::Double], }) } } @@ -424,7 +418,7 @@ impl SchemaCompatibility { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("float or double"), + expected_type: &[SchemaKind::Float, SchemaKind::Double], }) } } @@ -434,7 +428,7 @@ impl SchemaCompatibility { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("bytes"), + expected_type: &[SchemaKind::Bytes], }) } } @@ -444,7 +438,7 @@ impl SchemaCompatibility { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: String::from("string"), + expected_type: &[SchemaKind::String], }) } } @@ -606,10 +600,9 @@ mod tests { #[test] fn test_broken() { assert_eq!( - "Schemas are not compatible. All elements in union must match for both schemas", + CompatibilityError::MissingUnionElements, SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()) .unwrap_err() - .to_string() ) } @@ -736,10 +729,10 @@ mod tests { "#, )?; assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok()); - assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader schema must have a default value", SchemaCompatibility::can_read( - &reader_schema, - &writer_schema() - ).unwrap_err().to_string()); + assert_eq!( + CompatibilityError::MissingDefaultValue(String::from("oldfield2")), + SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err() + ); Ok(()) } @@ -754,10 +747,10 @@ mod tests { "#, )?; assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok()); - assert_eq!("Schemas are not compatible. Field 'oldfield1' in reader schema must have a default value", SchemaCompatibility::can_read( - &reader_schema, - &writer_schema() - ).unwrap_err().to_string()); + assert_eq!( + CompatibilityError::MissingDefaultValue(String::from("oldfield1")), + SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err() + ); Ok(()) } @@ -789,10 +782,10 @@ mod tests { "#, )?; assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok()); - assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader schema must have a default value",SchemaCompatibility::can_read( - &reader_schema, - &writer_schema() - ).unwrap_err().to_string()); + assert_eq!( + CompatibilityError::MissingDefaultValue(String::from("oldfield2")), + SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err() + ); Ok(()) } @@ -807,13 +800,14 @@ mod tests { ]} "#, )?; - assert_eq!("Schemas are not compatible. Field 'newfield1' in reader schema must have a default value", SchemaCompatibility::can_read( - &writer_schema(), - &reader_schema).unwrap_err().to_string()); - assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader schema must have a default value", SchemaCompatibility::can_read( - &reader_schema, - &writer_schema() - ).unwrap_err().to_string()); + assert_eq!( + CompatibilityError::MissingDefaultValue(String::from("newfield1")), + SchemaCompatibility::can_read(&writer_schema(), &reader_schema).unwrap_err() + ); + assert_eq!( + CompatibilityError::MissingDefaultValue(String::from("oldfield2")), + SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err() + ); Ok(()) } @@ -824,10 +818,10 @@ mod tests { let invalid_reader = string_map_schema(); assert!(SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).is_ok()); - assert_eq!("Schemas are not compatible. Unknown type for 'writers_schema'. Make sure that the type is a valid one", SchemaCompatibility::can_read( - &string_array_schema(), - &invalid_reader - ).unwrap_err().to_string()); + assert_eq!( + CompatibilityError::Inconclusive(String::from("writers_schema")), + SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader).unwrap_err() + ); } #[test] @@ -835,10 +829,11 @@ mod tests { let valid_reader = Schema::String; assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok()); assert_eq!( - "Schemas are not compatible. The readers_schema should have been long, float or double", - SchemaCompatibility::can_read(&Schema::Int, &Schema::String) - .unwrap_err() - .to_string() + CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::Float, SchemaKind::Double], + }, + SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err() ); } @@ -849,10 +844,8 @@ mod tests { let union_reader = union_schema(vec![Schema::String]); assert_eq!( - "Schemas are not compatible. All elements in union must match for both schemas", - SchemaCompatibility::can_read(&union_writer, &union_reader) - .unwrap_err() - .to_string() + CompatibilityError::MissingUnionElements, + SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap_err() ); assert!(SchemaCompatibility::can_read(&union_reader, &union_writer).is_ok()); } @@ -869,15 +862,21 @@ mod tests { let int_schema = Schema::parse_str( r#" - {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [ - {"name":"field1", "type":"int"} - ]} -"#, + {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [ + {"name":"field1", "type":"int"} + ]} + "#, )?; assert_eq!( - "Schemas are not compatible. Field 'field1' in reader schema does not match the type in the writer schema", - SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err().to_string() + CompatibilityError::FieldTypeMismatch( + "field1".to_owned(), + Box::new(CompatibilityError::TypeExpected { + schema_type: "readers_schema".to_owned(), + expected_type: &[SchemaKind::Bytes] + }) + ), + SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err() ); Ok(()) @@ -893,10 +892,8 @@ mod tests { let enum_schema2 = Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?; assert_eq!( - "Schemas are not compatible. Reader's symbols must contain all writer's symbols", - SchemaCompatibility::can_read(&enum_schema2, &enum_schema1) - .unwrap_err() - .to_string() + CompatibilityError::MissingSymbols, + SchemaCompatibility::can_read(&enum_schema2, &enum_schema1).unwrap_err() ); assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok()); @@ -971,10 +968,8 @@ mod tests { // short name match, but no structure match let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]); assert_eq!( - "Schemas are not compatible. Schemas mismatch", - SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) - .unwrap_err() - .to_string() + CompatibilityError::MissingUnionElements, + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -988,10 +983,8 @@ mod tests { point_3d_schema(), ]); assert_eq!( - "Schemas are not compatible. Schemas mismatch", - SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) - .unwrap_err() - .to_string() + CompatibilityError::MissingUnionElements, + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1005,10 +998,8 @@ mod tests { point_2d_schema(), ]); assert_eq!( - "Schemas are not compatible. Schemas mismatch", - SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) - .unwrap_err() - .to_string() + CompatibilityError::MissingUnionElements, + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1022,10 +1013,8 @@ mod tests { point_3d_schema(), ]); assert_eq!( - "Schemas are not compatible. Schemas mismatch", - SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) - .unwrap_err() - .to_string() + CompatibilityError::MissingUnionElements, + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1313,7 +1302,7 @@ mod tests { {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null} ] }"#)?, - "Schemas are not compatible. Field 'success' in reader schema does not match the type in the writer schema" + "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema" ), ( Schema::parse_str( @@ -1332,7 +1321,7 @@ mod tests { {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null} ] }"#)?, - "Schemas are not compatible. Field 'max_values' in reader schema does not match the type in the writer schema" + "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema" ) ];
