Fokko commented on PR #4858: URL: https://github.com/apache/iceberg/pull/4858#issuecomment-1140323416
Subclassing them in another module would be the way to go. In that subclass we add the convenience methods and additional validation, while inheriting the (de)serialization of the actual data from the classes generated from the OpenAPI spec.

This would rewrite the class from PR https://github.com/apache/iceberg/pull/4717/:

https://github.com/apache/iceberg/blob/f5de18f554711b4ae00d6ebefbaa126a7f83f792/python/src/iceberg/table/partitioning.py#L48-L111

to the following (tests are passing :):

```python
from typing import Any, Dict, List

from pydantic import Field, root_validator

from iceberg.openapi import rest_catalog

# Not shown: the imports of IcebergSchema (the aliased table schema) and PartitionField.


class PartitionSpec(rest_catalog.PartitionSpec):
    """
    PartitionSpec captures the transformation from table data to partition values

    Attributes:
        table_schema(IcebergSchema): the schema of the data table
    """

    # Fokko: I've aliased the schema to IcebergSchema, because we also have a schema in the open_api spec
    # This would go away later on
    table_schema: IcebergSchema = Field()
    _source_id_to_fields_map: Dict[int, List[PartitionField]] = Field(init=False)

    @root_validator
    def check_fields_in_schema(cls, values: Dict[str, Any]):
        schema: IcebergSchema = values['table_schema']
        source_id_to_fields_map = dict()
        for partition_field in values['fields']:
            source_column = schema.find_column_name(partition_field.source_id)
            if not source_column:
                raise ValueError(f"Cannot find source column: {partition_field.source_id}")
            existing = source_id_to_fields_map.get(partition_field.source_id, [])
            existing.append(partition_field)
            source_id_to_fields_map[partition_field.source_id] = existing
        values["_source_id_to_fields_map"] = source_id_to_fields_map
        return values

    def __eq__(self, other):
        """
        Equality check on spec_id and partition fields only
        """
        return self.spec_id == other.spec_id and self.fields == other.fields

    def __str__(self):
        """
        PartitionSpec str method highlights the partition fields only
        """
        result_str = "["
        for partition_field in self.fields:
            result_str += f"\n  {str(partition_field)}"
        if len(self.fields) > 0:
            result_str += "\n"
        result_str += "]"
        return result_str

    def is_unpartitioned(self) -> bool:
        return len(self.fields) < 1

    def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
        return self._source_id_to_fields_map[field_id]

    def compatible_with(self, other: "PartitionSpec") -> bool:
        """
        Returns true if this partition spec is equivalent to the other, with partition field_id ignored.
        That is, if both specs have the same number of fields, field order, field name, source column ids, and transforms.
        """
        # zip() stops at the shorter list, so compare the lengths explicitly first
        return len(self.fields) == len(other.fields) and all(
            this_field.source_id == that_field.source_id
            and this_field.transform == that_field.transform
            and this_field.name == that_field.name
            for this_field, that_field in zip(self.fields, other.fields)
        )
```
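
For anyone who wants to try the pattern without the Iceberg package installed, here is a rough, dependency-free sketch of the same idea. It uses stdlib dataclasses as a stand-in for the generated pydantic models, so it is not the actual implementation. All names in it (`GeneratedPartitionField`, `GeneratedPartitionSpec`, `known_column_ids`) are hypothetical, and the set of column ids only approximates the schema lookup.

```python
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Set


# Hypothetical stand-ins for the classes generated from the OpenAPI spec.
@dataclass
class GeneratedPartitionField:
    source_id: int
    field_id: int
    transform: str
    name: str


@dataclass
class GeneratedPartitionSpec:
    spec_id: int
    fields: List[GeneratedPartitionField] = field(default_factory=list)

    def to_dict(self) -> dict:
        # Serialization lives on the generated base class and only
        # covers the attributes defined by the spec.
        return {"spec_id": self.spec_id, "fields": [asdict(f) for f in self.fields]}


# Hand-written subclass (it would live in another module): adds validation
# and convenience methods, and inherits to_dict() unchanged.
@dataclass
class PartitionSpec(GeneratedPartitionSpec):
    # Assumption: a plain set of column ids stands in for the table schema.
    known_column_ids: Set[int] = field(default_factory=set)

    def __post_init__(self) -> None:
        # Validate every partition field and build the lookup map once.
        self._source_id_to_fields: Dict[int, List[GeneratedPartitionField]] = {}
        for partition_field in self.fields:
            if partition_field.source_id not in self.known_column_ids:
                raise ValueError(f"Cannot find source column: {partition_field.source_id}")
            self._source_id_to_fields.setdefault(partition_field.source_id, []).append(partition_field)

    def is_unpartitioned(self) -> bool:
        return len(self.fields) < 1

    def fields_by_source_id(self, source_id: int) -> List[GeneratedPartitionField]:
        return self._source_id_to_fields.get(source_id, [])
```

The subclass never touches `to_dict()`, so regenerating the base classes from the spec would not require changes to the hand-written validation or helpers.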
