This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new 9b54d1b6b AVRO-3904: [Rust] Minor improvements to the new schema 
compatibility changes (#2600)
9b54d1b6b is described below

commit 9b54d1b6bf2f1939fce7437d6db0f6b03fcb211a
Author: Martin Grigorov <[email protected]>
AuthorDate: Mon Dec 4 13:12:08 2023 +0200

    AVRO-3904: [Rust] Minor improvements to the new schema compatibility 
changes (#2600)
    
    * AVRO-3904: [Rust] Minor improvements to the new schema compatibility 
changes
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    * AVRO-3904: [Rust] Use `Debug` instead of `Display` when printing schemata 
in CompatibilityError
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    ---------
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    (cherry picked from commit efd3b2acf24cd4036b4d0362c8a583218caf209d)
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
 lang/rust/avro/src/error.rs                |  53 ++-
 lang/rust/avro/src/schema_compatibility.rs | 553 +++++++++++++++++++----------
 2 files changed, 418 insertions(+), 188 deletions(-)

diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 30a192069..a7960656c 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -362,7 +362,7 @@ pub enum Error {
     DeflateCompress(#[source] std::io::Error),
 
     #[error("Failed to finish flate compressor")]
-    DeflateCompressFinish(std::io::Error),
+    DeflateCompressFinish(#[source] std::io::Error),
 
     #[error("Failed to decompress with flate")]
     DeflateDecompress(#[source] std::io::Error),
@@ -480,6 +480,47 @@ pub enum Error {
     BadCodecMetadata,
 }
 
+#[derive(thiserror::Error, PartialEq)]
+pub enum CompatibilityError {
+    #[error("Incompatible schema types! Writer schema is 
'{writer_schema_type}', but reader schema is '{reader_schema_type}'")]
+    WrongType {
+        writer_schema_type: String,
+        reader_schema_type: String,
+    },
+
+    #[error("Incompatible schema types! The {schema_type} should have been 
{expected_type:?}")]
+    TypeExpected {
+        schema_type: String,
+        expected_type: &'static [SchemaKind],
+    },
+
+    #[error("Incompatible schemata! Field '{0}' in reader schema does not 
match the type in the writer schema")]
+    FieldTypeMismatch(String, #[source] Box<CompatibilityError>),
+
+    #[error("Incompatible schemata! Field '{0}' in reader schema must have a 
default value")]
+    MissingDefaultValue(String),
+
+    #[error("Incompatible schemata! Reader's symbols must contain all writer's 
symbols")]
+    MissingSymbols,
+
+    #[error("Incompatible schemata! All elements in union must match for both 
schemas")]
+    MissingUnionElements,
+
+    #[error("Incompatible schemata! Name and size don't match for fixed")]
+    FixedMismatch,
+
+    #[error("Incompatible schemata! The name must be the same for both 
schemas. Writer's name {writer_name} and reader's name {reader_name}")]
+    NameMismatch {
+        writer_name: String,
+        reader_name: String,
+    },
+
+    #[error(
+        "Incompatible schemata! Unknown type for '{0}'. Make sure that the 
type is a valid one"
+    )]
+    Inconclusive(String),
+}
+
 impl serde::ser::Error for Error {
     fn custom<T: fmt::Display>(msg: T) -> Self {
         Error::SerializeValue(msg.to_string())
@@ -501,3 +542,13 @@ impl fmt::Debug for Error {
         write!(f, "{}", msg)
     }
 }
+
+impl fmt::Debug for CompatibilityError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let mut msg = self.to_string();
+        if let Some(e) = self.source() {
+            msg.extend([": ", &e.to_string()]);
+        }
+        write!(f, "{}", msg)
+    }
+}
diff --git a/lang/rust/avro/src/schema_compatibility.rs 
b/lang/rust/avro/src/schema_compatibility.rs
index 8a0b2a4d7..861f31aac 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -16,7 +16,10 @@
 // under the License.
 
 //! Logic for checking schema compatibility
-use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind};
+use crate::{
+    error::CompatibilityError,
+    schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+};
 use std::{
     collections::{hash_map::DefaultHasher, HashSet},
     hash::Hasher,
@@ -37,7 +40,11 @@ impl Checker {
         }
     }
 
-    pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: 
&Schema) -> bool {
+    pub(crate) fn can_read(
+        &mut self,
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         self.full_match_schemas(writers_schema, readers_schema)
     }
 
@@ -45,44 +52,52 @@ impl Checker {
         &mut self,
         writers_schema: &Schema,
         readers_schema: &Schema,
-    ) -> bool {
+    ) -> Result<(), CompatibilityError> {
         if self.recursion_in_progress(writers_schema, readers_schema) {
-            return true;
+            return Ok(());
         }
 
-        if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) 
{
-            return false;
-        }
+        SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
 
         let w_type = SchemaKind::from(writers_schema);
         let r_type = SchemaKind::from(readers_schema);
 
         if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == 
SchemaKind::Fixed) {
-            return true;
+            return Ok(());
         }
 
         match r_type {
             SchemaKind::Record => self.match_record_schemas(writers_schema, 
readers_schema),
             SchemaKind::Map => {
                 if let Schema::Map(w_m) = writers_schema {
-                    if let Schema::Map(r_m) = readers_schema {
-                        self.full_match_schemas(w_m, r_m)
-                    } else {
-                        unreachable!("readers_schema should have been 
Schema::Map")
+                    match readers_schema {
+                        Schema::Map(r_m) => self.full_match_schemas(w_m, r_m),
+                        _ => Err(CompatibilityError::WrongType {
+                            writer_schema_type: format!("{:#?}", 
writers_schema),
+                            reader_schema_type: format!("{:#?}", 
readers_schema),
+                        }),
                     }
                 } else {
-                    unreachable!("writers_schema should have been Schema::Map")
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("writers_schema"),
+                        expected_type: &[SchemaKind::Map],
+                    })
                 }
             }
             SchemaKind::Array => {
                 if let Schema::Array(w_a) = writers_schema {
-                    if let Schema::Array(r_a) = readers_schema {
-                        self.full_match_schemas(w_a, r_a)
-                    } else {
-                        unreachable!("readers_schema should have been 
Schema::Array")
+                    match readers_schema {
+                        Schema::Array(r_a) => self.full_match_schemas(w_a, 
r_a),
+                        _ => Err(CompatibilityError::WrongType {
+                            writer_schema_type: format!("{:#?}", 
writers_schema),
+                            reader_schema_type: format!("{:#?}", 
readers_schema),
+                        }),
                     }
                 } else {
-                    unreachable!("writers_schema should have been 
Schema::Array")
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("writers_schema"),
+                        expected_type: &[SchemaKind::Array],
+                    })
                 }
             }
             SchemaKind::Union => self.match_union_schemas(writers_schema, 
readers_schema),
@@ -96,10 +111,12 @@ impl Checker {
                         symbols: r_symbols, ..
                     }) = readers_schema
                     {
-                        return !w_symbols.iter().any(|e| 
!r_symbols.contains(e));
+                        if w_symbols.iter().all(|e| r_symbols.contains(e)) {
+                            return Ok(());
+                        }
                     }
                 }
-                false
+                Err(CompatibilityError::MissingSymbols)
             }
             _ => {
                 if w_type == SchemaKind::Union {
@@ -109,16 +126,25 @@ impl Checker {
                         }
                     }
                 }
-                false
+                Err(CompatibilityError::Inconclusive(String::from(
+                    "writers_schema",
+                )))
             }
         }
     }
 
