This is an automated email from the ASF dual-hosted git repository.

clesaec pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cea6907a AVRO-3904: [RUST] return a Result when checking schema 
compatibility so the … (#2587)
1cea6907a is described below

commit 1cea6907a24773bdc5d7282fdd90e92b6aef0ab3
Author: Marcos Schroh <[email protected]>
AuthorDate: Tue Nov 28 10:02:49 2023 +0100

    AVRO-3904: [RUST] return a Result when checking schema compatibility so the 
… (#2587)
    
    * AVRO-3904: [Rust] return a Result when checking schema compatibility so 
the end users will have feedback in case or errors
    Co-authored-by: Marcos Schroh <[email protected]>
---
 lang/rust/avro/README.md                   |   4 +-
 lang/rust/avro/src/error.rs                |  42 +++
 lang/rust/avro/src/lib.rs                  |   4 +-
 lang/rust/avro/src/schema.rs               |  36 ++
 lang/rust/avro/src/schema_compatibility.rs | 524 ++++++++++++++++++++---------
 5 files changed, 439 insertions(+), 171 deletions(-)

diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md
index 07b18748f..a349847fa 100644
--- a/lang/rust/avro/README.md
+++ b/lang/rust/avro/README.md
@@ -634,7 +634,7 @@ use apache_avro::{Schema, 
schema_compatibility::SchemaCompatibility};
 
 let writers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"int"}"#).unwrap();
 let readers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"long"}"#).unwrap();
-assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, 
&readers_schema));
+assert!(SchemaCompatibility::can_read(&writers_schema, 
&readers_schema).is_ok());
 ```
 
 2. Incompatible schemas (a long array schema cannot be read by an int array 
schema)
@@ -647,7 +647,7 @@ use apache_avro::{Schema, 
schema_compatibility::SchemaCompatibility};
 
 let writers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"long"}"#).unwrap();
 let readers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"int"}"#).unwrap();
-assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, 
&readers_schema));
+assert!(SchemaCompatibility::can_read(&writers_schema, 
&readers_schema).is_err());
 ```
 
 <!-- cargo-rdme end -->
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 8fa146027..810c5687a 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -480,6 +480,48 @@ pub enum Error {
     BadCodecMetadata,
 }
 
+#[derive(thiserror::Error, Debug)]
+pub enum CompatibilityError {
+    #[error("Schemas are not compatible. Writer schema is 
{writer_schema_type}, but reader schema is {reader_schema_type}")]
+    WrongType {
+        writer_schema_type: String,
+        reader_schema_type: String,
+    },
+
+    #[error("Schemas are not compatible. The {schema_type} should have been 
{expected_type}")]
+    TypeExpected {
+        schema_type: String,
+        expected_type: String,
+    },
+
+    #[error("Schemas are not compatible. Field '{0}' in reader schema does not 
match the type in the writer schema")]
+    FieldTypeMismatch(String),
+
+    #[error("Schemas are not compatible. Schemas mismatch")]
+    SchemaMismatch,
+
+    #[error("Schemas are not compatible. Field '{0}' in reader schema must 
have a default value")]
+    MissingDefaultValue(String),
+
+    #[error("Schemas are not compatible. Reader's symbols must contain all 
writer's symbols")]
+    MissingSymbols,
+
+    #[error("Schemas are not compatible. All elements in union must match for 
both schemas")]
+    MissingUnionElements,
+
+    #[error("Schemas are not compatible. Name and size don't match for fixed")]
+    FixedMismatch,
+
+    #[error("Schemas are not compatible. The name must be the same for both 
schemas. Writer's name {writer_name} and reader's name {reader_name}")]
+    NameMismatch {
+        writer_name: String,
+        reader_name: String,
+    },
+
+    #[error("Schemas are not compatible. Unknown type for '{0}'. Make sure 
that the type is a valid one")]
+    Inconclusive(String),
+}
+
 impl serde::ser::Error for Error {
     fn custom<T: fmt::Display>(msg: T) -> Self {
         Error::SerializeValue(msg.to_string())
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index b2d930068..cb49555d5 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -747,7 +747,7 @@
 //!
 //! let writers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"int"}"#).unwrap();
 //! let readers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"long"}"#).unwrap();
-//! assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, 
&readers_schema));
+//! assert!(SchemaCompatibility::can_read(&writers_schema, 
&readers_schema).is_ok());
 //! ```
 //!
 //! 2. Incompatible schemas (a long array schema cannot be read by an int 
