kojiromike commented on a change in pull request #979:
URL: https://github.com/apache/avro/pull/979#discussion_r526945205
##########
File path: lang/py/avro/compatibility.py
##########
@@ -0,0 +1,314 @@
+from copy import copy
+from enum import Enum
+from typing import Dict, List, Optional, Set, cast
+
+from avro.errors import AvroRuntimeException
+from avro.schema import ArraySchema, EnumSchema, Field, FixedSchema,
MapSchema, NamedSchema, RecordSchema, Schema, UnionSchema
+
+
+class SchemaType(str, Enum):
+ ARRAY = "array"
+ BOOLEAN = "boolean"
+ BYTES = "bytes"
+ DOUBLE = "double"
+ ENUM = "enum"
+ FIXED = "fixed"
+ FLOAT = "float"
+ INT = "int"
+ LONG = "long"
+ MAP = "map"
+ NULL = "null"
+ RECORD = "record"
+ STRING = "string"
+ UNION = "union"
+
+
+class SchemaCompatibilityType(Enum):
+ compatible = "compatible"
+ incompatible = "incompatible"
+ recursion_in_progress = "recursion_in_progress"
+
+
+class SchemaIncompatibilityType(Enum):
+ name_mismatch = "name_mismatch"
+ fixed_size_mismatch = "fixed_size_mismatch"
+ missing_enum_symbols = "missing_enum_symbols"
+ reader_field_missing_default_value = "reader_field_missing_default_value"
+ type_mismatch = "type_mismatch"
+ missing_union_branch = "missing_union_branch"
+
+
+PRIMITIVE_TYPES = {
+ SchemaType.NULL, SchemaType.BOOLEAN, SchemaType.INT,
+ SchemaType.LONG, SchemaType.FLOAT, SchemaType.DOUBLE,
+ SchemaType.BYTES, SchemaType.STRING
+}
+
+
+class SchemaCompatibilityResult:
+ def __init__(
+ self,
+ compatibility: SchemaCompatibilityType =
SchemaCompatibilityType.recursion_in_progress,
+ incompatibilities: List[SchemaIncompatibilityType] = None,
+ messages: Optional[Set[str]] = None,
+ locations: Optional[Set[str]] = None,
+ ):
+ self.locations = locations or {"/"}
+ self.messages = messages or set()
+ self.compatibility = compatibility
+ self.incompatibilities = incompatibilities or []
+
+ def merged_with(self, that):
+ """
+ Merges the current {@code SchemaCompatibilityResult} with the supplied
result
+ into a new instance, combining the list of Incompatibilities and
regressing to the
+ SchemaCompatibilityType.incompatible state if any incompatibilities
are encountered.
+ :param that: SchemaCompatibilityResult
+ :return: SchemaCompatibilityResult
+ """
+ that = cast(SchemaCompatibilityResult, that)
+ merged = [*copy(self.incompatibilities), *copy(that.incompatibilities)]
+ if self.compatibility is SchemaCompatibilityType.compatible:
+ compat = that.compatibility
+ messages = that.messages
+ locations = that.locations
+ else:
+ compat = self.compatibility
+ messages = self.messages.union(that.messages)
+ locations = self.locations.union(that.locations)
+ return SchemaCompatibilityResult(
+ compatibility=compat, incompatibilities=merged, messages=messages,
locations=locations
+ )
+
+
+CompatibleResult =
SchemaCompatibilityResult(SchemaCompatibilityType.compatible)
+
+
+class ReaderWriter:
+ def __init__(self, reader: Schema, writer: Schema) -> None:
+ self.reader, self.writer = reader, writer
+
+ def __hash__(self) -> SchemaType.INT:
+ return id(self.reader) ^ id(self.writer)
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, ReaderWriter):
+ return False
+ return self.reader is other.reader and self.writer is other.writer
+
+
+class ReaderWriterCompatibilityChecker:
+ ROOT_REFERENCE_TOKEN = "/"
+
+ def __init__(self):
+ self.memoize_map: Dict[ReaderWriter, SchemaCompatibilityResult] = {}
+
+ def get_compatibility(
+ self,
+ reader: Schema,
+ writer: Schema,
+ reference_token: str = ROOT_REFERENCE_TOKEN,
+ location: Optional[List[str]] = None
+ ) -> SchemaCompatibilityResult:
+ if location is None:
+ location = []
+ pair = ReaderWriter(reader, writer)
+ if pair in self.memoize_map:
+ result = self.memoize_map[pair]
+ if result.compatibility is
SchemaCompatibilityType.recursion_in_progress:
+ result = CompatibleResult
+ else:
+ self.memoize_map[pair] = SchemaCompatibilityResult()
+ result = self.calculate_compatibility(reader, writer, location +
[reference_token])
+ self.memoize_map[pair] = result
+ return result
+
+ # pylint: disable=too-many-return-statements
+ def calculate_compatibility(
+ self,
+ reader: Schema,
+ writer: Schema,
+ location: List[str],
+ ) -> SchemaCompatibilityResult:
+ """
+ Calculates the compatibility of a reader/writer schema pair. Will be
positive if the reader is capable of reading
+ whatever the writer may write
+ :param reader: avro.schema.Schema
+ :param writer: avro.schema.Schema
+ :param location: List[str]
+ :return: SchemaCompatibilityResult
+ """
+ assert reader is not None
+ assert writer is not None
+ result = CompatibleResult
+ if reader.type == writer.type:
+ if reader.type in PRIMITIVE_TYPES:
+ return result
+ if reader.type == SchemaType.ARRAY:
+ reader, writer = cast(ArraySchema, reader), cast(ArraySchema,
writer)
+ return result.merged_with(self.get_compatibility(reader.items,
writer.items, "items", location))
+ if reader.type == SchemaType.MAP:
+ reader, writer = cast(MapSchema, reader), cast(MapSchema,
writer)
+ return
result.merged_with(self.get_compatibility(reader.values, writer.values,
"values", location))
+ if reader.type == SchemaType.FIXED:
+ reader, writer = cast(FixedSchema, reader), cast(FixedSchema,
writer)
+ result = result.merged_with(self.check_schema_names(reader,
writer, location))
+ return result.merged_with(self.check_fixed_size(reader,
writer, location))
+ if reader.type == SchemaType.ENUM:
+ reader, writer = cast(EnumSchema, reader), cast(EnumSchema,
writer)
+ result = result.merged_with(self.check_schema_names(reader,
writer, location))
+ return
result.merged_with(self.check_reader_enum_contains_writer_enum(reader, writer,
location))
+ if reader.type == SchemaType.RECORD:
+ reader, writer = cast(RecordSchema, reader),
cast(RecordSchema, writer)
+ result = result.merged_with(self.check_schema_names(reader,
writer, location))
+ return
result.merged_with(self.check_reader_writer_record_fields(reader, writer,
location))
+ if reader.type == SchemaType.UNION:
+ reader, writer = cast(UnionSchema, reader), cast(UnionSchema,
writer)
+ for i, writer_branch in enumerate(writer.schemas):
+ compat = self.get_compatibility(reader, writer_branch)
+ if compat.compatibility is
SchemaCompatibilityType.incompatible:
+ result = result.merged_with(
+ incompatible(
+ SchemaIncompatibilityType.missing_union_branch,
+ f"reader union lacking writer type:
{writer_branch.type.upper()}", location + [f"{i}"]
+ )
+ )
+ return result
+ raise AvroRuntimeException(f"Unknown schema type: {reader.type}")
+ if writer.type == SchemaType.UNION:
+ writer = cast(UnionSchema, writer)
+ for s in writer.schemas:
+ result = result.merged_with(self.get_compatibility(reader, s))
+ return result
+ if reader.type in {SchemaType.NULL, SchemaType.BOOLEAN,
SchemaType.INT}:
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type == SchemaType.LONG:
+ if writer.type == SchemaType.INT:
+ return result
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type == SchemaType.FLOAT:
+ if writer.type in {SchemaType.INT, SchemaType.LONG}:
+ return result
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type == SchemaType.DOUBLE:
+ if writer.type in {SchemaType.INT, SchemaType.LONG,
SchemaType.FLOAT}:
+ return result
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type == SchemaType.BYTES:
+ if writer.type == SchemaType.STRING:
+ return result
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type == SchemaType.STRING:
+ if writer.type == SchemaType.BYTES:
+ return result
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type in {SchemaType.ARRAY, SchemaType.MAP, SchemaType.FIXED,
SchemaType.ENUM, SchemaType.RECORD}:
+ return result.merged_with(self.type_mismatch(reader, writer,
location))
+ if reader.type == SchemaType.UNION:
+ reader = cast(UnionSchema, reader)
+ for reader_branch in reader.schemas:
+ compat = self.get_compatibility(reader_branch, writer)
+ if compat.compatibility is SchemaCompatibilityType.compatible:
+ return result
+ # No branch in reader compatible with writer
+ message = f"reader union lacking writer type {writer.type}"
+ return result.merged_with(
+ incompatible(
+ SchemaIncompatibilityType.missing_union_branch, message,
location
+ )
+ )
+ raise AvroRuntimeException(f"Unknown schema type: {reader.type}")
+
+ # pylint: enable=too-many-return-statements
+
+ @staticmethod
Review comment:
There still seem to be some staticmethods in here :/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]