Fokko commented on code in PR #4717:
URL: https://github.com/apache/iceberg/pull/4717#discussion_r894871233
##########
python/src/iceberg/table/partitioning.py:
##########
@@ -29,38 +36,81 @@ class PartitionField:
name(str): The name of this partition field
"""
- def __init__(self, source_id: int, field_id: int, transform: Transform,
name: str):
- self._source_id = source_id
- self._field_id = field_id
- self._transform = transform
- self._name = name
+ source_id: int
+ field_id: int
+ transform: Transform
+ name: str
+
+ def __str__(self):
+ return f"{self.field_id}: {self.name}:
{self.transform}({self.source_id})"
+
- @property
- def source_id(self) -> int:
- return self._source_id
+@dataclass(eq=False, frozen=True)
+class PartitionSpec:
+ """
+ PartitionSpec captures the transformation from table data to partition
values
- @property
- def field_id(self) -> int:
- return self._field_id
+ Attributes:
+ schema(Schema): the schema of data table
+ spec_id(int): any change to PartitionSpec will produce a new specId
+ fields(List[PartitionField): list of partition fields to produce
partition values
+ last_assigned_field_id(int): auto-increment partition field id
starting from PARTITION_DATA_ID_START
+ """
- @property
- def name(self) -> str:
- return self._name
+ schema: Schema
+ spec_id: int
+ fields: Tuple[PartitionField, ...]
+ last_assigned_field_id: int
+ source_id_to_fields_map: Dict[int, List[PartitionField]] =
field(init=False, repr=False)
- @property
- def transform(self) -> Transform:
- return self._transform
+ def __post_init__(self):
+ source_id_to_fields_map = dict()
+ for partition_field in self.fields:
+ source_column =
self.schema.find_column_name(partition_field.source_id)
+ if not source_column:
+ raise ValueError(f"Cannot find source column:
{partition_field.source_id}")
+ existing = source_id_to_fields_map.get(partition_field.source_id,
[])
+ existing.append(partition_field)
+ source_id_to_fields_map[partition_field.source_id] = existing
+ object.__setattr__(self, "source_id_to_fields_map",
source_id_to_fields_map)
def __eq__(self, other):
- return (
- self.field_id == other.field_id
- and self.source_id == other.source_id
- and self.name == other.name
- and self.transform == other.transform
- )
+ """
+ Produce a boolean to return True if two objects are considered equal
+
+ Note:
+ Equality of PartitionSpec is determined by spec_id and partition
fields only
+ """
+ if not isinstance(other, PartitionSpec):
+ return False
+ return self.spec_id == other.spec_id and self.fields == other.fields
def __str__(self):
- return f"{self.field_id}: {self.name}:
{self.transform}({self.source_id})"
+ """
+ Produce a human-readable string representation of PartitionSpec
- def __repr__(self):
- return f"PartitionField(field_id={self.field_id}, name={self.name},
transform={repr(self.transform)}, source_id={self.source_id})"
+ Note:
+ Only include list of partition fields in the PartitionSpec's
string representation
+ """
+ result_str = "["
+ if self.fields:
+ result_str += "\n " + "\n ".join([str(field) for field in
self.fields]) + "\n"
+ result_str += "]"
+ return result_str
+
+ def is_unpartitioned(self) -> bool:
+ return len(self.fields) < 1
+
+ def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
+ return self.source_id_to_fields_map[field_id]
+
+ def compatible_with(self, other: "PartitionSpec") -> bool:
Review Comment:
We should also check that both specs have the same number of fields. The following test passes even though the two specs have a different number of fields:
```python
def test_partition_compatible_with(table_schema_simple: Schema):
bucket_transform = bucket(IntegerType(), 4)
field1 = PartitionField(3, 100, bucket_transform, "id")
field2 = PartitionField(3, 102, bucket_transform, "id")
lhs = PartitionSpec(table_schema_simple, 0, (field1,), 1001)
rhs = PartitionSpec(table_schema_simple, 0, (field1, field2), 1001)
assert lhs.compatible_with(rhs)
```
<img width="1624" alt="image"
src="https://user-images.githubusercontent.com/1134248/173146156-8e4f9459-92f7-4cc2-ada9-a1a11582b402.png">
We also do this on the Java side:
https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L211
Python's `zip` stops at the end of the shortest iterable, so any extra fields in
the longer list are never compared.
##########
python/src/iceberg/table/partitioning.py:
##########
@@ -29,38 +36,81 @@ class PartitionField:
name(str): The name of this partition field
"""
- def __init__(self, source_id: int, field_id: int, transform: Transform,
name: str):
- self._source_id = source_id
- self._field_id = field_id
- self._transform = transform
- self._name = name
+ source_id: int
+ field_id: int
+ transform: Transform
+ name: str
+
+ def __str__(self):
+ return f"{self.field_id}: {self.name}:
{self.transform}({self.source_id})"
+
- @property
- def source_id(self) -> int:
- return self._source_id
+@dataclass(eq=False, frozen=True)
+class PartitionSpec:
+ """
+ PartitionSpec captures the transformation from table data to partition
values
- @property
- def field_id(self) -> int:
- return self._field_id
+ Attributes:
+ schema(Schema): the schema of data table
+ spec_id(int): any change to PartitionSpec will produce a new specId
+ fields(List[PartitionField): list of partition fields to produce
partition values
+ last_assigned_field_id(int): auto-increment partition field id
starting from PARTITION_DATA_ID_START
+ """
- @property
- def name(self) -> str:
- return self._name
+ schema: Schema
+ spec_id: int
+ fields: Tuple[PartitionField, ...]
+ last_assigned_field_id: int
+ source_id_to_fields_map: Dict[int, List[PartitionField]] =
field(init=False, repr=False)
- @property
- def transform(self) -> Transform:
- return self._transform
+ def __post_init__(self):
+ source_id_to_fields_map = dict()
+ for partition_field in self.fields:
+ source_column =
self.schema.find_column_name(partition_field.source_id)
+ if not source_column:
+ raise ValueError(f"Cannot find source column:
{partition_field.source_id}")
+ existing = source_id_to_fields_map.get(partition_field.source_id,
[])
+ existing.append(partition_field)
+ source_id_to_fields_map[partition_field.source_id] = existing
+ object.__setattr__(self, "source_id_to_fields_map",
source_id_to_fields_map)
def __eq__(self, other):
- return (
- self.field_id == other.field_id
- and self.source_id == other.source_id
- and self.name == other.name
- and self.transform == other.transform
- )
+ """
+ Produce a boolean to return True if two objects are considered equal
+
+ Note:
+ Equality of PartitionSpec is determined by spec_id and partition
fields only
+ """
+ if not isinstance(other, PartitionSpec):
+ return False
+ return self.spec_id == other.spec_id and self.fields == other.fields
def __str__(self):
- return f"{self.field_id}: {self.name}:
{self.transform}({self.source_id})"
+ """
+ Produce a human-readable string representation of PartitionSpec
- def __repr__(self):
- return f"PartitionField(field_id={self.field_id}, name={self.name},
transform={repr(self.transform)}, source_id={self.source_id})"
+ Note:
+ Only include list of partition fields in the PartitionSpec's
string representation
+ """
+ result_str = "["
+ if self.fields:
+ result_str += "\n " + "\n ".join([str(field) for field in
self.fields]) + "\n"
+ result_str += "]"
+ return result_str
+
+ def is_unpartitioned(self) -> bool:
+ return len(self.fields) < 1
Review Comment:
supernit: We could shorten this to:
```suggestion
return not self.fields
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]