array schema)
@@ -760,7 +760,7 @@
 //!
 //! let writers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"long"}"#).unwrap();
 //! let readers_schema = Schema::parse_str(r#"{"type": "array", 
"items":"int"}"#).unwrap();
-//! assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, 
&readers_schema));
+//! assert!(SchemaCompatibility::can_read(&writers_schema, 
&readers_schema).is_err());
 //! ```
 
 mod codec;
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index f2487e316..181e3fcff 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -206,6 +206,42 @@ impl From<&types::Value> for SchemaKind {
     }
 }
 
+// Implement `Display` for `SchemaKind`.
+impl fmt::Display for Schema {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            Schema::Null => write!(f, "Null"),
+            Schema::Boolean => write!(f, "Boolean"),
+            Schema::Int => write!(f, "Int"),
+            Schema::Long => write!(f, "Long"),
+            Schema::Float => write!(f, "Float"),
+            Schema::Double => write!(f, "Double"),
+            Schema::Bytes => write!(f, "Bytes"),
+            Schema::String => write!(f, "String"),
+            Schema::Array(..) => write!(f, "Array"),
+            Schema::Map(..) => write!(f, "Map"),
+            Schema::Union(..) => write!(f, "Union"),
+            Schema::Record(..) => write!(f, "Record"),
+            Schema::Enum(..) => write!(f, "Enum"),
+            Schema::Fixed(..) => write!(f, "Fixed"),
+            Schema::Decimal(..) => write!(f, "Decimal"),
+            Schema::BigDecimal => write!(f, "BigDecimal"),
+            Schema::Uuid => write!(f, "Uuid"),
+            Schema::Date => write!(f, "Date"),
+            Schema::TimeMillis => write!(f, "TimeMillis"),
+            Schema::TimeMicros => write!(f, "TimeMicros"),
+            Schema::TimestampMillis => write!(f, "TimestampMillis"),
+            Schema::TimestampMicros => write!(f, "TimestampMicros"),
+            Schema::LocalTimestampMillis => write!(f, "LocalTimestampMillis"),
+            Schema::LocalTimestampMicros => write!(f, "LocalTimestampMicros"),
+            Schema::Duration => write!(f, "Duration"),
+            Schema::Ref { name } => {
+                write!(f, "{}", name.name)
+            }
+        }
+    }
+}
+
 /// Represents names for `record`, `enum` and `fixed` Avro schemas.
 ///
 /// Each of these `Schema`s have a `fullname` composed of two parts:
diff --git a/lang/rust/avro/src/schema_compatibility.rs 
b/lang/rust/avro/src/schema_compatibility.rs
index 8a0b2a4d7..1f2f29e34 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -16,7 +16,10 @@
 // under the License.
 
 //! Logic for checking schema compatibility
-use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind};
+use crate::{
+    error::CompatibilityError,
+    schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+};
 use std::{
     collections::{hash_map::DefaultHasher, HashSet},
     hash::Hasher,
@@ -37,7 +40,11 @@ impl Checker {
         }
     }
 