-    fn match_record_schemas(&mut self, writers_schema: &Schema, 
readers_schema: &Schema) -> bool {
+    fn match_record_schemas(
+        &mut self,
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         let w_type = SchemaKind::from(writers_schema);
 
         if w_type == SchemaKind::Union {
-            return false;
+            return Err(CompatibilityError::TypeExpected {
+                schema_type: String::from("writers_schema"),
+                expected_type: &[SchemaKind::Record],
+            });
         }
 
         if let Schema::Record(RecordSchema {
@@ -133,39 +159,49 @@ impl Checker {
             {
                 for field in r_fields.iter() {
                     if let Some(pos) = w_lookup.get(&field.name) {
-                        if !self.full_match_schemas(&w_fields[*pos].schema, 
&field.schema) {
-                            return false;
+                        if let Err(err) =
+                            self.full_match_schemas(&w_fields[*pos].schema, 
&field.schema)
+                        {
+                            return Err(CompatibilityError::FieldTypeMismatch(
+                                field.name.clone(),
+                                Box::new(err),
+                            ));
                         }
                     } else if field.default.is_none() {
-                        return false;
+                        return 
Err(CompatibilityError::MissingDefaultValue(field.name.clone()));
                     }
                 }
             }
         }
-        true
+        Ok(())
     }
 
-    fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: 
&Schema) -> bool {
-        let w_type = SchemaKind::from(writers_schema);
-        let r_type = SchemaKind::from(readers_schema);
-
-        assert_eq!(r_type, SchemaKind::Union);
-
-        if w_type == SchemaKind::Union {
-            if let Schema::Union(u) = writers_schema {
-                u.schemas
-                    .iter()
-                    .all(|schema| self.full_match_schemas(schema, 
readers_schema))
+    fn match_union_schemas(
+        &mut self,
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
+        if let Schema::Union(u) = writers_schema {
+            if u.schemas
+                .iter()
+                .all(|schema| self.full_match_schemas(schema, 
readers_schema).is_ok())
+            {
+                return Ok(());
             } else {
-                unreachable!("writers_schema should have been Schema::Union")
+                return Err(CompatibilityError::MissingUnionElements);
             }
         } else if let Schema::Union(u) = readers_schema {
-            u.schemas
+            // This check is needed because the writer_schema may not be a
+            // union, but its type may be contained in the reader's union,
+            // e.g. writer_schema is string and reader_schema is [string, int]
+            if u.schemas
                 .iter()
-                .any(|schema| self.full_match_schemas(writers_schema, schema))
-        } else {
-            unreachable!("readers_schema should have been Schema::Union")
+                .any(|schema| self.full_match_schemas(writers_schema, 
schema).is_ok())
+            {
+                return Ok(());
+            }
         }
+        Err(CompatibilityError::MissingUnionElements)
     }
 
     fn recursion_in_progress(&mut self, writers_schema: &Schema, 
readers_schema: &Schema) -> bool {
@@ -187,16 +223,22 @@ impl Checker {
 impl SchemaCompatibility {
     /// `can_read` performs a full, recursive check that a datum written using 
the
     /// writers_schema can be read using the readers_schema.
-    pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
+    pub fn can_read(
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         let mut c = Checker::new();
         c.can_read(writers_schema, readers_schema)
     }
 
     /// `mutual_read` performs a full, recursive check that a datum written 
using either
     /// the writers_schema or the readers_schema can be read using the other 
schema.
-    pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> 
bool {
-        SchemaCompatibility::can_read(writers_schema, readers_schema)
-            && SchemaCompatibility::can_read(readers_schema, writers_schema)
+    pub fn mutual_read(
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
+        SchemaCompatibility::can_read(writers_schema, readers_schema)?;
+        SchemaCompatibility::can_read(readers_schema, writers_schema)
     }
 
     ///  `match_schemas` performs a basic check that a datum written with the
@@ -204,29 +246,45 @@ impl SchemaCompatibility {
     ///  matching the types, including schema promotion, and matching the full 
name for
     ///  named types. Aliases for named types are not supported here, and the 
rust
     ///  implementation of Avro in general does not include support for 
aliases (I think).
-    pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: 
&Schema) -> bool {
+    pub(crate) fn match_schemas(
+        writers_schema: &Schema,
+        readers_schema: &Schema,
+    ) -> Result<(), CompatibilityError> {
         let w_type = SchemaKind::from(writers_schema);
         let r_type = SchemaKind::from(readers_schema);
 
         if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
-            return true;
+            return Ok(());
         }
 
         if w_type == r_type {
             if r_type.is_primitive() {
-                return true;
+                return Ok(());
             }
 
             match r_type {
                 SchemaKind::Record => {
                     if let Schema::Record(RecordSchema { name: w_name, .. }) = 
writers_schema {
                         if let Schema::Record(RecordSchema { name: r_name, .. 
}) = readers_schema {
-                            return w_name.name == r_name.name;
+                            if w_name.name == r_name.name {
+                                return Ok(());
+                            } else {
+                                return Err(CompatibilityError::NameMismatch {
+                                    writer_name: w_name.name.clone(),
+                                    reader_name: r_name.name.clone(),
+                                });
+                            }
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Record")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: &[SchemaKind::Record],
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Record")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: &[SchemaKind::Record],
+                        });
                     }
                 }
                 SchemaKind::Fixed => {
@@ -246,23 +304,39 @@ impl SchemaCompatibility {
                             attributes: _,
                         }) = readers_schema
                         {
-                            return w_name.name == r_name.name && w_size == 
r_size;
+                            return (w_name.name == r_name.name && w_size == 
r_size)
+                                .then_some(())
+                                .ok_or(CompatibilityError::FixedMismatch);
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Fixed")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: &[SchemaKind::Fixed],
+                            });
                         }
-                    } else {
-                        unreachable!("writers_schema should have been 
Schema::Fixed")
                     }
                 }
                 SchemaKind::Enum => {
                     if let Schema::Enum(EnumSchema { name: w_name, .. }) = 
writers_schema {
                         if let Schema::Enum(EnumSchema { name: r_name, .. }) = 
readers_schema {
-                            return w_name.name == r_name.name;
+                            if w_name.name == r_name.name {
+                                return Ok(());
+                            } else {
+                                return Err(CompatibilityError::NameMismatch {
+                                    writer_name: w_name.name.clone(),
+                                    reader_name: r_name.name.clone(),
+                                });
+                            }
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Enum")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: &[SchemaKind::Enum],
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Enum")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: &[SchemaKind::Enum],
+                        });
                     }
                 }
                 SchemaKind::Map => {
@@ -270,10 +344,16 @@ impl SchemaCompatibility {
                         if let Schema::Map(r_m) = readers_schema {
                             return SchemaCompatibility::match_schemas(w_m, 
r_m);
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Map")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: &[SchemaKind::Map],
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Map")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: &[SchemaKind::Map],
+                        });
                     }
                 }
                 SchemaKind::Array => {
@@ -281,45 +361,91 @@ impl SchemaCompatibility {
                         if let Schema::Array(r_a) = readers_schema {
                             return SchemaCompatibility::match_schemas(w_a, 
r_a);
                         } else {
-                            unreachable!("readers_schema should have been 
Schema::Array")
+                            return Err(CompatibilityError::TypeExpected {
+                                schema_type: String::from("readers_schema"),
+                                expected_type: &[SchemaKind::Array],
+                            });
                         }
                     } else {
-                        unreachable!("writers_schema should have been 
Schema::Array")
+                        return Err(CompatibilityError::TypeExpected {
+                            schema_type: String::from("writers_schema"),
+                            expected_type: &[SchemaKind::Array],
+                        });
                     }
                 }
-                _ => (),
+                _ => {
+                    return Err(CompatibilityError::Inconclusive(String::from(
+                        "readers_schema",
+                    )))
+                }
             };
         }
 
-        if w_type == SchemaKind::Int
-            && [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
-                .iter()
-                .any(|&t| t == r_type)
-        {
-            return true;
-        }
-
-        if w_type == SchemaKind::Long
-            && [SchemaKind::Float, SchemaKind::Double]
-                .iter()
-                .any(|&t| t == r_type)
-        {
-            return true;
-        }
-
-        if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
-            return true;
-        }
-
-        if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
-            return true;
-        }
-
-        if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
-            return true;
+        // Here are the checks for primitive types
+        match w_type {
+            SchemaKind::Int => {
+                if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
+                    .iter()
+                    .any(|&t| t == r_type)
+                {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: &[SchemaKind::Long, SchemaKind::Float, 
SchemaKind::Double],
+                    })
+                }
+            }
+            SchemaKind::Long => {
+                if [SchemaKind::Float, SchemaKind::Double]
+                    .iter()
+                    .any(|&t| t == r_type)
+                {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: &[SchemaKind::Float, 
SchemaKind::Double],
+                    })
+                }
+            }
+            SchemaKind::Float => {
+                if [SchemaKind::Float, SchemaKind::Double]
+                    .iter()
+                    .any(|&t| t == r_type)
+                {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: &[SchemaKind::Float, 
SchemaKind::Double],
+                    })
+                }
+            }
+            SchemaKind::String => {
+                if r_type == SchemaKind::Bytes {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: &[SchemaKind::Bytes],
+                    })
+                }
+            }
+            SchemaKind::Bytes => {
+                if r_type == SchemaKind::String {
+                    Ok(())
+                } else {
+                    Err(CompatibilityError::TypeExpected {
+                        schema_type: String::from("readers_schema"),
+                        expected_type: &[SchemaKind::String],
+                    })
+                }
+            }
+            _ => Err(CompatibilityError::Inconclusive(String::from(
+                "writers_schema",
+            ))),
         }
