Fokko commented on code in PR #7873:
URL: https://github.com/apache/iceberg/pull/7873#discussion_r1263592703
##########
python/pyiceberg/utils/schema_conversion.py:
##########
@@ -468,3 +476,129 @@ def _convert_fixed_type(self, avro_type: Dict[str, Any])
-> FixedType:
An Iceberg equivalent fixed type.
"""
return FixedType(length=avro_type["size"])
+
+
+class ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]):
+ """Converts an Iceberg schema to an Avro schema."""
+
+ schema_name: Optional[str]
+ last_list_field_id: int
+ last_map_key_field_id: int
+ last_map_value_field_id: int
+
+ def __init__(self, schema_name: Optional[str]) -> None:
+ """Converts an Iceberg schema to an Avro schema.
+
+ Args:
+ schema_name: The name of the root record.
+ """
+ self.schema_name = schema_name
+
+ def schema(self, schema: Schema, struct_result: AvroType) -> AvroType:
+ if isinstance(struct_result, dict) and self.schema_name is not None:
+ struct_result["name"] = self.schema_name
+ return struct_result
+
+ def before_list_element(self, element: NestedField) -> None:
+ self.last_list_field_id = element.field_id
+
+ def before_map_key(self, key: NestedField) -> None:
+ self.last_map_key_field_id = key.field_id
+
+ def before_map_value(self, value: NestedField) -> None:
+ self.last_map_value_field_id = value.field_id
+
+ def struct(self, struct: StructType, field_results: List[AvroType]) ->
AvroType:
+ return {"type": "record", "fields": field_results}
+
+ def field(self, field: NestedField, field_result: AvroType) -> AvroType:
+ # Sets the schema name
+ if isinstance(field_result, dict) and field_result.get("type") ==
"record":
+ field_result["name"] = f"r{field.field_id}"
+
+ result = {
+ "name": field.name,
+ "field-id": field.field_id,
+ "type": field_result if field.required else ["null", field_result],
+ }
+
+ if field.optional:
+ result["default"] = None
+
+ if field.doc is not None:
+ result["doc"] = field.doc
+
+ return result
+
+ def list(self, list_type: ListType, element_result: AvroType) -> AvroType:
+ # Sets the schema name in case of a record
+ if isinstance(element_result, dict) and element_result.get("type") ==
"record":
+ element_result["name"] = f"r{self.last_list_field_id}"
+ return {"type": "array", "element-id": self.last_list_field_id,
"items": element_result}
+
+ def map(self, map_type: MapType, key_result: AvroType, value_result:
AvroType) -> AvroType:
+ if isinstance(key_result, StringType):
+ # Avro Maps does not support other keys than a String,
+ return {
+ "type": "map",
+ "values": value_result,
+ }
+ else:
+ # Creates a logical map that's a list of schema's
+ # binary compatible
+ return {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name":
f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}",
+ "fields": [
+ {"name": "key", "type": key_result, "field-id":
self.last_map_key_field_id},
+ {"name": "value", "type": value_result, "field-id":
self.last_map_value_field_id},
+ ],
+ },
+ "logicalType": "map",
+ }
+
+ def visit_fixed(self, fixed_type: FixedType) -> AvroType:
+ return {"type": "fixed", "size": len(fixed_type)}
+
+ def visit_decimal(self, decimal_type: DecimalType) -> AvroType:
+ return {"type": "bytes", "logicalType": "decimal", "precision":
decimal_type.precision, "scale": decimal_type.scale}
+
+ def visit_boolean(self, boolean_type: BooleanType) -> AvroType:
+ return "boolean"
+
+ def visit_integer(self, integer_type: IntegerType) -> AvroType:
+ return "int"
+
+ def visit_long(self, long_type: LongType) -> AvroType:
+ return "long"
+
+ def visit_float(self, float_type: FloatType) -> AvroType:
+ return "float"
+
+ def visit_double(self, double_type: DoubleType) -> AvroType:
+ return "double"
+
+ def visit_date(self, date_type: DateType) -> AvroType:
+ return {"type": "int", "logicalType": "date"}
+
+ def visit_time(self, time_type: TimeType) -> AvroType:
+ return {"type": "long", "logicalType": "time-micros"}
+
+ def visit_timestamp(self, timestamp_type: TimestampType) -> AvroType:
+ # Iceberg only supports micro's
+ return {"type": "long", "logicalType": "timestamp-micros"}
+
+ def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> AvroType:
+ # Iceberg only supports micro's
+ return {"type": "long", "logicalType": "timestamp-micros"}
Review Comment:
Great catch, this actually uncovered another bug. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]