-    pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: 
&Schema) -> bool {
+    pub(crate) fn can_read(
+        &mut self,
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         self.full_match_schemas(writers_schema, readers_schema)
     }
 
@@ -45,44 +52,52 @@ impl Checker {
         &mut self,
         writers_schema: &Schema,
         readers_schema: &Schema,
-    ) -> bool {
+    ) -> Result<(), CompatibilityError> {
         if self.recursion_in_progress(writers_schema, readers_schema) {
-            return true;
+            return Ok(());
         }
 
-        if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) 
{
-            return false;
-        }
+        SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
 
         let w_type = SchemaKind::from(writers_schema);
         let r_type = SchemaKind::from(readers_schema);
 
         if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == 
SchemaKind::Fixed) {
-            return true;
+            return Ok(());
         }
 
         match r_type {
             SchemaKind::Record => self.match_record_schemas(writers_schema, 
readers_schema),
             SchemaKind::Map => {
                 if let Schema::Map(w_m) = writers_schema {
-                    if let Schema::Map(r_m) = readers_schema {
-                        self.full_match_schemas(w_m, r_m)
-                    } else {
-                        unreachable!("readers_schema should have been 
Schema::Map")
+                    match readers_schema {
+                        Schema::Map(r_m) => self.full_match_schemas(w_m, r_m),
+                        _ => Err(CompatibilityError::WrongType {
+                            writer_schema_type: format!("{}", writers_schema),
+                            reader_schema_type: format!("{}", readers_schema),
+                        }),
                     }
                 } else {
-                    unreachable!("writers_schema should have been Schema::Map")
+                    Err(CompatibilityError::WrongType {
+                        writer_schema_type: format!("{}", writers_schema),
+                        reader_schema_type: format!("{}", readers_schema),
+                    })
                 }
             }
             SchemaKind::Array => {
                 if let Schema::Array(w_a) = writers_schema {
-                    if let Schema::Array(r_a) = readers_schema {
-                        self.full_match_schemas(w_a, r_a)
-                    } else {
-                        unreachable!("readers_schema should have been 
Schema::Array")
+                    match readers_schema {
+                        Schema::Array(r_a) => self.full_match_schemas(w_a, 
r_a),
+                        _ => Err(CompatibilityError::WrongType {
+                            writer_schema_type: format!("{}", writers_schema),
+                            reader_schema_type: format!("{}", readers_schema),
+                        }),
                     }
                 } else {
-                    unreachable!("writers_schema should have been 
Schema::Array")
+                    Err(CompatibilityError::WrongType {
+                        writer_schema_type: format!("{}", writers_schema),
+                        reader_schema_type: format!("{}", readers_schema),
+                    })
                 }
             }
             SchemaKind::Union => self.match_union_schemas(writers_schema, 
readers_schema),
@@ -96,10 +111,12 @@ impl Checker {
                         symbols: r_symbols, ..
                     }) = readers_schema
                     {
-                        return !w_symbols.iter().any(|e| 
!r_symbols.contains(e));
+                        if w_symbols.iter().all(|e| r_symbols.contains(e)) {
+                            return Ok(());
+                        }
                     }
                 }
-                false
+                Err(CompatibilityError::MissingSymbols)
             }
             _ => {
                 if w_type == SchemaKind::Union {
@@ -109,16 +126,25 @@ impl Checker {
                         }
                     }
                 }
-                false
+                Err(CompatibilityError::Inconclusive(String::from(
+                    "writers_schema",
+                )))
             }
         }
     }
 