-
-        false
     }
 }
 
@@ -473,10 +599,11 @@ mod tests {
 
     #[test]
     fn test_broken() {
-        assert!(!SchemaCompatibility::can_read(
-            &int_string_union_schema(),
-            &int_union_schema()
-        ))
+        assert_eq!(
+            CompatibilityError::MissingUnionElements,
+            SchemaCompatibility::can_read(&int_string_union_schema(), 
&int_union_schema())
+                .unwrap_err()
+        )
     }
 
     #[test]
@@ -526,9 +653,9 @@ mod tests {
             (nested_record(), nested_optional_record()),
         ];
 
-        assert!(!incompatible_schemas
+        assert!(incompatible_schemas
             .iter()
-            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader)));
+            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader).is_err()));
     }
 
     #[test]
@@ -577,7 +704,7 @@ mod tests {
 
         assert!(compatible_schemas
             .iter()
-            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader)));
+            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, 
reader).is_ok()));
     }
 
     fn writer_schema() -> Schema {
@@ -601,14 +728,11 @@ mod tests {
       ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema,
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &reader_schema,
-            &writer_schema()
-        ));
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema,).is_ok());
+        assert_eq!(
+            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
+            SchemaCompatibility::can_read(&reader_schema, 
&writer_schema()).unwrap_err()
+        );
 
         Ok(())
     }
