This is an automated email from the ASF dual-hosted git repository.
clesaec pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new 1cea6907a AVRO-3904: [RUST] return a Result when checking schema
compatibility so the … (#2587)
1cea6907a is described below
commit 1cea6907a24773bdc5d7282fdd90e92b6aef0ab3
Author: Marcos Schroh <[email protected]>
AuthorDate: Tue Nov 28 10:02:49 2023 +0100
AVRO-3904: [RUST] return a Result when checking schema compatibility so the
… (#2587)
* AVRO-3904: [Rust] return a Result when checking schema compatibility so
the end users will have feedback in case of errors
Co-authored-by: Marcos Schroh <[email protected]>
---
lang/rust/avro/README.md | 4 +-
lang/rust/avro/src/error.rs | 42 +++
lang/rust/avro/src/lib.rs | 4 +-
lang/rust/avro/src/schema.rs | 36 ++
lang/rust/avro/src/schema_compatibility.rs | 524 ++++++++++++++++++++---------
5 files changed, 439 insertions(+), 171 deletions(-)
diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md
index 07b18748f..a349847fa 100644
--- a/lang/rust/avro/README.md
+++ b/lang/rust/avro/README.md
@@ -634,7 +634,7 @@ use apache_avro::{Schema,
schema_compatibility::SchemaCompatibility};
let writers_schema = Schema::parse_str(r#"{"type": "array",
"items":"int"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array",
"items":"long"}"#).unwrap();
-assert_eq!(true, SchemaCompatibility::can_read(&writers_schema,
&readers_schema));
+assert!(SchemaCompatibility::can_read(&writers_schema,
&readers_schema).is_ok());
```
2. Incompatible schemas (a long array schema cannot be read by an int array
schema)
@@ -647,7 +647,7 @@ use apache_avro::{Schema,
schema_compatibility::SchemaCompatibility};
let writers_schema = Schema::parse_str(r#"{"type": "array",
"items":"long"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array",
"items":"int"}"#).unwrap();
-assert_eq!(false, SchemaCompatibility::can_read(&writers_schema,
&readers_schema));
+assert!(SchemaCompatibility::can_read(&writers_schema,
&readers_schema).is_err());
```
<!-- cargo-rdme end -->
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 8fa146027..810c5687a 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -480,6 +480,48 @@ pub enum Error {
BadCodecMetadata,
}
+#[derive(thiserror::Error, Debug)]
+pub enum CompatibilityError {
+ #[error("Schemas are not compatible. Writer schema is
{writer_schema_type}, but reader schema is {reader_schema_type}")]
+ WrongType {
+ writer_schema_type: String,
+ reader_schema_type: String,
+ },
+
+ #[error("Schemas are not compatible. The {schema_type} should have been
{expected_type}")]
+ TypeExpected {
+ schema_type: String,
+ expected_type: String,
+ },
+
+ #[error("Schemas are not compatible. Field '{0}' in reader schema does not
match the type in the writer schema")]
+ FieldTypeMismatch(String),
+
+ #[error("Schemas are not compatible. Schemas mismatch")]
+ SchemaMismatch,
+
+ #[error("Schemas are not compatible. Field '{0}' in reader schema must
have a default value")]
+ MissingDefaultValue(String),
+
+ #[error("Schemas are not compatible. Reader's symbols must contain all
writer's symbols")]
+ MissingSymbols,
+
+ #[error("Schemas are not compatible. All elements in union must match for
both schemas")]
+ MissingUnionElements,
+
+ #[error("Schemas are not compatible. Name and size don't match for fixed")]
+ FixedMismatch,
+
+ #[error("Schemas are not compatible. The name must be the same for both
schemas. Writer's name {writer_name} and reader's name {reader_name}")]
+ NameMismatch {
+ writer_name: String,
+ reader_name: String,
+ },
+
+ #[error("Schemas are not compatible. Unknown type for '{0}'. Make sure
that the type is a valid one")]
+ Inconclusive(String),
+}
+
impl serde::ser::Error for Error {
fn custom<T: fmt::Display>(msg: T) -> Self {
Error::SerializeValue(msg.to_string())
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index b2d930068..cb49555d5 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -747,7 +747,7 @@
//!
//! let writers_schema = Schema::parse_str(r#"{"type": "array",
"items":"int"}"#).unwrap();
//! let readers_schema = Schema::parse_str(r#"{"type": "array",
"items":"long"}"#).unwrap();
-//! assert_eq!(true, SchemaCompatibility::can_read(&writers_schema,
&readers_schema));
+//! assert!(SchemaCompatibility::can_read(&writers_schema,
&readers_schema).is_ok());
//! ```
//!
//! 2. Incompatible schemas (a long array schema cannot be read by an int
array schema)
@@ -760,7 +760,7 @@
//!
//! let writers_schema = Schema::parse_str(r#"{"type": "array",
"items":"long"}"#).unwrap();
//! let readers_schema = Schema::parse_str(r#"{"type": "array",
"items":"int"}"#).unwrap();
-//! assert_eq!(false, SchemaCompatibility::can_read(&writers_schema,
&readers_schema));
+//! assert!(SchemaCompatibility::can_read(&writers_schema,
&readers_schema).is_err());
//! ```
mod codec;
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index f2487e316..181e3fcff 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -206,6 +206,42 @@ impl From<&types::Value> for SchemaKind {
}
}
+// Implement `Display` for `Schema`.
+impl fmt::Display for Schema {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ Schema::Null => write!(f, "Null"),
+ Schema::Boolean => write!(f, "Boolean"),
+ Schema::Int => write!(f, "Int"),
+ Schema::Long => write!(f, "Long"),
+ Schema::Float => write!(f, "Float"),
+ Schema::Double => write!(f, "Double"),
+ Schema::Bytes => write!(f, "Bytes"),
+ Schema::String => write!(f, "String"),
+ Schema::Array(..) => write!(f, "Array"),
+ Schema::Map(..) => write!(f, "Map"),
+ Schema::Union(..) => write!(f, "Union"),
+ Schema::Record(..) => write!(f, "Record"),
+ Schema::Enum(..) => write!(f, "Enum"),
+ Schema::Fixed(..) => write!(f, "Fixed"),
+ Schema::Decimal(..) => write!(f, "Decimal"),
+ Schema::BigDecimal => write!(f, "BigDecimal"),
+ Schema::Uuid => write!(f, "Uuid"),
+ Schema::Date => write!(f, "Date"),
+ Schema::TimeMillis => write!(f, "TimeMillis"),
+ Schema::TimeMicros => write!(f, "TimeMicros"),
+ Schema::TimestampMillis => write!(f, "TimestampMillis"),
+ Schema::TimestampMicros => write!(f, "TimestampMicros"),
+ Schema::LocalTimestampMillis => write!(f, "LocalTimestampMillis"),
+ Schema::LocalTimestampMicros => write!(f, "LocalTimestampMicros"),
+ Schema::Duration => write!(f, "Duration"),
+ Schema::Ref { name } => {
+ write!(f, "{}", name.name)
+ }
+ }
+ }
+}
+
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s have a `fullname` composed of two parts:
diff --git a/lang/rust/avro/src/schema_compatibility.rs
b/lang/rust/avro/src/schema_compatibility.rs
index 8a0b2a4d7..1f2f29e34 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -16,7 +16,10 @@
// under the License.
//! Logic for checking schema compatibility
-use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind};
+use crate::{
+ error::CompatibilityError,
+ schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+};
use std::{
collections::{hash_map::DefaultHasher, HashSet},
hash::Hasher,
@@ -37,7 +40,11 @@ impl Checker {
}
}
- pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema:
&Schema) -> bool {
+ pub(crate) fn can_read(
+ &mut self,
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
self.full_match_schemas(writers_schema, readers_schema)
}
@@ -45,44 +52,52 @@ impl Checker {
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> bool {
+ ) -> Result<(), CompatibilityError> {
if self.recursion_in_progress(writers_schema, readers_schema) {
- return true;
+ return Ok(());
}
- if !SchemaCompatibility::match_schemas(writers_schema, readers_schema)
{
- return false;
- }
+ SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type ==
SchemaKind::Fixed) {
- return true;
+ return Ok(());
}
match r_type {
SchemaKind::Record => self.match_record_schemas(writers_schema,
readers_schema),
SchemaKind::Map => {
if let Schema::Map(w_m) = writers_schema {
- if let Schema::Map(r_m) = readers_schema {
- self.full_match_schemas(w_m, r_m)
- } else {
- unreachable!("readers_schema should have been
Schema::Map")
+ match readers_schema {
+ Schema::Map(r_m) => self.full_match_schemas(w_m, r_m),
+ _ => Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{}", writers_schema),
+ reader_schema_type: format!("{}", readers_schema),
+ }),
}
} else {
- unreachable!("writers_schema should have been Schema::Map")
+ Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{}", writers_schema),
+ reader_schema_type: format!("{}", readers_schema),
+ })
}
}
SchemaKind::Array => {
if let Schema::Array(w_a) = writers_schema {
- if let Schema::Array(r_a) = readers_schema {
- self.full_match_schemas(w_a, r_a)
- } else {
- unreachable!("readers_schema should have been
Schema::Array")
+ match readers_schema {
+ Schema::Array(r_a) => self.full_match_schemas(w_a,
r_a),
+ _ => Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{}", writers_schema),
+ reader_schema_type: format!("{}", readers_schema),
+ }),
}
} else {
- unreachable!("writers_schema should have been
Schema::Array")
+ Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{}", writers_schema),
+ reader_schema_type: format!("{}", readers_schema),
+ })
}
}
SchemaKind::Union => self.match_union_schemas(writers_schema,
readers_schema),
@@ -96,10 +111,12 @@ impl Checker {
symbols: r_symbols, ..
}) = readers_schema
{
- return !w_symbols.iter().any(|e|
!r_symbols.contains(e));
+ if w_symbols.iter().all(|e| r_symbols.contains(e)) {
+ return Ok(());
+ }
}
}
- false
+ Err(CompatibilityError::MissingSymbols)
}
_ => {
if w_type == SchemaKind::Union {
@@ -109,16 +126,25 @@ impl Checker {
}
}
}
- false
+ Err(CompatibilityError::Inconclusive(String::from(
+ "writers_schema",
+ )))
}
}
}
- fn match_record_schemas(&mut self, writers_schema: &Schema,
readers_schema: &Schema) -> bool {
+ fn match_record_schemas(
+ &mut self,
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
let w_type = SchemaKind::from(writers_schema);
if w_type == SchemaKind::Union {
- return false;
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: String::from("record"),
+ });
}
if let Schema::Record(RecordSchema {
@@ -133,39 +159,58 @@ impl Checker {
{
for field in r_fields.iter() {
if let Some(pos) = w_lookup.get(&field.name) {
- if !self.full_match_schemas(&w_fields[*pos].schema,
&field.schema) {
- return false;
+ if self
+ .full_match_schemas(&w_fields[*pos].schema,
&field.schema)
+ .is_err()
+ {
+ return
Err(CompatibilityError::FieldTypeMismatch(field.name.clone()));
}
} else if field.default.is_none() {
- return false;
+ return
Err(CompatibilityError::MissingDefaultValue(field.name.clone()));
}
}
}
}
- true
+ Ok(())
}
- fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema:
&Schema) -> bool {
+ fn match_union_schemas(
+ &mut self,
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
+ // Do not need to check the SchemaKind of reader as this function
+ // is only called when the readers_schema is Union
let w_type = SchemaKind::from(writers_schema);
- let r_type = SchemaKind::from(readers_schema);
-
- assert_eq!(r_type, SchemaKind::Union);
if w_type == SchemaKind::Union {
if let Schema::Union(u) = writers_schema {
- u.schemas
+ if u.schemas
.iter()
- .all(|schema| self.full_match_schemas(schema,
readers_schema))
- } else {
- unreachable!("writers_schema should have been Schema::Union")
+ .all(|schema| self.full_match_schemas(schema,
readers_schema).is_ok())
+ {
+ return Ok(());
+ } else {
+ return Err(CompatibilityError::MissingUnionElements);
+ }
}
+ // } else {
+ // return Err(Error::CompatibilityError(String::from(
+ // "writers_schema should have been Schema::Union",
+ // )));
+ // }
} else if let Schema::Union(u) = readers_schema {
- u.schemas
+ // This check is needed because the writer_schema can be a
non-union
+ // but the type can be contained in the union of the reader schema
+ // e.g. writer_schema is string and reader_schema is [string, int]
+ if u.schemas
.iter()
- .any(|schema| self.full_match_schemas(writers_schema, schema))
- } else {
- unreachable!("readers_schema should have been Schema::Union")
+ .any(|schema| self.full_match_schemas(writers_schema,
schema).is_ok())
+ {
+ return Ok(());
+ }
}
+ Err(CompatibilityError::SchemaMismatch)
}
fn recursion_in_progress(&mut self, writers_schema: &Schema,
readers_schema: &Schema) -> bool {
@@ -187,7 +232,10 @@ impl Checker {
impl SchemaCompatibility {
/// `can_read` performs a full, recursive check that a datum written using
the
/// writers_schema can be read using the readers_schema.
- pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
+ pub fn can_read(
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
let mut c = Checker::new();
c.can_read(writers_schema, readers_schema)
}
@@ -195,8 +243,8 @@ impl SchemaCompatibility {
/// `mutual_read` performs a full, recursive check that a datum written
using either
/// the writers_schema or the readers_schema can be read using the other
schema.
pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) ->
bool {
- SchemaCompatibility::can_read(writers_schema, readers_schema)
- && SchemaCompatibility::can_read(readers_schema, writers_schema)
+ SchemaCompatibility::can_read(writers_schema, readers_schema).is_ok()
+ && SchemaCompatibility::can_read(readers_schema,
writers_schema).is_ok()
}
/// `match_schemas` performs a basic check that a datum written with the
@@ -204,29 +252,45 @@ impl SchemaCompatibility {
/// matching the types, including schema promotion, and matching the full
name for
/// named types. Aliases for named types are not supported here, and the
rust
/// implementation of Avro in general does not include support for
aliases (I think).
- pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema:
&Schema) -> bool {
+ pub(crate) fn match_schemas(
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
- return true;
+ return Ok(());
}
if w_type == r_type {
if r_type.is_primitive() {
- return true;
+ return Ok(());
}
match r_type {
SchemaKind::Record => {
if let Schema::Record(RecordSchema { name: w_name, .. }) =
writers_schema {
if let Schema::Record(RecordSchema { name: r_name, ..
}) = readers_schema {
- return w_name.name == r_name.name;
+ if w_name.name == r_name.name {
+ return Ok(());
+ } else {
+ return Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.name.clone(),
+ reader_name: r_name.name.clone(),
+ });
+ }
} else {
- unreachable!("readers_schema should have been
Schema::Record")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("record"),
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Record")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: String::from("record"),
+ });
}
}
SchemaKind::Fixed => {
@@ -246,23 +310,39 @@ impl SchemaCompatibility {
attributes: _,
}) = readers_schema
{
- return w_name.name == r_name.name && w_size ==
r_size;
+ return (w_name.name == r_name.name && w_size ==
r_size)
+ .then_some(())
+ .ok_or(CompatibilityError::FixedMismatch);
} else {
- unreachable!("readers_schema should have been
Schema::Fixed")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: String::from("fFixed"),
+ });
}
- } else {
- unreachable!("writers_schema should have been
Schema::Fixed")
}
}
SchemaKind::Enum => {
if let Schema::Enum(EnumSchema { name: w_name, .. }) =
writers_schema {
if let Schema::Enum(EnumSchema { name: r_name, .. }) =
readers_schema {
- return w_name.name == r_name.name;
+ if w_name.name == r_name.name {
+ return Ok(());
+ } else {
+ return Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.name.clone(),
+ reader_name: r_name.name.clone(),
+ });
+ }
} else {
- unreachable!("readers_schema should have been
Schema::Enum")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("enum"),
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Enum")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: String::from("enum"),
+ });
}
}
SchemaKind::Map => {
@@ -270,10 +350,16 @@ impl SchemaCompatibility {
if let Schema::Map(r_m) = readers_schema {
return SchemaCompatibility::match_schemas(w_m,
r_m);
} else {
- unreachable!("readers_schema should have been
Schema::Map")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("map"),
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Map")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: String::from("map"),
+ });
}
}
SchemaKind::Array => {
@@ -281,45 +367,91 @@ impl SchemaCompatibility {
if let Schema::Array(r_a) = readers_schema {
return SchemaCompatibility::match_schemas(w_a,
r_a);
} else {
- unreachable!("readers_schema should have been
Schema::Array")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("array"),
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Array")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: String::from("array"),
+ });
}
}
- _ => (),
+ _ => {
+ return Err(CompatibilityError::Inconclusive(String::from(
+ "readers_schema",
+ )))
+ }
};
}
- if w_type == SchemaKind::Int
- && [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
- .iter()
- .any(|&t| t == r_type)
- {
- return true;
- }
-
- if w_type == SchemaKind::Long
- && [SchemaKind::Float, SchemaKind::Double]
- .iter()
- .any(|&t| t == r_type)
- {
- return true;
- }
-
- if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
- return true;
- }
-
- if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
- return true;
- }
-
- if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
- return true;
+ // Here are the checks for primitive types
+ match w_type {
+ SchemaKind::Int => {
+ if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
+ .iter()
+ .any(|&t| t == r_type)
+ {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("long, float or double"),
+ })
+ }
+ }
+ SchemaKind::Long => {
+ if [SchemaKind::Float, SchemaKind::Double]
+ .iter()
+ .any(|&t| t == r_type)
+ {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("float or double"),
+ })
+ }
+ }
+ SchemaKind::Float => {
+ if [SchemaKind::Float, SchemaKind::Double]
+ .iter()
+ .any(|&t| t == r_type)
+ {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("float or double"),
+ })
+ }
+ }
+ SchemaKind::String => {
+ if r_type == SchemaKind::Bytes {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("bytes"),
+ })
+ }
+ }
+ SchemaKind::Bytes => {
+ if r_type == SchemaKind::String {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: String::from("string"),
+ })
+ }
+ }
+ _ => Err(CompatibilityError::Inconclusive(String::from(
+ "writers_schema",
+ ))),
}
-
- false
}
}
@@ -473,10 +605,12 @@ mod tests {
#[test]
fn test_broken() {
- assert!(!SchemaCompatibility::can_read(
- &int_string_union_schema(),
- &int_union_schema()
- ))
+ assert_eq!(
+ "Schemas are not compatible. All elements in union must match for
both schemas",
+ SchemaCompatibility::can_read(&int_string_union_schema(),
&int_union_schema())
+ .unwrap_err()
+ .to_string()
+ )
}
#[test]
@@ -526,9 +660,9 @@ mod tests {
(nested_record(), nested_optional_record()),
];
- assert!(!incompatible_schemas
+ assert!(incompatible_schemas
.iter()
- .any(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader)));
+ .any(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader).is_err()));
}
#[test]
@@ -577,7 +711,7 @@ mod tests {
assert!(compatible_schemas
.iter()
- .all(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader)));
+ .all(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader).is_ok()));
}
fn writer_schema() -> Schema {
@@ -601,14 +735,11 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema,
- ));
- assert!(!SchemaCompatibility::can_read(
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema,).is_ok());
+ assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader
schema must have a default value", SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
- ));
+ ).unwrap_err().to_string());
Ok(())
}
@@ -622,14 +753,11 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(!SchemaCompatibility::can_read(
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert_eq!("Schemas are not compatible. Field 'oldfield1' in reader
schema must have a default value", SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
- ));
+ ).unwrap_err().to_string());
Ok(())
}
@@ -644,14 +772,8 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(SchemaCompatibility::can_read(
- &reader_schema,
- &writer_schema()
- ));
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert!(SchemaCompatibility::can_read(&reader_schema,
&writer_schema()).is_ok());
Ok(())
}
@@ -666,14 +788,11 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(!SchemaCompatibility::can_read(
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader
schema must have a default value",SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
- ));
+ ).unwrap_err().to_string());
Ok(())
}
@@ -688,14 +807,13 @@ mod tests {
]}
"#,
)?;
- assert!(!SchemaCompatibility::can_read(
+ assert_eq!("Schemas are not compatible. Field 'newfield1' in reader
schema must have a default value", SchemaCompatibility::can_read(
&writer_schema(),
- &reader_schema
- ));
- assert!(!SchemaCompatibility::can_read(
+ &reader_schema).unwrap_err().to_string());
+ assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader
schema must have a default value", SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
- ));
+ ).unwrap_err().to_string());
Ok(())
}
@@ -705,27 +823,23 @@ mod tests {
let valid_reader = string_array_schema();
let invalid_reader = string_map_schema();
- assert!(SchemaCompatibility::can_read(
- &string_array_schema(),
- &valid_reader
- ));
- assert!(!SchemaCompatibility::can_read(
+ assert!(SchemaCompatibility::can_read(&string_array_schema(),
&valid_reader).is_ok());
+ assert_eq!("Schemas are not compatible. Unknown type for
'writers_schema'. Make sure that the type is a valid one",
SchemaCompatibility::can_read(
&string_array_schema(),
&invalid_reader
- ));
+ ).unwrap_err().to_string());
}
#[test]
fn test_primitive_writer_schema() {
let valid_reader = Schema::String;
- assert!(SchemaCompatibility::can_read(
- &Schema::String,
- &valid_reader
- ));
- assert!(!SchemaCompatibility::can_read(
- &Schema::Int,
- &Schema::String
- ));
+ assert!(SchemaCompatibility::can_read(&Schema::String,
&valid_reader).is_ok());
+ assert_eq!(
+ "Schemas are not compatible. The readers_schema should have been
long, float or double",
+ SchemaCompatibility::can_read(&Schema::Int, &Schema::String)
+ .unwrap_err()
+ .to_string()
+ );
}
#[test]
@@ -734,8 +848,13 @@ mod tests {
let union_writer = union_schema(vec![Schema::Int, Schema::String]);
let union_reader = union_schema(vec![Schema::String]);
- assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
- assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
+ assert_eq!(
+ "Schemas are not compatible. All elements in union must match for
both schemas",
+ SchemaCompatibility::can_read(&union_writer, &union_reader)
+ .unwrap_err()
+ .to_string()
+ );
+ assert!(SchemaCompatibility::can_read(&union_reader,
&union_writer).is_ok());
}
#[test]
@@ -756,7 +875,10 @@ mod tests {
"#,
)?;
- assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));
+ assert_eq!(
+ "Schemas are not compatible. Field 'field1' in reader schema does
not match the type in the writer schema",
+ SchemaCompatibility::can_read(&string_schema,
&int_schema).unwrap_err().to_string()
+ );
Ok(())
}
@@ -770,8 +892,13 @@ mod tests {
)?;
let enum_schema2 =
Schema::parse_str(r#"{"type":"enum", "name":"MyEnum",
"symbols":["A","B","C"]}"#)?;
- assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
- assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
+ assert_eq!(
+ "Schemas are not compatible. Reader's symbols must contain all
writer's symbols",
+ SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)
+ .unwrap_err()
+ .to_string()
+ );
+ assert!(SchemaCompatibility::can_read(&enum_schema1,
&enum_schema2).is_ok());
Ok(())
}
@@ -843,10 +970,12 @@ mod tests {
fn test_union_resolution_no_structure_match() {
// short name match, but no structure match
let read_schema = union_schema(vec![Schema::Null,
point_3d_no_default_schema()]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ "Schemas are not compatible. Schemas mismatch",
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema)
+ .unwrap_err()
+ .to_string()
+ );
}
#[test]
@@ -858,10 +987,12 @@ mod tests {
point_2d_schema(),
point_3d_schema(),
]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ "Schemas are not compatible. Schemas mismatch",
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema)
+ .unwrap_err()
+ .to_string()
+ );
}
#[test]
@@ -873,10 +1004,12 @@ mod tests {
point_3d_schema(),
point_2d_schema(),
]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ "Schemas are not compatible. Schemas mismatch",
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema)
+ .unwrap_err()
+ .to_string()
+ );
}
#[test]
@@ -888,10 +1021,12 @@ mod tests {
point_3d_match_name_schema(),
point_3d_schema(),
]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ "Schemas are not compatible. Schemas mismatch",
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema)
+ .unwrap_err()
+ .to_string()
+ );
}
#[test]
@@ -904,10 +1039,7 @@ mod tests {
point_3d_schema(),
point_2d_fullname_schema(),
]);
- assert!(SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).is_ok());
}
#[test]
@@ -1078,7 +1210,7 @@ mod tests {
let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?;
let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?;
- assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2));
+ assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2).is_ok());
Ok(())
}
@@ -1153,7 +1285,65 @@ mod tests {
];
for (schema_1, schema_2) in schemas {
- assert!(SchemaCompatibility::can_read(&schema_1, &schema_2));
+ assert!(SchemaCompatibility::can_read(&schema_1,
&schema_2).is_ok());
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_can_read_compatibility_errors() -> TestResult {
+ let schemas = [
+ (
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsMap",
+ "fields": [
+ {"name": "average", "type": "int", "default": 0},
+ {"name": "success", "type": {"type": "map", "values":
"int"}}
+ ]
+ }"#)?,
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsMap",
+ "fields": [
+ {"name": "average", "type": "int", "default": 0},
+ {"name": "success", "type": ["null", {"type": "map",
"values": "int"}], "default": null}
+ ]
+ }"#)?,
+ "Schemas are not compatible. Field 'success' in reader schema
does not match the type in the writer schema"
+ ),
+ (
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsArray",
+ "fields": [
+ {"name": "max_values", "type": {"type": "array",
"items": "int"}}
+ ]
+ }"#)?,
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsArray",
+ "fields": [
+ {"name": "max_values", "type": ["null", {"type":
"array", "items": "int"}], "default": null}
+ ]
+ }"#)?,
+ "Schemas are not compatible. Field 'max_values' in reader
schema does not match the type in the writer schema"
+ )
+ ];
+
+ for (schema_1, schema_2, error) in schemas {
+ assert!(SchemaCompatibility::can_read(&schema_1,
&schema_2).is_ok());
+ assert_eq!(
+ error,
+ SchemaCompatibility::can_read(&schema_2, &schema_1)
+ .unwrap_err()
+ .to_string()
+ );
}
Ok(())