-    fn match_record_schemas(&mut self, writers_schema: &Schema, 
readers_schema: &Schema) -> bool {
+    fn match_record_schemas(
+        &mut self,
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         let w_type = SchemaKind::from(writers_schema);
 
         if w_type == SchemaKind::Union {
-            return false;
+            return Err(CompatibilityError::TypeExpected {
+                schema_type: String::from("writers_schema"),
+                expected_type: String::from("record"),
+            });
         }
 
         if let Schema::Record(RecordSchema {
@@ -133,39 +159,58 @@ impl Checker {
             {
                 for field in r_fields.iter() {
                     if let Some(pos) = w_lookup.get(&field.name) {
-                        if !self.full_match_schemas(&w_fields[*pos].schema, 
&field.schema) {
-                            return false;
+                        if self
+                            .full_match_schemas(&w_fields[*pos].schema, 
&field.schema)
+                            .is_err()
+                        {
+                            return 
Err(CompatibilityError::FieldTypeMismatch(field.name.clone()));
                         }
                     } else if field.default.is_none() {
-                        return false;
+                        return 
Err(CompatibilityError::MissingDefaultValue(field.name.clone()));
                     }
                 }
             }
         }
-        true
+        Ok(())
     }
 
-    fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: 
&Schema) -> bool {
+    fn match_union_schemas(
+        &mut self,
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
+        // Do not need to check the SchemaKind of reader as this function
+        // is only called when the readers_schema is Union
         let w_type = SchemaKind::from(writers_schema);
-        let r_type = SchemaKind::from(readers_schema);
-
-        assert_eq!(r_type, SchemaKind::Union);
 
         if w_type == SchemaKind::Union {
             if let Schema::Union(u) = writers_schema {
-                u.schemas
+                if u.schemas
                     .iter()
-                    .all(|schema| self.full_match_schemas(schema, 
readers_schema))
-            } else {
-                unreachable!("writers_schema should have been Schema::Union")
+                    .all(|schema| self.full_match_schemas(schema, 
readers_schema).is_ok())
+                {
+                    return Ok(());
+                } else {
+                    return Err(CompatibilityError::MissingUnionElements);
+                }
             }
+            // } else {
+            //     return Err(Error::CompatibilityError(String::from(
+            //         "writers_schema should have been Schema::Union",
+            //     )));
+            // }
         } else if let Schema::Union(u) = readers_schema {
-            u.schemas
+            // This check is nneded because the writer_schema can be a not 
union
+            // but the type can be contain in the union of the reeader schema
+            // e.g. writer_schema is string and reader_schema is [string, int]
+            if u.schemas
                 .iter()
-                .any(|schema| self.full_match_schemas(writers_schema, schema))
-        } else {
-            unreachable!("readers_schema should have been Schema::Union")
+                .any(|schema| self.full_match_schemas(writers_schema, 
schema).is_ok())
+            {
+                return Ok(());
+            }
         }
+        Err(CompatibilityError::SchemaMismatch)
     }
 
     fn recursion_in_progress(&mut self, writers_schema: &Schema, 
readers_schema: &Schema) -> bool {
@@ -187,7 +232,10 @@ impl Checker {
 impl SchemaCompatibility {
     /// `can_read` performs a full, recursive check that a datum written using 
the
     /// writers_schema can be read using the readers_schema.
-    pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
+    pub fn can_read(
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         let mut c = Checker::new();
         c.can_read(writers_schema, readers_schema)
     }
@@ -195,8 +243,8 @@ impl SchemaCompatibility {
     /// `mutual_read` performs a full, recursive check that a datum written 
using either
     /// the writers_schema or the readers_schema can be read using the other 
schema.
     pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> 
bool {
-        SchemaCompatibility::can_read(writers_schema, readers_schema)
-            && SchemaCompatibility::can_read(readers_schema, writers_schema)
+        SchemaCompatibility::can_read(writers_schema, readers_schema).is_ok()
+            && SchemaCompatibility::can_read(readers_schema, 
writers_schema).is_ok()
     }
 
     ///  `match_schemas` performs a basic check that a datum written with the
@@ -204,29 +252,45 @@ impl SchemaCompatibility {
     ///  matching the types, including schema promotion, and matching the full 
name for
     ///  named types. Aliases for named types are not supported here, and the 
rust
     ///  implementation of Avro in general does not include support for 
aliases (I think).
-    pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: 
&Schema) -> bool {
+    pub(crate) fn match_schemas(
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         let w_type = SchemaKind::from(writers_schema);
         let r_type = SchemaKind::from(readers_schema);
 
         if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
-            return true;
+            return Ok(());
         }
 
         if w_type == r_type {
             if r_type.is_primitive() {
-                return true;
+                return Ok(());
             }
 
             match r_type {
                 SchemaKind::Record => {
                     if let Schema::Record(RecordSchema { name: w_name, .. }) = 
writers_schema {
                         if let Schema::Record(RecordSchema { name: r_name, .. 
}) = readers_schema {
-                            return w_name.name == r_name.name;
+                            if w_name.name == r_name.name {
+                                return Ok(());
+                            } else {
+                                return Err(CompatibilityError::NameMismatch {
+                                    writer_name: w_name.name.clone(),
+                                    reader_name: r_name.name.clone(),
+                                });
+                            }
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Record")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: String::from("record"),
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Record")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: String::from("record"),
+                        });
                     }
                 }
                 SchemaKind::Fixed => {
@@ -246,23 +310,39 @@ impl SchemaCompatibility {
                             attributes: _,
                         }) = readers_schema
                         {
-                            return w_name.name == r_name.name && w_size == 
r_size;
+                            return (w_name.name == r_name.name && w_size == 
r_size)
+                                .then_some(())
+                                .ok_or(CompatibilityError::FixedMismatch);
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Fixed")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("writers_schema"),
+                                expected_type: String::from("fFixed"),
+                            });
                         }
-                    } else {
-                        unreachable!("writers_schema should have been 
Schema::Fixed")
                     }
                 }
                 SchemaKind::Enum => {
                     if let Schema::Enum(EnumSchema { name: w_name, .. }) = 
writers_schema {
                         if let Schema::Enum(EnumSchema { name: r_name, .. }) = 
readers_schema {
-                            return w_name.name == r_name.name;
+                            if w_name.name == r_name.name {
+                                return Ok(());
+                            } else {
+                                return Err(CompatibilityError::NameMismatch {
+                                    writer_name: w_name.name.clone(),
+                                    reader_name: r_name.name.clone(),
+                                });
+                            }
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Enum")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: String::from("enum"),
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Enum")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: String::from("enum"),
+                        });
                     }
                 }
                 SchemaKind::Map => {
@@ -270,10 +350,16 @@ impl SchemaCompatibility {
                         if let Schema::Map(r_m) = readers_schema {
                             return SchemaCompatibility::match_schemas(w_m, 
r_m);
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Map")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: String::from("map"),
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Map")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: String::from("map"),
+                        });
                     }
                 }
                 SchemaKind::Array => {
@@ -281,45 +367,91 @@ impl SchemaCompatibility {
                         if let Schema::Array(r_a) = readers_schema {
                             return SchemaCompatibility::match_schemas(w_a, 
r_a);
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Array")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: String::from("array"),
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Array")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: String::from("array"),
+                        });
                     }
                 }
-                _ => (),
+                _ => {
+                    return Err(CompatibilityError::Inconclusive(String::from(
+                        "readers_schema",
+                    )))
+                }
             };
         }
 
-        if w_type == SchemaKind::Int
-            && [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
-                .iter()
-                .any(|&t| t == r_type)
-        {
-            return true;
-        }
-
-        if w_type == SchemaKind::Long
-            && [SchemaKind::Float, SchemaKind::Double]
-                .iter()
-                .any(|&t| t == r_type)
-        {
-            return true;
-        }
-
-        if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
-            return true;
-        }
-
-        if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
-            return true;
-        }
-
-        if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
-            return true;
+        // Here are the checks for primitive types
+        match w_type {
+            SchemaKind::Int => {
+                if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
+                    .iter()
+                    .any(|&t| t == r_type)
+                {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: String::from("long, float or double"),
+                    })
+                }
+            }
+            SchemaKind::Long => {
+                if [SchemaKind::Float, SchemaKind::Double]
+                    .iter()
+                    .any(|&t| t == r_type)
+                {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: String::from("float or double"),
+                    })
+                }
+            }
+            SchemaKind::Float => {
+                if [SchemaKind::Float, SchemaKind::Double]
+                    .iter()
+                    .any(|&t| t == r_type)
+                {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: String::from("float or double"),
+                    })
+                }
+            }
+            SchemaKind::String => {
+                if r_type == SchemaKind::Bytes {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: String::from("bytes"),
+                    })
+                }
+            }
+            SchemaKind::Bytes => {
+                if r_type == SchemaKind::String {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: String::from("string"),
+                    })
+                }
+            }
+            _ => Err(CompatibilityError::Inconclusive(String::from(
+                "writers_schema",
+            ))),
         }