@@ -622,14 +746,11 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &reader_schema,
-            &writer_schema()
-        ));
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).is_ok());
+        assert_eq!(
+            CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
+            SchemaCompatibility::can_read(&reader_schema, 
&writer_schema()).unwrap_err()
+        );
 
         Ok(())
     }
@@ -644,14 +765,8 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(SchemaCompatibility::can_read(
-            &reader_schema,
-            &writer_schema()
-        ));
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).is_ok());
+        assert!(SchemaCompatibility::can_read(&reader_schema, 
&writer_schema()).is_ok());
 
         Ok(())
     }
@@ -666,14 +781,11 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &reader_schema,
-            &writer_schema()
-        ));
+        assert!(SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).is_ok());
+        assert_eq!(
+            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
+            SchemaCompatibility::can_read(&reader_schema, 
&writer_schema()).unwrap_err()
+        );
 
         Ok(())
     }
@@ -688,14 +800,14 @@ mod tests {
         ]}
 "#,
         )?;
-        assert!(!SchemaCompatibility::can_read(
-            &writer_schema(),
-            &reader_schema
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &reader_schema,
-            &writer_schema()
-        ));
+        assert_eq!(
+            CompatibilityError::MissingDefaultValue(String::from("newfield1")),
+            SchemaCompatibility::can_read(&writer_schema(), 
&reader_schema).unwrap_err()
+        );
+        assert_eq!(
+            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
+            SchemaCompatibility::can_read(&reader_schema, 
&writer_schema()).unwrap_err()
+        );
 
         Ok(())
     }
