This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 9b54d1b6b AVRO-3904: [Rust] Minor improvements to the new schema
compatibility changes (#2600)
9b54d1b6b is described below
commit 9b54d1b6bf2f1939fce7437d6db0f6b03fcb211a
Author: Martin Grigorov <[email protected]>
AuthorDate: Mon Dec 4 13:12:08 2023 +0200
AVRO-3904: [Rust] Minor improvements to the new schema compatibility
changes (#2600)
* AVRO-3904: [Rust] Minor improvements to the new schema compatibility
changes
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
* AVRO-3904: [Rust] Use `Debug` instead of `Display` when printing schemata
in CompatibilityError
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---------
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
(cherry picked from commit efd3b2acf24cd4036b4d0362c8a583218caf209d)
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
lang/rust/avro/src/error.rs | 53 ++-
lang/rust/avro/src/schema_compatibility.rs | 553 +++++++++++++++++++----------
2 files changed, 418 insertions(+), 188 deletions(-)
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 30a192069..a7960656c 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -362,7 +362,7 @@ pub enum Error {
DeflateCompress(#[source] std::io::Error),
#[error("Failed to finish flate compressor")]
- DeflateCompressFinish(std::io::Error),
+ DeflateCompressFinish(#[source] std::io::Error),
#[error("Failed to decompress with flate")]
DeflateDecompress(#[source] std::io::Error),
@@ -480,6 +480,47 @@ pub enum Error {
BadCodecMetadata,
}
+#[derive(thiserror::Error, PartialEq)]
+pub enum CompatibilityError {
+ #[error("Incompatible schema types! Writer schema is
'{writer_schema_type}', but reader schema is '{reader_schema_type}'")]
+ WrongType {
+ writer_schema_type: String,
+ reader_schema_type: String,
+ },
+
+ #[error("Incompatible schema types! The {schema_type} should have been
{expected_type:?}")]
+ TypeExpected {
+ schema_type: String,
+ expected_type: &'static [SchemaKind],
+ },
+
+ #[error("Incompatible schemata! Field '{0}' in reader schema does not
match the type in the writer schema")]
+ FieldTypeMismatch(String, #[source] Box<CompatibilityError>),
+
+ #[error("Incompatible schemata! Field '{0}' in reader schema must have a
default value")]
+ MissingDefaultValue(String),
+
+ #[error("Incompatible schemata! Reader's symbols must contain all writer's
symbols")]
+ MissingSymbols,
+
+ #[error("Incompatible schemata! All elements in union must match for both
schemas")]
+ MissingUnionElements,
+
+ #[error("Incompatible schemata! Name and size don't match for fixed")]
+ FixedMismatch,
+
+ #[error("Incompatible schemata! The name must be the same for both
schemas. Writer's name {writer_name} and reader's name {reader_name}")]
+ NameMismatch {
+ writer_name: String,
+ reader_name: String,
+ },
+
+ #[error(
+ "Incompatible schemata! Unknown type for '{0}'. Make sure that the
type is a valid one"
+ )]
+ Inconclusive(String),
+}
+
impl serde::ser::Error for Error {
fn custom<T: fmt::Display>(msg: T) -> Self {
Error::SerializeValue(msg.to_string())
@@ -501,3 +542,13 @@ impl fmt::Debug for Error {
write!(f, "{}", msg)
}
}
+
+impl fmt::Debug for CompatibilityError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{}", self)?;
+ // Walk the whole `source()` chain (not only the direct cause) so that
+ // nested FieldTypeMismatch errors still report their root cause.
+ std::iter::successors(self.source(), |e| e.source())
+ .try_for_each(|e| write!(f, ": {}", e))
+ }
+}
diff --git a/lang/rust/avro/src/schema_compatibility.rs
b/lang/rust/avro/src/schema_compatibility.rs
index 8a0b2a4d7..861f31aac 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -16,7 +16,10 @@
// under the License.
//! Logic for checking schema compatibility
-use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind};
+use crate::{
+ error::CompatibilityError,
+ schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+};
use std::{
collections::{hash_map::DefaultHasher, HashSet},
hash::Hasher,
@@ -37,7 +40,11 @@ impl Checker {
}
}
- pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema:
&Schema) -> bool {
+ pub(crate) fn can_read(
+ &mut self,
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
self.full_match_schemas(writers_schema, readers_schema)
}
@@ -45,44 +52,52 @@ impl Checker {
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> bool {
+ ) -> Result<(), CompatibilityError> {
if self.recursion_in_progress(writers_schema, readers_schema) {
- return true;
+ return Ok(());
}
- if !SchemaCompatibility::match_schemas(writers_schema, readers_schema)
{
- return false;
- }
+ SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type ==
SchemaKind::Fixed) {
- return true;
+ return Ok(());
}
match r_type {
SchemaKind::Record => self.match_record_schemas(writers_schema,
readers_schema),
SchemaKind::Map => {
if let Schema::Map(w_m) = writers_schema {
- if let Schema::Map(r_m) = readers_schema {
- self.full_match_schemas(w_m, r_m)
- } else {
- unreachable!("readers_schema should have been
Schema::Map")
+ match readers_schema {
+ Schema::Map(r_m) => self.full_match_schemas(w_m, r_m),
+ _ => Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{:#?}",
writers_schema),
+ reader_schema_type: format!("{:#?}",
readers_schema),
+ }),
}
} else {
- unreachable!("writers_schema should have been Schema::Map")
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Map],
+ })
}
}
SchemaKind::Array => {
if let Schema::Array(w_a) = writers_schema {
- if let Schema::Array(r_a) = readers_schema {
- self.full_match_schemas(w_a, r_a)
- } else {
- unreachable!("readers_schema should have been
Schema::Array")
+ match readers_schema {
+ Schema::Array(r_a) => self.full_match_schemas(w_a,
r_a),
+ _ => Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{:#?}",
writers_schema),
+ reader_schema_type: format!("{:#?}",
readers_schema),
+ }),
}
} else {
- unreachable!("writers_schema should have been
Schema::Array")
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Array],
+ })
}
}
SchemaKind::Union => self.match_union_schemas(writers_schema,
readers_schema),
@@ -96,10 +111,12 @@ impl Checker {
symbols: r_symbols, ..
}) = readers_schema
{
- return !w_symbols.iter().any(|e|
!r_symbols.contains(e));
+ if w_symbols.iter().all(|e| r_symbols.contains(e)) {
+ return Ok(());
+ }
}
}
- false
+ Err(CompatibilityError::MissingSymbols)
}
_ => {
if w_type == SchemaKind::Union {
@@ -109,16 +126,25 @@ impl Checker {
}
}
}
- false
+ Err(CompatibilityError::Inconclusive(String::from(
+ "writers_schema",
+ )))
}
}
}
- fn match_record_schemas(&mut self, writers_schema: &Schema,
readers_schema: &Schema) -> bool {
+ fn match_record_schemas(
+ &mut self,
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
let w_type = SchemaKind::from(writers_schema);
if w_type == SchemaKind::Union {
- return false;
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Record],
+ });
}
if let Schema::Record(RecordSchema {
@@ -133,39 +159,49 @@ impl Checker {
{
for field in r_fields.iter() {
if let Some(pos) = w_lookup.get(&field.name) {
- if !self.full_match_schemas(&w_fields[*pos].schema,
&field.schema) {
- return false;
+ if let Err(err) =
+ self.full_match_schemas(&w_fields[*pos].schema,
&field.schema)
+ {
+ return Err(CompatibilityError::FieldTypeMismatch(
+ field.name.clone(),
+ Box::new(err),
+ ));
}
} else if field.default.is_none() {
- return false;
+ return
Err(CompatibilityError::MissingDefaultValue(field.name.clone()));
}
}
}
}
- true
+ Ok(())
}
- fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema:
&Schema) -> bool {
- let w_type = SchemaKind::from(writers_schema);
- let r_type = SchemaKind::from(readers_schema);
-
- assert_eq!(r_type, SchemaKind::Union);
-
- if w_type == SchemaKind::Union {
- if let Schema::Union(u) = writers_schema {
- u.schemas
- .iter()
- .all(|schema| self.full_match_schemas(schema,
readers_schema))
+ fn match_union_schemas(
+ &mut self,
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
+ if let Schema::Union(u) = writers_schema {
+ if u.schemas
+ .iter()
+ .all(|schema| self.full_match_schemas(schema,
readers_schema).is_ok())
+ {
+ return Ok(());
} else {
- unreachable!("writers_schema should have been Schema::Union")
+ return Err(CompatibilityError::MissingUnionElements);
}
} else if let Schema::Union(u) = readers_schema {
- u.schemas
+ // The writer's schema may be a non-union type that matches one
+ // of the branches of the reader's union schema,
+ // e.g. writer_schema is string and reader_schema is [string, int]
+ if u.schemas
.iter()
- .any(|schema| self.full_match_schemas(writers_schema, schema))
- } else {
- unreachable!("readers_schema should have been Schema::Union")
+ .any(|schema| self.full_match_schemas(writers_schema,
schema).is_ok())
+ {
+ return Ok(());
+ }
}
+ Err(CompatibilityError::MissingUnionElements)
}
fn recursion_in_progress(&mut self, writers_schema: &Schema,
readers_schema: &Schema) -> bool {
@@ -187,16 +223,22 @@ impl Checker {
impl SchemaCompatibility {
/// `can_read` performs a full, recursive check that a datum written using
the
/// writers_schema can be read using the readers_schema.
- pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
+ pub fn can_read(
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
let mut c = Checker::new();
c.can_read(writers_schema, readers_schema)
}
/// `mutual_read` performs a full, recursive check that a datum written
using either
/// the writers_schema or the readers_schema can be read using the other
schema.
- pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) ->
bool {
- SchemaCompatibility::can_read(writers_schema, readers_schema)
- && SchemaCompatibility::can_read(readers_schema, writers_schema)
+ pub fn mutual_read(
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
+ SchemaCompatibility::can_read(writers_schema, readers_schema)?;
+ SchemaCompatibility::can_read(readers_schema, writers_schema)
}
/// `match_schemas` performs a basic check that a datum written with the
@@ -204,29 +246,45 @@ impl SchemaCompatibility {
/// matching the types, including schema promotion, and matching the full
name for
/// named types. Aliases for named types are not supported here, and the
rust
/// implementation of Avro in general does not include support for
aliases (I think).
- pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema:
&Schema) -> bool {
+ pub(crate) fn match_schemas(
+ writers_schema: &Schema,
+ readers_schema: &Schema,
+ ) -> Result<(), CompatibilityError> {
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
- return true;
+ return Ok(());
}
if w_type == r_type {
if r_type.is_primitive() {
- return true;
+ return Ok(());
}
match r_type {
SchemaKind::Record => {
if let Schema::Record(RecordSchema { name: w_name, .. }) =
writers_schema {
if let Schema::Record(RecordSchema { name: r_name, ..
}) = readers_schema {
- return w_name.name == r_name.name;
+ if w_name.name == r_name.name {
+ return Ok(());
+ } else {
+ return Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.name.clone(),
+ reader_name: r_name.name.clone(),
+ });
+ }
} else {
- unreachable!("readers_schema should have been
Schema::Record")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Record],
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Record")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Record],
+ });
}
}
SchemaKind::Fixed => {
@@ -246,23 +304,39 @@ impl SchemaCompatibility {
attributes: _,
}) = readers_schema
{
- return w_name.name == r_name.name && w_size ==
r_size;
+ return (w_name.name == r_name.name && w_size ==
r_size)
+ .then_some(())
+ .ok_or(CompatibilityError::FixedMismatch);
} else {
- unreachable!("readers_schema should have been
Schema::Fixed")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Fixed],
+ });
}
- } else {
- unreachable!("writers_schema should have been
Schema::Fixed")
}
}
SchemaKind::Enum => {
if let Schema::Enum(EnumSchema { name: w_name, .. }) =
writers_schema {
if let Schema::Enum(EnumSchema { name: r_name, .. }) =
readers_schema {
- return w_name.name == r_name.name;
+ if w_name.name == r_name.name {
+ return Ok(());
+ } else {
+ return Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.name.clone(),
+ reader_name: r_name.name.clone(),
+ });
+ }
} else {
- unreachable!("readers_schema should have been
Schema::Enum")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Enum],
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Enum")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Enum],
+ });
}
}
SchemaKind::Map => {
@@ -270,10 +344,16 @@ impl SchemaCompatibility {
if let Schema::Map(r_m) = readers_schema {
return SchemaCompatibility::match_schemas(w_m,
r_m);
} else {
- unreachable!("readers_schema should have been
Schema::Map")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Map],
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Map")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Map],
+ });
}
}
SchemaKind::Array => {
@@ -281,45 +361,91 @@ impl SchemaCompatibility {
if let Schema::Array(r_a) = readers_schema {
return SchemaCompatibility::match_schemas(w_a,
r_a);
} else {
- unreachable!("readers_schema should have been
Schema::Array")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Array],
+ });
}
} else {
- unreachable!("writers_schema should have been
Schema::Array")
+ return Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("writers_schema"),
+ expected_type: &[SchemaKind::Array],
+ });
}
}
- _ => (),
+ _ => {
+ return Err(CompatibilityError::Inconclusive(String::from(
+ "readers_schema",
+ )))
+ }
};
}
- if w_type == SchemaKind::Int
- && [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
- .iter()
- .any(|&t| t == r_type)
- {
- return true;
- }
-
- if w_type == SchemaKind::Long
- && [SchemaKind::Float, SchemaKind::Double]
- .iter()
- .any(|&t| t == r_type)
- {
- return true;
- }
-
- if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
- return true;
- }
-
- if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
- return true;
- }
-
- if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
- return true;
+ // Different kinds: only the primitive type promotions can still match
+ match w_type {
+ SchemaKind::Int => {
+ if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
+ .iter()
+ .any(|&t| t == r_type)
+ {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Long, SchemaKind::Float,
SchemaKind::Double],
+ })
+ }
+ }
+ SchemaKind::Long => {
+ if [SchemaKind::Float, SchemaKind::Double]
+ .iter()
+ .any(|&t| t == r_type)
+ {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Float,
SchemaKind::Double],
+ })
+ }
+ }
+ SchemaKind::Float => {
+ // Float -> Float is already accepted by the same-kind
+ // primitive check above, so Float -> Double is the only
+ // promotion left; listing Float here only misleads the error.
+ if r_type == SchemaKind::Double {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Double],
+ })
+ }
+ }
+ SchemaKind::String => {
+ if r_type == SchemaKind::Bytes {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Bytes],
+ })
+ }
+ }
+ SchemaKind::Bytes => {
+ if r_type == SchemaKind::String {
+ Ok(())
+ } else {
+ Err(CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::String],
+ })
+ }
+ }
+ _ => Err(CompatibilityError::Inconclusive(String::from(
+ "writers_schema",
+ ))),
}
-
- false
}
}
@@ -473,10 +599,11 @@ mod tests {
#[test]
fn test_broken() {
- assert!(!SchemaCompatibility::can_read(
- &int_string_union_schema(),
- &int_union_schema()
- ))
+ assert_eq!(
+ CompatibilityError::MissingUnionElements,
+ SchemaCompatibility::can_read(&int_string_union_schema(),
&int_union_schema())
+ .unwrap_err()
+ )
}
#[test]
@@ -526,9 +653,9 @@ mod tests {
(nested_record(), nested_optional_record()),
];
- assert!(!incompatible_schemas
+ assert!(incompatible_schemas
.iter()
- .any(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader)));
+ .all(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader).is_err()));
}
#[test]
@@ -577,7 +704,7 @@ mod tests {
assert!(compatible_schemas
.iter()
- .all(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader)));
+ .all(|(reader, writer)| SchemaCompatibility::can_read(writer,
reader).is_ok()));
}
fn writer_schema() -> Schema {
@@ -601,14 +728,11 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema,
- ));
- assert!(!SchemaCompatibility::can_read(
- &reader_schema,
- &writer_schema()
- ));
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert_eq!(
+ CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
+ SchemaCompatibility::can_read(&reader_schema,
&writer_schema()).unwrap_err()
+ );
Ok(())
}
@@ -622,14 +746,11 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(!SchemaCompatibility::can_read(
- &reader_schema,
- &writer_schema()
- ));
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert_eq!(
+ CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
+ SchemaCompatibility::can_read(&reader_schema,
&writer_schema()).unwrap_err()
+ );
Ok(())
}
@@ -644,14 +765,8 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(SchemaCompatibility::can_read(
- &reader_schema,
- &writer_schema()
- ));
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert!(SchemaCompatibility::can_read(&reader_schema,
&writer_schema()).is_ok());
Ok(())
}
@@ -666,14 +781,11 @@ mod tests {
]}
"#,
)?;
- assert!(SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(!SchemaCompatibility::can_read(
- &reader_schema,
- &writer_schema()
- ));
+ assert!(SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).is_ok());
+ assert_eq!(
+ CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
+ SchemaCompatibility::can_read(&reader_schema,
&writer_schema()).unwrap_err()
+ );
Ok(())
}
@@ -688,14 +800,14 @@ mod tests {
]}
"#,
)?;
- assert!(!SchemaCompatibility::can_read(
- &writer_schema(),
- &reader_schema
- ));
- assert!(!SchemaCompatibility::can_read(
- &reader_schema,
- &writer_schema()
- ));
+ assert_eq!(
+ CompatibilityError::MissingDefaultValue(String::from("newfield1")),
+ SchemaCompatibility::can_read(&writer_schema(),
&reader_schema).unwrap_err()
+ );
+ assert_eq!(
+ CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
+ SchemaCompatibility::can_read(&reader_schema,
&writer_schema()).unwrap_err()
+ );
Ok(())
}
@@ -705,27 +817,24 @@ mod tests {
let valid_reader = string_array_schema();
let invalid_reader = string_map_schema();
- assert!(SchemaCompatibility::can_read(
- &string_array_schema(),
- &valid_reader
- ));
- assert!(!SchemaCompatibility::can_read(
- &string_array_schema(),
- &invalid_reader
- ));
+ assert!(SchemaCompatibility::can_read(&string_array_schema(),
&valid_reader).is_ok());
+ assert_eq!(
+ CompatibilityError::Inconclusive(String::from("writers_schema")),
+ SchemaCompatibility::can_read(&string_array_schema(),
&invalid_reader).unwrap_err()
+ );
}
#[test]
fn test_primitive_writer_schema() {
let valid_reader = Schema::String;
- assert!(SchemaCompatibility::can_read(
- &Schema::String,
- &valid_reader
- ));
- assert!(!SchemaCompatibility::can_read(
- &Schema::Int,
- &Schema::String
- ));
+ assert!(SchemaCompatibility::can_read(&Schema::String,
&valid_reader).is_ok());
+ assert_eq!(
+ CompatibilityError::TypeExpected {
+ schema_type: String::from("readers_schema"),
+ expected_type: &[SchemaKind::Long, SchemaKind::Float,
SchemaKind::Double],
+ },
+ SchemaCompatibility::can_read(&Schema::Int,
&Schema::String).unwrap_err()
+ );
}
#[test]
@@ -734,8 +843,11 @@ mod tests {
let union_writer = union_schema(vec![Schema::Int, Schema::String]);
let union_reader = union_schema(vec![Schema::String]);
- assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
- assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
+ assert_eq!(
+ CompatibilityError::MissingUnionElements,
+ SchemaCompatibility::can_read(&union_writer,
&union_reader).unwrap_err()
+ );
+ assert!(SchemaCompatibility::can_read(&union_reader,
&union_writer).is_ok());
}
#[test]
@@ -750,13 +862,22 @@ mod tests {
let int_schema = Schema::parse_str(
r#"
- {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
- {"name":"field1", "type":"int"}
- ]}
-"#,
+ {"type":"record", "name":"MyRecord", "namespace":"ns", "fields":
[
+ {"name":"field1", "type":"int"}
+ ]}
+ "#,
)?;
- assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));
+ assert_eq!(
+ CompatibilityError::FieldTypeMismatch(
+ "field1".to_owned(),
+ Box::new(CompatibilityError::TypeExpected {
+ schema_type: "readers_schema".to_owned(),
+ expected_type: &[SchemaKind::Bytes]
+ })
+ ),
+ SchemaCompatibility::can_read(&string_schema,
&int_schema).unwrap_err()
+ );
Ok(())
}
@@ -770,8 +891,11 @@ mod tests {
)?;
let enum_schema2 =
Schema::parse_str(r#"{"type":"enum", "name":"MyEnum",
"symbols":["A","B","C"]}"#)?;
- assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
- assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
+ assert_eq!(
+ CompatibilityError::MissingSymbols,
+ SchemaCompatibility::can_read(&enum_schema2,
&enum_schema1).unwrap_err()
+ );
+ assert!(SchemaCompatibility::can_read(&enum_schema1,
&enum_schema2).is_ok());
Ok(())
}
@@ -843,10 +967,10 @@ mod tests {
fn test_union_resolution_no_structure_match() {
// short name match, but no structure match
let read_schema = union_schema(vec![Schema::Null,
point_3d_no_default_schema()]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ CompatibilityError::MissingUnionElements,
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
+ );
}
#[test]
@@ -858,10 +982,10 @@ mod tests {
point_2d_schema(),
point_3d_schema(),
]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ CompatibilityError::MissingUnionElements,
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
+ );
}
#[test]
@@ -873,10 +997,10 @@ mod tests {
point_3d_schema(),
point_2d_schema(),
]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ CompatibilityError::MissingUnionElements,
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
+ );
}
#[test]
@@ -888,10 +1012,10 @@ mod tests {
point_3d_match_name_schema(),
point_3d_schema(),
]);
- assert!(!SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert_eq!(
+ CompatibilityError::MissingUnionElements,
+ SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
+ );
}
#[test]
@@ -904,10 +1028,7 @@ mod tests {
point_3d_schema(),
point_2d_fullname_schema(),
]);
- assert!(SchemaCompatibility::can_read(
- &point_2d_fullname_schema(),
- &read_schema
- ));
+ assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).is_ok());
}
#[test]
@@ -1078,7 +1199,7 @@ mod tests {
let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?;
let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?;
- assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2));
+ assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2).is_ok());
Ok(())
}
@@ -1153,7 +1274,65 @@ mod tests {
];
for (schema_1, schema_2) in schemas {
- assert!(SchemaCompatibility::can_read(&schema_1, &schema_2));
+ assert!(SchemaCompatibility::can_read(&schema_1,
&schema_2).is_ok());
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_can_read_compatibility_errors() -> TestResult {
+ let schemas = [
+ (
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsMap",
+ "fields": [
+ {"name": "average", "type": "int", "default": 0},
+ {"name": "success", "type": {"type": "map", "values":
"int"}}
+ ]
+ }"#)?,
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsMap",
+ "fields": [
+ {"name": "average", "type": "int", "default": 0},
+ {"name": "success", "type": ["null", {"type": "map",
"values": "int"}], "default": null}
+ ]
+ }"#)?,
+ "Incompatible schemata! Field 'success' in reader schema does
not match the type in the writer schema"
+ ),
+ (
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsArray",
+ "fields": [
+ {"name": "max_values", "type": {"type": "array",
"items": "int"}}
+ ]
+ }"#)?,
+ Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "StatisticsArray",
+ "fields": [
+ {"name": "max_values", "type": ["null", {"type":
"array", "items": "int"}], "default": null}
+ ]
+ }"#)?,
+ "Incompatible schemata! Field 'max_values' in reader schema
does not match the type in the writer schema"
+ )
+ ];
+
+ for (schema_1, schema_2, error) in schemas {
+ assert!(SchemaCompatibility::can_read(&schema_1,
&schema_2).is_ok());
+ assert_eq!(
+ error,
+ SchemaCompatibility::can_read(&schema_2, &schema_1)
+ .unwrap_err()
+ .to_string()
+ );
}
Ok(())