-
-        false
     }
 }
 
@@ -473,10 +605,12 @@ mod tests {
 
     #[test]
     fn test_broken() {
-        assert!(!SchemaCompatibility::can_read(
-            &int_string_union_schema(),
-            &int_union_schema()
-        ))
+        assert_eq!(
+            "Schemas are not compatible. All elements in union must match for 
both schemas",
+            SchemaCompatibility::can_read(&int_string_union_schema(), 
&int_union_schema())
+                .unwrap_err()
+                .to_string()
+        )
     }
 
     #[test]
@@ -526,9 +660,9 @@ mod tests {
             (nested_record(), nested_optional_record()),
         ];
 
-        assert!(!incompatible_schemas
+        assert!(incompatible_schemas
             .iter()
-            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader)));
+            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader).is_err()));
     }
 
     #[test]
@@ -577,7 +711,7 @@ mod tests {
 
         assert!(compatible_schemas
             .iter()
-            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader)));
+            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader).is_ok()));
     }
 
     fn writer_schema() -> Schema {
@@ -601,14 +735,11 @@ mod tests {
       ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema,
-        ));
-        assert!(!SchemaCompatibility::can_read(
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema,).is_ok());
+        assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader 
schema must have a default value", SchemaCompatibility::can_read(
             &reader_schema,
             &writer_schema()