@@ -705,27 +817,24 @@ mod tests {
         let valid_reader = string_array_schema();
         let invalid_reader = string_map_schema();
 
-        assert!(SchemaCompatibility::can_read(
-            &string_array_schema(),
-            &valid_reader
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &string_array_schema(),
-            &invalid_reader
-        ));
+        assert!(SchemaCompatibility::can_read(&string_array_schema(), 
&valid_reader).is_ok());
+        assert_eq!(
+            CompatibilityError::Inconclusive(String::from("writers_schema")),
+            SchemaCompatibility::can_read(&string_array_schema(), 
&invalid_reader).unwrap_err()
+        );
     }
 
     #[test]
     fn test_primitive_writer_schema() {
         let valid_reader = Schema::String;
-        assert!(SchemaCompatibility::can_read(
-            &Schema::String,
-            &valid_reader
-        ));
-        assert!(!SchemaCompatibility::can_read(
-            &Schema::Int,
-            &Schema::String
-        ));
+        assert!(SchemaCompatibility::can_read(&Schema::String, 
&valid_reader).is_ok());
+        assert_eq!(
+            CompatibilityError::TypeExpected {
+                schema_type: String::from("readers_schema"),
+                expected_type: &[SchemaKind::Long, SchemaKind::Float, 
SchemaKind::Double],
+            },
+            SchemaCompatibility::can_read(&Schema::Int, 
&Schema::String).unwrap_err()
+        );
     }
 
     #[test]
