dramaticlly commented on code in PR #4717:
URL: https://github.com/apache/iceberg/pull/4717#discussion_r868762915


##########
python/src/iceberg/table/partitioning.py:
##########
@@ -64,3 +67,100 @@ def __str__(self):
 
     def __repr__(self):
         return f"PartitionField(field_id={self.field_id}, name={self.name}, 
transform={repr(self.transform)}, source_id={self.source_id})"
+
+    def __hash__(self):
+        return hash((self.source_id, self.field_id, self.name, self.transform))
+
+
+class PartitionSpec:
+    """
+    PartitionSpec capture the transformation from table data to partition 
values
+
+    Attributes:
+        schema(Schema): the schema of data table
+        spec_id(int): any change to PartitionSpec will produce a new specId
+        fields(List[PartitionField): list of partition fields to produce 
partition values
+        last_assigned_field_id(int): auto-increment partition field id 
starting from PARTITION_DATA_ID_START
+    """
+
+    PARTITION_DATA_ID_START: int = 1000
+
+    def __init__(self, schema: Schema, spec_id: int, fields: 
Tuple[PartitionField], last_assigned_field_id: int):
+        self._schema = schema
+        self._spec_id = spec_id
+        self._fields = fields
+        self._last_assigned_field_id = last_assigned_field_id
+        # derived
+        self.fields_by_source_id: Dict[int, List[PartitionField]] = {}
+
+    @property
+    def schema(self) -> Schema:
+        return self._schema
+
+    @property
+    def spec_id(self) -> int:
+        return self._spec_id
+
+    @property
+    def fields(self) -> Tuple[PartitionField]:
+        return self._fields
+
+    @property
+    def last_assigned_field_id(self) -> int:
+        return self._last_assigned_field_id
+
+    def __eq__(self, other):
+        return self.spec_id == other.spec_id and self.fields == other.fields
+
+    def __str__(self):
+        if self.is_unpartitioned():
+            return "[]"
+        else:
+            delimiter = "\n  "
+            partition_fields_in_str = (str(partition_field) for 
partition_field in self.fields)
+            head = f"[{delimiter}"
+            tail = f"\n]"
+            return f"{head}{delimiter.join(partition_fields_in_str)}{tail}"
+
+    def __repr__(self):
+        return f"PartitionSpec: {str(self)}"
+
+    def __hash__(self):
+        return hash((self.spec_id, self.fields))
+
+    def is_unpartitioned(self) -> bool:
+        return len(self.fields) < 1
+
+    def get_fields_by_source_id(self, filed_id: int) -> List[PartitionField]:
+        if not self.fields_by_source_id:
+            for partition_field in self.fields:
+                source_column = 
self.schema.find_column_name(partition_field.source_id)
+                if not source_column:
+                    raise ValueError(f"Cannot find source column: 
{partition_field.source_id}")
+                existing = 
self.fields_by_source_id.get(partition_field.source_id, [])
+                existing.append(partition_field)
+                self.fields_by_source_id[partition_field.source_id] = existing
+        return self.fields_by_source_id.get(filed_id)  # type: ignore

Review Comment:
   I think this relates to the mypy hint: `dict.get(field_id)` returns 
`Optional[List[PartitionField]]` rather than `List[PartitionField]`.
   
   Do you prefer `self.fields_by_source_id[field_id]`, or adjusting the return 
type hint to be optional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to