-        ));
+        ).unwrap_err().to_string());
 
         Ok(())
     }
@@ -622,14 +753,11 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(!SchemaCompatibility::can_read(
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).is_ok());
+        assert_eq!("Schemas are not compatible. Field 'oldfield1' in reader 
schema must have a default value", SchemaCompatibility::can_read(
             &reader_schema,
             &writer_schema()
-        ));
+        ).unwrap_err().to_string());
 
         Ok(())
     }
@@ -644,14 +772,8 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(SchemaCompatibility::can_read(
-            &reader_schema,
-            &writer_schema()
-        ));
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).is_ok());
+        assert!(SchemaCompatibility::can_read(&reader_schema, 
&writer_schema()).is_ok());
 
         Ok(())
     }
@@ -666,14 +788,11 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(!SchemaCompatibility::can_read(
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).is_ok());
+        assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader 
schema must have a default value",SchemaCompatibility::can_read(
             &reader_schema,
             &writer_schema()
-        ));
+        ).unwrap_err().to_string());
 
         Ok(())
     }
@@ -688,14 +807,13 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(!SchemaCompatibility::can_read(
+        assert_eq!("Schemas are not compatible. Field 'newfield1' in reader 
schema must have a default value", SchemaCompatibility::can_read(
             &writer_schema(),
-            &reader_schema
-        ));
-        assert!(!SchemaCompatibility::can_read(
+            &reader_schema).unwrap_err().to_string());
+        assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader 
schema must have a default value", SchemaCompatibility::can_read(
             &reader_schema,
             &writer_schema()
-        ));
+        ).unwrap_err().to_string());
 
         Ok(())
     }
@@ -705,27 +823,23 @@ mod tests {
         let valid_reader = string_array_schema();
         let invalid_reader = string_map_schema();
 
-        assert!(SchemaCompatibility::can_read(
-            &string_array_schema(),
-            &valid_reader
-        ));
-        assert!(!SchemaCompatibility::can_read(
+        assert!(SchemaCompatibility::can_read(&string_array_schema(), 
&valid_reader).is_ok());
+        assert_eq!("Schemas are not compatible. Unknown type for 
'writers_schema'. Make sure that the type is a valid one", 
SchemaCompatibility::can_read(
             &string_array_schema(),
             &invalid_reader
-        ));
+        ).unwrap_err().to_string());
     }
 
     #[test]
     fn test_primitive_writer_schema() {
         let valid_reader = Schema::String;
-        assert!(SchemaCompatibility::can_read(
-            &Schema::String,
-            &valid_reader
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &Schema::Int,
-            &Schema::String
-        ));
+        assert!(SchemaCompatibility::can_read(&Schema::String, 
&valid_reader).is_ok());
+        assert_eq!(
+            "Schemas are not compatible. The readers_schema should have been 
long, float or double",
+            SchemaCompatibility::can_read(&Schema::Int, &Schema::String)
+                .unwrap_err()
+                .to_string()
+        );
     }
 
     #[test]
@@ -734,8 +848,13 @@ mod tests {
         let union_writer = union_schema(vec![Schema::Int, Schema::String]);
         let union_reader = union_schema(vec![Schema::String]);
 
-        assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
-        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
+        assert_eq!(
+            "Schemas are not compatible. All elements in union must match for 
both schemas",
+            SchemaCompatibility::can_read(&union_writer, &union_reader)
+                .unwrap_err()
+                .to_string()
+        );
+        assert!(SchemaCompatibility::can_read(&union_reader, 
&union_writer).is_ok());
     }
 
     #[test]
@@ -756,7 +875,10 @@ mod tests {
 "#,
         )?;
 
-        assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));
+        assert_eq!(
+            "Schemas are not compatible. Field 'field1' in reader schema does 
not match the type in the writer schema",
+            SchemaCompatibility::can_read(&string_schema, 
&int_schema).unwrap_err().to_string()
+        );
 
         Ok(())
     }
@@ -770,8 +892,13 @@ mod tests {
         )?;
         let enum_schema2 =
             Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", 