@@ -734,8 +843,11 @@ mod tests {
         let union_writer = union_schema(vec![Schema::Int, Schema::String]);
         let union_reader = union_schema(vec![Schema::String]);
 
-        assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
-        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
+        assert_eq!(
+            CompatibilityError::MissingUnionElements,
+            SchemaCompatibility::can_read(&union_writer, 
&union_reader).unwrap_err()
+        );
+        assert!(SchemaCompatibility::can_read(&union_reader, 
&union_writer).is_ok());
     }
 
     #[test]
@@ -750,13 +862,22 @@ mod tests {
 
         let int_schema = Schema::parse_str(
             r#"
-      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
-        {"name":"field1", "type":"int"}
-      ]}
-"#,
+              {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": 
[
+                {"name":"field1", "type":"int"}
+              ]}
+        "#,
         )?;
 
-        assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));
+        assert_eq!(
+            CompatibilityError::FieldTypeMismatch(
+                "field1".to_owned(),
+                Box::new(CompatibilityError::TypeExpected {
+                    schema_type: "readers_schema".to_owned(),
+                    expected_type: &[SchemaKind::Bytes]
+                })
+            ),
+            SchemaCompatibility::can_read(&string_schema, 
&int_schema).unwrap_err()
+        );
 
         Ok(())
     }
@@ -770,8 +891,11 @@ mod tests {
         )?;
         let enum_schema2 =
             Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
-        assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
-        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
+        assert_eq!(
+            CompatibilityError::MissingSymbols,
+            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1).unwrap_err()
+        );
+        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok());
 
         Ok(())
     }
@@ -843,10 +967,10 @@ mod tests {
     fn test_union_resolution_no_structure_match() {
         // short name match, but no structure match
         let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            CompatibilityError::MissingUnionElements,
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
+        );
     }
 
     #[test]
@@ -858,10 +982,10 @@ mod tests {
             point_2d_schema(),
             point_3d_schema(),
         ]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            CompatibilityError::MissingUnionElements,
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
+        );
     }
 
     #[test]
@@ -873,10 +997,10 @@ mod tests {
             point_3d_schema(),
             point_2d_schema(),
         ]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            CompatibilityError::MissingUnionElements,
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
+        );
     }
 
     #[test]
@@ -888,10 +1012,10 @@ mod tests {
             point_3d_match_name_schema(),
             point_3d_schema(),
         ]);
-        assert!(!SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert_eq!(
+            CompatibilityError::MissingUnionElements,
+            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
+        );
     }
 
     #[test]
@@ -904,10 +1028,7 @@ mod tests {
             point_3d_schema(),
             point_2d_fullname_schema(),
         ]);
-        assert!(SchemaCompatibility::can_read(
-            &point_2d_fullname_schema(),
-            &read_schema
-        ));
+        assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok());
     }
 
     #[test]
@@ -1078,7 +1199,7 @@ mod tests {
         let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?;
         let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?;
 
-        assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2));
+        assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2).is_ok());
 
         Ok(())
     }
@@ -1153,7 +1274,65 @@ mod tests {
         ];
 
         for (schema_1, schema_2) in schemas {
-            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2));
+            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_can_read_compatibility_errors() -> TestResult {
+        let schemas = [
+            (
+                Schema::parse_str(
+                    r#"{
+                    "type": "record",
+                    "name": "StatisticsMap",
+                    "fields": [
+                        {"name": "average", "type": "int", "default": 0},
+                        {"name": "success", "type": {"type": "map", "values": "int"}}
+                    ]
+                }"#)?,
+                Schema::parse_str(
+                    r#"{
+                    "type": "record",
+                    "name": "StatisticsMap",
+                    "fields": [
+                        {"name": "average", "type": "int", "default": 0},
+                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
+                    ]
+                }"#)?,
+                "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema"
+            ),
+            (
+                Schema::parse_str(
+                    r#"{
+                        "type": "record",
+                        "name": "StatisticsArray",
+                        "fields": [
+                            {"name": "max_values", "type": {"type": "array", "items": "int"}}
+                        ]
+                    }"#)?,
+                Schema::parse_str(
+                    r#"{
+                        "type": "record",
+                        "name": "StatisticsArray",
+                        "fields": [
+                            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
+                        ]
+                    }"#)?,
+                "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema"
+            )
+        ];
+
+        for (schema_1, schema_2, error) in schemas {
+            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
+            assert_eq!(
+                error,
+                SchemaCompatibility::can_read(&schema_2, &schema_1)
+                    .unwrap_err()
+                    .to_string()
+            );
         }
 
         Ok(())


Reply via email to