"symbols":["A","B","C"]}"#)?;
-        assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
-        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
+        assert_eq!(
+            "Schemas are not compatible. Reader's symbols must contain all 
writer's symbols",
+            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)
+                .unwrap_err()
+                .to_string()
+        );
+        assert!(SchemaCompatibility::can_read(&enum_schema1, 
&enum_schema2).is_ok());
 
         Ok(())
     }
@@ -843,10 +970,12 @@ mod tests {
     fn test_union_resolution_no_structure_match() {
         // short name match, but no structure match
         let read_schema = union_schema(vec![Schema::Null, 
point_3d_no_default_schema()]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            "Schemas are not compatible. Schemas mismatch",
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), 
&read_schema)
+                .unwrap_err()
+                .to_string()
+        );
     }
 
     #[test]
@@ -858,10 +987,12 @@ mod tests {
             point_2d_schema(),
             point_3d_schema(),
         ]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            "Schemas are not compatible. Schemas mismatch",
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), 
&read_schema)
+                .unwrap_err()
+                .to_string()
+        );
     }
 
     #[test]
@@ -873,10 +1004,12 @@ mod tests {
             point_3d_schema(),
             point_2d_schema(),
         ]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            "Schemas are not compatible. Schemas mismatch",
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), 
&read_schema)
+                .unwrap_err()
+                .to_string()
+        );
     }
 
     #[test]
@@ -888,10 +1021,12 @@ mod tests {
             point_3d_match_name_schema(),
             point_3d_schema(),
         ]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            "Schemas are not compatible. Schemas mismatch",
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), 
&read_schema)
+                .unwrap_err()
+                .to_string()
+        );
     }
 
     #[test]
@@ -904,10 +1039,7 @@ mod tests {
             point_3d_schema(),
             point_2d_fullname_schema(),
         ]);
-        assert!(SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), 
&read_schema).is_ok());
     }
 
     #[test]
@@ -1078,7 +1210,7 @@ mod tests {
         let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?;
         let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?;
 
-        assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2));
+        assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2).is_ok());
 
         Ok(())
     }
@@ -1153,7 +1285,65 @@ mod tests {
         ];
 
         for (schema_1, schema_2) in schemas {
-            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2));
+            assert!(SchemaCompatibility::can_read(&schema_1, 
&schema_2).is_ok());
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_can_read_compatibility_errors() -> TestResult {
+        let schemas = [
+            (
+                Schema::parse_str(
+                r#"{
+                    "type": "record",
+                    "name": "StatisticsMap",
+                    "fields": [
+                        {"name": "average", "type": "int", "default": 0},
+                        {"name": "success", "type": {"type": "map", "values": 
"int"}}
+                    ]
+                }"#)?,
+                Schema::parse_str(
+                        r#"{
+                    "type": "record",
+                    "name": "StatisticsMap",
+                    "fields": [
+                        {"name": "average", "type": "int", "default": 0},
+                        {"name": "success", "type": ["null", {"type": "map", 
"values": "int"}], "default": null}
+                    ]
+                }"#)?,
+                "Schemas are not compatible. Field 'success' in reader schema 
does not match the type in the writer schema"
+            ),
+            (
+                Schema::parse_str(
+                    r#"{
+                        "type": "record",
+                        "name": "StatisticsArray",
+                        "fields": [
+                            {"name": "max_values", "type": {"type": "array", 
"items": "int"}}
+                        ]
+                    }"#)?,
+                    Schema::parse_str(
+                    r#"{
+                        "type": "record",
+                        "name": "StatisticsArray",
+                        "fields": [
+                            {"name": "max_values", "type": ["null", {"type": 
"array", "items": "int"}], "default": null}
+                        ]
+                    }"#)?,
+                    "Schemas are not compatible. Field 'max_values' in reader 
schema does not match the type in the writer schema"
+            )
+        ];
+
+        for (schema_1, schema_2, error) in schemas {
+            assert!(SchemaCompatibility::can_read(&schema_1, 
&schema_2).is_ok());
+            assert_eq!(
+                error,
+                SchemaCompatibility::can_read(&schema_2, &schema_1)
+                    .unwrap_err()
+                    .to_string()
+            );
         }
 
         Ok(())


Reply via email to