This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a344217 Partition Evolution Support (#245)
6a344217 is described below

commit 6a34421763b26229b16d6c2d4f7b331f01ff4c3b
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Wed Feb 28 06:54:33 2024 -0800

    Partition Evolution Support (#245)
---
 mkdocs/docs/api.md                            |  57 +++
 pyiceberg/partitioning.py                     | 131 ++++++-
 pyiceberg/table/__init__.py                   | 295 +++++++++++++++-
 pyiceberg/table/metadata.py                   |   3 +-
 tests/catalog/test_hive.py                    |   4 +-
 tests/integration/test_partition_evolution.py | 490 ++++++++++++++++++++++++++
 6 files changed, 966 insertions(+), 14 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 53801922..724a45c5 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -418,6 +418,63 @@ with table.update_schema(allow_incompatible_changes=True) 
as update:
     update.delete_column("some_field")
 ```
 
+## Partition evolution
+
+PyIceberg supports partition evolution. See the [partition 
evolution](https://iceberg.apache.org/spec/#partition-evolution)
+for more details.
+
+The API to use when evolving partitions is the `update_spec` API on the table.
+
+```python
+with table.update_spec() as update:
+    update.add_field("id", BucketTransform(16), "bucketed_id")
+    update.add_field("event_ts", DayTransform(), "day_ts")
+```
+
+Updating the partition spec can also be done as part of a transaction with 
other operations.
+
+```python
+with table.transaction() as transaction:
+    with transaction.update_spec() as update_spec:
+        update_spec.add_field("id", BucketTransform(16), "bucketed_id")
+        update_spec.add_field("event_ts", DayTransform(), "day_ts")
+    # ... Update properties etc
+```
+
+### Add fields
+
+New partition fields can be added via the `add_field` API which takes in the 
field name to partition on,
+the partition transform, and an optional partition name. If the partition name 
is not specified,
+one will be created.
+
+```python
+with table.update_spec() as update:
+    update.add_field("id", BucketTransform(16), "bucketed_id")
+    update.add_field("event_ts", DayTransform(), "day_ts")
+    # identity is a shortcut API for adding an IdentityTransform
+    update.identity("some_field")
+```
+
+### Remove fields
+
+Partition fields can also be removed via the `remove_field` API if it no 
longer makes sense to partition on those fields.
+
+```python
+with table.update_spec() as update:some_partition_name
+    # Remove the partition field with the name
+    update.remove_field("some_partition_name")
+```
+
+### Rename fields
+
+Partition fields can also be renamed via the `rename_field` API.
+
+```python
+with table.update_spec() as update:
+    # Rename the partition field with the name bucketed_id to sharded_id
+    update.rename_field("bucketed_id", "sharded_id")
+```
+
 ## Table properties
 
 Set and remove properties through the `Transaction` API:
diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index f6307f0f..cd5a957b 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -16,14 +16,9 @@
 # under the License.
 from __future__ import annotations
 
-from functools import cached_property
-from typing import (
-    Any,
-    Dict,
-    List,
-    Optional,
-    Tuple,
-)
+from abc import ABC, abstractmethod
+from functools import cached_property, singledispatch
+from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
 
 from pydantic import (
     BeforeValidator,
@@ -34,7 +29,18 @@ from pydantic import (
 from typing_extensions import Annotated
 
 from pyiceberg.schema import Schema
-from pyiceberg.transforms import Transform, parse_transform
+from pyiceberg.transforms import (
+    BucketTransform,
+    DayTransform,
+    HourTransform,
+    IdentityTransform,
+    Transform,
+    TruncateTransform,
+    UnknownTransform,
+    VoidTransform,
+    YearTransform,
+    parse_transform,
+)
 from pyiceberg.typedef import IcebergBaseModel
 from pyiceberg.types import NestedField, StructType
 
@@ -143,7 +149,7 @@ class PartitionSpec(IcebergBaseModel):
     def last_assigned_field_id(self) -> int:
         if self.fields:
             return max(pf.field_id for pf in self.fields)
-        return PARTITION_FIELD_ID_START
+        return PARTITION_FIELD_ID_START - 1
 
     @cached_property
     def source_id_to_fields_map(self) -> Dict[int, List[PartitionField]]:
@@ -215,3 +221,108 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, 
old_schema: Schema, fre
             )
         )
     return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
+
+
+T = TypeVar("T")
+
+
+class PartitionSpecVisitor(Generic[T], ABC):
+    @abstractmethod
+    def identity(self, field_id: int, source_name: str, source_id: int) -> T:
+        """Visit identity partition field."""
+
+    @abstractmethod
+    def bucket(self, field_id: int, source_name: str, source_id: int, 
num_buckets: int) -> T:
+        """Visit bucket partition field."""
+
+    @abstractmethod
+    def truncate(self, field_id: int, source_name: str, source_id: int, width: 
int) -> T:
+        """Visit truncate partition field."""
+
+    @abstractmethod
+    def year(self, field_id: int, source_name: str, source_id: int) -> T:
+        """Visit year partition field."""
+
+    @abstractmethod
+    def month(self, field_id: int, source_name: str, source_id: int) -> T:
+        """Visit month partition field."""
+
+    @abstractmethod
+    def day(self, field_id: int, source_name: str, source_id: int) -> T:
+        """Visit day partition field."""
+
+    @abstractmethod
+    def hour(self, field_id: int, source_name: str, source_id: int) -> T:
+        """Visit hour partition field."""
+
+    @abstractmethod
+    def always_null(self, field_id: int, source_name: str, source_id: int) -> 
T:
+        """Visit void partition field."""
+
+    @abstractmethod
+    def unknown(self, field_id: int, source_name: str, source_id: int, 
transform: str) -> T:
+        """Visit unknown partition field."""
+        raise ValueError(f"Unknown transform is not supported: {transform}")
+
+
+class _PartitionNameGenerator(PartitionSpecVisitor[str]):
+    def identity(self, field_id: int, source_name: str, source_id: int) -> str:
+        return source_name
+
+    def bucket(self, field_id: int, source_name: str, source_id: int, 
num_buckets: int) -> str:
+        return f"{source_name}_bucket_{num_buckets}"
+
+    def truncate(self, field_id: int, source_name: str, source_id: int, width: 
int) -> str:
+        return source_name + "_trunc_" + str(width)
+
+    def year(self, field_id: int, source_name: str, source_id: int) -> str:
+        return source_name + "_year"
+
+    def month(self, field_id: int, source_name: str, source_id: int) -> str:
+        return source_name + "_month"
+
+    def day(self, field_id: int, source_name: str, source_id: int) -> str:
+        return source_name + "_day"
+
+    def hour(self, field_id: int, source_name: str, source_id: int) -> str:
+        return source_name + "_hour"
+
+    def always_null(self, field_id: int, source_name: str, source_id: int) -> 
str:
+        return source_name + "_null"
+
+    def unknown(self, field_id: int, source_name: str, source_id: int, 
transform: str) -> str:
+        return super().unknown(field_id, source_name, source_id, transform)
+
+
+R = TypeVar("R")
+
+
+@singledispatch
+def _visit(spec: PartitionSpec, schema: Schema, visitor: 
PartitionSpecVisitor[R]) -> List[R]:
+    return [_visit_partition_field(schema, field, visitor) for field in 
spec.fields]
+
+
+def _visit_partition_field(schema: Schema, field: PartitionField, visitor: 
PartitionSpecVisitor[R]) -> R:
+    source_name = schema.find_column_name(field.source_id)
+    if not source_name:
+        raise ValueError(f"Could not find field with id {field.source_id}")
+
+    transform = field.transform
+    if isinstance(transform, IdentityTransform):
+        return visitor.identity(field.field_id, source_name, field.source_id)
+    elif isinstance(transform, BucketTransform):
+        return visitor.bucket(field.field_id, source_name, field.source_id, 
transform.num_buckets)
+    elif isinstance(transform, TruncateTransform):
+        return visitor.truncate(field.field_id, source_name, field.source_id, 
transform.width)
+    elif isinstance(transform, DayTransform):
+        return visitor.day(field.field_id, source_name, field.source_id)
+    elif isinstance(transform, HourTransform):
+        return visitor.hour(field.field_id, source_name, field.source_id)
+    elif isinstance(transform, YearTransform):
+        return visitor.year(field.field_id, source_name, field.source_id)
+    elif isinstance(transform, VoidTransform):
+        return visitor.always_null(field.field_id, source_name, 
field.source_id)
+    elif isinstance(transform, UnknownTransform):
+        return visitor.unknown(field.field_id, source_name, field.source_id, 
repr(transform))
+    else:
+        raise ValueError(f"Unknown transform {transform}")
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 060f1377..e49f9400 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -51,6 +51,7 @@ from pyiceberg.expressions import (
     And,
     BooleanExpression,
     EqualTo,
+    Reference,
     parser,
     visitors,
 )
@@ -67,7 +68,15 @@ from pyiceberg.manifest import (
     write_manifest,
     write_manifest_list,
 )
-from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.partitioning import (
+    INITIAL_PARTITION_SPEC_ID,
+    PARTITION_FIELD_ID_START,
+    IdentityTransform,
+    PartitionField,
+    PartitionSpec,
+    _PartitionNameGenerator,
+    _visit_partition_field,
+)
 from pyiceberg.schema import (
     PartnerAccessor,
     Schema,
@@ -99,6 +108,7 @@ from pyiceberg.table.snapshots import (
     update_snapshot_summaries,
 )
 from pyiceberg.table.sorting import SortOrder
+from pyiceberg.transforms import TimeTransform, Transform, VoidTransform
 from pyiceberg.typedef import (
     EMPTY_DICT,
     IcebergBaseModel,
@@ -372,6 +382,14 @@ class Transaction:
         """
         return UpdateSnapshot(self._table, self)
 
+    def update_spec(self) -> UpdateSpec:
+        """Create a new UpdateSpec to update the partitioning of the table.
+
+        Returns:
+            A new UpdateSpec.
+        """
+        return UpdateSpec(self._table, self)
+
     def remove_properties(self, *removals: str) -> Transaction:
         """Remove properties.
 
@@ -634,6 +652,43 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: 
TableMetadata, context: _Ta
     return base_metadata.model_copy(update={"current_schema_id": 
new_schema_id})
 
 
+@_apply_table_update.register(AddPartitionSpecUpdate)
+def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: 
_TableMetadataUpdateContext) -> TableMetadata:
+    for spec in base_metadata.partition_specs:
+        if spec.spec_id == update.spec.spec_id:
+            raise ValueError(f"Partition spec with id {spec.spec_id} already 
exists: {spec}")
+    context.add_update(update)
+    return base_metadata.model_copy(
+        update={
+            "partition_specs": base_metadata.partition_specs + [update.spec],
+            "last_partition_id": max(
+                max(field.field_id for field in update.spec.fields),
+                base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 
1,
+            ),
+        }
+    )
+
+
+@_apply_table_update.register(SetDefaultSpecUpdate)
+def _(update: SetDefaultSpecUpdate, base_metadata: TableMetadata, context: 
_TableMetadataUpdateContext) -> TableMetadata:
+    new_spec_id = update.spec_id
+    if new_spec_id == -1:
+        new_spec_id = max(spec.spec_id for spec in 
base_metadata.partition_specs)
+    if new_spec_id == base_metadata.default_spec_id:
+        return base_metadata
+    found_spec_id = False
+    for spec in base_metadata.partition_specs:
+        found_spec_id = spec.spec_id == new_spec_id
+        if found_spec_id:
+            break
+
+    if not found_spec_id:
+        raise ValueError(f"Failed to find spec with id {new_spec_id}")
+
+    context.add_update(update)
+    return base_metadata.model_copy(update={"default_spec_id": new_spec_id})
+
+
 @_apply_table_update.register(AddSnapshotUpdate)
 def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: 
_TableMetadataUpdateContext) -> TableMetadata:
     if len(base_metadata.schemas) == 0:
@@ -969,6 +1024,12 @@ class Table:
         """Return a dict of the sort orders of this table."""
         return {sort_order.order_id: sort_order for sort_order in 
self.metadata.sort_orders}
 
+    def last_partition_id(self) -> int:
+        """Return the highest assigned partition field ID across all specs or 
999 if only the unpartitioned spec exists."""
+        if self.metadata.last_partition_id:
+            return self.metadata.last_partition_id
+        return PARTITION_FIELD_ID_START - 1
+
     @property
     def properties(self) -> Dict[str, str]:
         """Properties of the table."""
@@ -1095,6 +1156,9 @@ class Table:
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
+        return UpdateSpec(self, case_sensitive=case_sensitive)
+
     def refs(self) -> Dict[str, SnapshotRef]:
         """Return the snapshot references in the table."""
         return self.metadata.refs
@@ -2655,3 +2719,232 @@ class UpdateSnapshot:
             operation=Operation.OVERWRITE if self._table.current_snapshot() is 
not None else Operation.APPEND,
             transaction=self._transaction,
         )
+
+
+class UpdateSpec:
+    _table: Table
+    _name_to_field: Dict[str, PartitionField] = {}
+    _name_to_added_field: Dict[str, PartitionField] = {}
+    _transform_to_field: Dict[Tuple[int, str], PartitionField] = {}
+    _transform_to_added_field: Dict[Tuple[int, str], PartitionField] = {}
+    _renames: Dict[str, str] = {}
+    _added_time_fields: Dict[int, PartitionField] = {}
+    _case_sensitive: bool
+    _adds: List[PartitionField]
+    _deletes: Set[int]
+    _last_assigned_partition_id: int
+    _transaction: Optional[Transaction]
+
+    def __init__(self, table: Table, transaction: Optional[Transaction] = 
None, case_sensitive: bool = True) -> None:
+        self._table = table
+        self._name_to_field = {field.name: field for field in 
table.spec().fields}
+        self._name_to_added_field = {}
+        self._transform_to_field = {(field.source_id, repr(field.transform)): 
field for field in table.spec().fields}
+        self._transform_to_added_field = {}
+        self._adds = []
+        self._deletes = set()
+        self._last_assigned_partition_id = table.last_partition_id()
+        self._renames = {}
+        self._transaction = transaction
+        self._case_sensitive = case_sensitive
+        self._added_time_fields = {}
+
+    def add_field(
+        self,
+        source_column_name: str,
+        transform: Transform[Any, Any],
+        partition_field_name: Optional[str] = None,
+    ) -> UpdateSpec:
+        ref = Reference(source_column_name)
+        bound_ref = ref.bind(self._table.schema(), self._case_sensitive)
+        # verify transform can actually bind it
+        output_type = bound_ref.field.field_type
+        if not transform.can_transform(output_type):
+            raise ValueError(f"{transform} cannot transform {output_type} 
values from {bound_ref.field.name}")
+
+        transform_key = (bound_ref.field.field_id, repr(transform))
+        existing_partition_field = self._transform_to_field.get(transform_key)
+        if existing_partition_field and 
self._is_duplicate_partition(transform, existing_partition_field):
+            raise ValueError(f"Duplicate partition field for 
${ref.name}=${ref}, ${existing_partition_field} already exists")
+
+        added = self._transform_to_added_field.get(transform_key)
+        if added:
+            raise ValueError(f"Already added partition: {added.name}")
+
+        new_field = self._partition_field((bound_ref.field.field_id, 
transform), partition_field_name)
+        if new_field.name in self._name_to_added_field:
+            raise ValueError(f"Already added partition field with name: 
{new_field.name}")
+
+        if isinstance(new_field.transform, TimeTransform):
+            existing_time_field = 
self._added_time_fields.get(new_field.source_id)
+            if existing_time_field:
+                raise ValueError(f"Cannot add time partition field: 
{new_field.name} conflicts with {existing_time_field.name}")
+            self._added_time_fields[new_field.source_id] = new_field
+        self._transform_to_added_field[transform_key] = new_field
+
+        existing_partition_field = self._name_to_field.get(new_field.name)
+        if existing_partition_field and new_field.field_id not in 
self._deletes:
+            if isinstance(existing_partition_field.transform, VoidTransform):
+                self.rename_field(
+                    existing_partition_field.name, 
existing_partition_field.name + "_" + str(existing_partition_field.field_id)
+                )
+            else:
+                raise ValueError(f"Cannot add duplicate partition field name: 
{existing_partition_field.name}")
+
+        self._name_to_added_field[new_field.name] = new_field
+        self._adds.append(new_field)
+        return self
+
+    def add_identity(self, source_column_name: str) -> UpdateSpec:
+        return self.add_field(source_column_name, IdentityTransform(), None)
+
+    def remove_field(self, name: str) -> UpdateSpec:
+        added = self._name_to_added_field.get(name)
+        if added:
+            raise ValueError(f"Cannot delete newly added field {name}")
+        renamed = self._renames.get(name)
+        if renamed:
+            raise ValueError(f"Cannot rename and delete field {name}")
+        field = self._name_to_field.get(name)
+        if not field:
+            raise ValueError(f"No such partition field: {name}")
+
+        self._deletes.add(field.field_id)
+        return self
+
+    def rename_field(self, name: str, new_name: str) -> UpdateSpec:
+        existing_field = self._name_to_field.get(new_name)
+        if existing_field and isinstance(existing_field.transform, 
VoidTransform):
+            return self.rename_field(name, name + "_" + 
str(existing_field.field_id))
+        added = self._name_to_added_field.get(name)
+        if added:
+            raise ValueError("Cannot rename recently added partitions")
+        field = self._name_to_field.get(name)
+        if not field:
+            raise ValueError(f"Cannot find partition field {name}")
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot delete and rename partition field 
{name}")
+        self._renames[name] = new_name
+        return self
+
+    def commit(self) -> None:
+        new_spec = self._apply()
+        if self._table.metadata.default_spec_id != new_spec.spec_id:
+            if new_spec.spec_id not in self._table.specs():
+                updates = [AddPartitionSpecUpdate(spec=new_spec), 
SetDefaultSpecUpdate(spec_id=-1)]
+            else:
+                updates = [SetDefaultSpecUpdate(spec_id=new_spec.spec_id)]
+
+            required_last_assigned_partitioned_id = 
self._table.last_partition_id()
+            requirements = 
[AssertLastAssignedPartitionId(last_assigned_partition_id=required_last_assigned_partitioned_id)]
+
+            if self._transaction is not None:
+                self._transaction._append_updates(*updates)  # pylint: 
disable=W0212
+                self._transaction._append_requirements(*requirements)  # 
pylint: disable=W0212
+            else:
+                
requirements.append(AssertDefaultSpecId(default_spec_id=self._table.spec().spec_id))
+                self._table._do_commit(updates=tuple(updates), 
requirements=tuple(requirements))  # pylint: disable=W0212
+
+    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
+        """Close and commit the change."""
+        return self.commit()
+
+    def __enter__(self) -> UpdateSpec:
+        """Update the table."""
+        return self
+
+    def _apply(self) -> PartitionSpec:
+        def _check_and_add_partition_name(schema: Schema, name: str, 
source_id: int, partition_names: Set[str]) -> None:
+            try:
+                field = schema.find_field(name)
+            except ValueError:
+                field = None
+
+            if source_id is not None and field is not None and field.field_id 
!= source_id:
+                raise ValueError(f"Cannot create identity partition from a 
different field in the schema {name}")
+            elif field is not None and source_id != field.field_id:
+                raise ValueError(f"Cannot create partition from name that 
exists in schema {name}")
+            if not name:
+                raise ValueError("Undefined name")
+            if name in partition_names:
+                raise ValueError(f"Partition name has to be unique: {name}")
+            partition_names.add(name)
+
+        def _add_new_field(
+            schema: Schema, source_id: int, field_id: int, name: str, 
transform: Transform[Any, Any], partition_names: Set[str]
+        ) -> PartitionField:
+            _check_and_add_partition_name(schema, name, source_id, 
partition_names)
+            return PartitionField(source_id, field_id, transform, name)
+
+        partition_fields = []
+        partition_names: Set[str] = set()
+        for field in self._table.spec().fields:
+            if field.field_id not in self._deletes:
+                renamed = self._renames.get(field.name)
+                if renamed:
+                    new_field = _add_new_field(
+                        self._table.schema(), field.source_id, field.field_id, 
renamed, field.transform, partition_names
+                    )
+                else:
+                    new_field = _add_new_field(
+                        self._table.schema(), field.source_id, field.field_id, 
field.name, field.transform, partition_names
+                    )
+                partition_fields.append(new_field)
+            elif self._table.format_version == 1:
+                renamed = self._renames.get(field.name)
+                if renamed:
+                    new_field = _add_new_field(
+                        self._table.schema(), field.source_id, field.field_id, 
renamed, VoidTransform(), partition_names
+                    )
+                else:
+                    new_field = _add_new_field(
+                        self._table.schema(), field.source_id, field.field_id, 
field.name, VoidTransform(), partition_names
+                    )
+
+                partition_fields.append(new_field)
+
+        for added_field in self._adds:
+            new_field = PartitionField(
+                source_id=added_field.source_id,
+                field_id=added_field.field_id,
+                transform=added_field.transform,
+                name=added_field.name,
+            )
+            partition_fields.append(new_field)
+
+        # Reuse spec id or create a new one.
+        new_spec = PartitionSpec(*partition_fields)
+        new_spec_id = INITIAL_PARTITION_SPEC_ID
+        for spec in self._table.specs().values():
+            if new_spec.compatible_with(spec):
+                new_spec_id = spec.spec_id
+                break
+            elif new_spec_id <= spec.spec_id:
+                new_spec_id = spec.spec_id + 1
+        return PartitionSpec(*partition_fields, spec_id=new_spec_id)
+
+    def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], 
name: Optional[str]) -> PartitionField:
+        if self._table.metadata.format_version == 2:
+            source_id, transform = transform_key
+            historical_fields = []
+            for spec in self._table.specs().values():
+                for field in spec.fields:
+                    historical_fields.append((field.source_id, field.field_id, 
repr(field.transform), field.name))
+
+            for field_key in historical_fields:
+                if field_key[0] == source_id and field_key[2] == 
repr(transform):
+                    if name is None or field_key[3] == name:
+                        return PartitionField(source_id, field_key[1], 
transform, name)
+
+        new_field_id = self._new_field_id()
+        if name is None:
+            tmp_field = PartitionField(transform_key[0], new_field_id, 
transform_key[1], 'unassigned_field_name')
+            name = _visit_partition_field(self._table.schema(), tmp_field, 
_PartitionNameGenerator())
+        return PartitionField(transform_key[0], new_field_id, 
transform_key[1], name)
+
+    def _new_field_id(self) -> int:
+        self._last_assigned_partition_id += 1
+        return self._last_assigned_partition_id
+
+    def _is_duplicate_partition(self, transform: Transform[Any, Any], 
partition_field: PartitionField) -> bool:
+        return partition_field.field_id not in self._deletes and 
partition_field.transform == transform
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index a5dfb6ce..ea7a02f7 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -310,7 +310,8 @@ class TableMetadataV1(TableMetadataCommonFields, 
IcebergBaseModel):
                 data[PARTITION_SPECS] = [{"field-id": 0, "fields": ()}]
 
         data[LAST_PARTITION_ID] = max(
-            [field.get(FIELD_ID) for spec in data[PARTITION_SPECS] for field 
in spec[FIELDS]], default=PARTITION_FIELD_ID_START
+            [field.get(FIELD_ID) for spec in data[PARTITION_SPECS] for field 
in spec[FIELDS]],
+            default=PARTITION_FIELD_ID_START - 1,
         )
 
         return data
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index dc2689e0..e59b7599 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -277,7 +277,7 @@ def test_create_table(table_schema_simple: Schema, 
hive_database: HiveDatabase,
             )
         ],
         current_schema_id=0,
-        last_partition_id=1000,
+        last_partition_id=999,
         properties={"owner": "javaberg", 'write.parquet.compression-codec': 
'zstd'},
         partition_specs=[PartitionSpec()],
         default_spec_id=0,
@@ -330,7 +330,7 @@ def test_create_v1_table(table_schema_simple: Schema, 
hive_database: HiveDatabas
         schema=expected_schema,
         schemas=[expected_schema],
         current_schema_id=0,
-        last_partition_id=1000,
+        last_partition_id=999,
         properties={"owner": "javaberg", "write.parquet.compression-codec": 
"zstd"},
         partition_spec=[],
         partition_specs=[expected_spec],
diff --git a/tests/integration/test_partition_evolution.py 
b/tests/integration/test_partition_evolution.py
new file mode 100644
index 00000000..16feef56
--- /dev/null
+++ b/tests/integration/test_partition_evolution.py
@@ -0,0 +1,490 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import pytest
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.transforms import (
+    BucketTransform,
+    DayTransform,
+    HourTransform,
+    IdentityTransform,
+    MonthTransform,
+    TruncateTransform,
+    VoidTransform,
+    YearTransform,
+)
+from pyiceberg.types import (
+    LongType,
+    NestedField,
+    StringType,
+    TimestampType,
+)
+
+
[email protected]()
+def catalog_rest() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181";,
+            "s3.endpoint": "http://localhost:9000";,
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
[email protected]()
+def catalog_hive() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "hive",
+            "uri": "http://localhost:9083";,
+            "s3.endpoint": "http://localhost:9000";,
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
+def _simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table:
+    return _create_table_with_schema(catalog, table_schema_simple, "1")
+
+
+def _table(catalog: Catalog) -> Table:
+    schema_with_timestamp = Schema(
+        NestedField(1, "id", LongType(), required=False),
+        NestedField(2, "event_ts", TimestampType(), required=False),
+        NestedField(3, "str", StringType(), required=False),
+    )
+    return _create_table_with_schema(catalog, schema_with_timestamp, "1")
+
+
+def _table_v2(catalog: Catalog) -> Table:
+    schema_with_timestamp = Schema(
+        NestedField(1, "id", LongType(), required=False),
+        NestedField(2, "event_ts", TimestampType(), required=False),
+        NestedField(3, "str", StringType(), required=False),
+    )
+    return _create_table_with_schema(catalog, schema_with_timestamp, "2")
+
+
+def _create_table_with_schema(catalog: Catalog, schema: Schema, 
format_version: str) -> Table:
+    tbl_name = "default.test_schema_evolution"
+    try:
+        catalog.drop_table(tbl_name)
+    except NoSuchTableError:
+        pass
+    return catalog.create_table(identifier=tbl_name, schema=schema, 
properties={"format-version": format_version})
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_identity_partition(catalog: Catalog, table_schema_simple: Schema) 
-> None:
+    simple_table = _simple_table(catalog, table_schema_simple)
+    simple_table.update_spec().add_identity("foo").commit()
+    specs = simple_table.specs()
+    assert len(specs) == 2
+    spec = simple_table.spec()
+    assert spec.spec_id == 1
+    assert spec.last_assigned_field_id == 1000
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_year(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_field("event_ts", YearTransform(), 
"year_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 
1000, YearTransform(), "year_transform"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_month(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_field("event_ts", MonthTransform(), 
"month_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 
1000, MonthTransform(), "month_transform"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_day(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_field("event_ts", DayTransform(), 
"day_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 
1000, DayTransform(), "day_transform"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_hour(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_field("event_ts", HourTransform(), 
"hour_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 
1000, HourTransform(), "hour_transform"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_bucket(catalog: Catalog, table_schema_simple: Schema) -> None:
+    simple_table = _create_table_with_schema(catalog, table_schema_simple, "1")
+    simple_table.update_spec().add_field("foo", BucketTransform(12), 
"bucket_transform").commit()
+    _validate_new_partition_fields(simple_table, 1000, 1, 1000, 
PartitionField(1, 1000, BucketTransform(12), "bucket_transform"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_truncate(catalog: Catalog, table_schema_simple: Schema) -> None:
+    simple_table = _create_table_with_schema(catalog, table_schema_simple, "1")
+    simple_table.update_spec().add_field("foo", TruncateTransform(1), 
"truncate_transform").commit()
+    _validate_new_partition_fields(
+        simple_table, 1000, 1, 1000, PartitionField(1, 1000, 
TruncateTransform(1), "truncate_transform")
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_multiple_adds(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_identity("id").add_field("event_ts", 
HourTransform(), "hourly_partitioned").add_field(
+        "str", TruncateTransform(2), "truncate_str"
+    ).commit()
+    _validate_new_partition_fields(
+        table,
+        1002,
+        1,
+        1002,
+        PartitionField(1, 1000, IdentityTransform(), "id"),
+        PartitionField(2, 1001, HourTransform(), "hourly_partitioned"),
+        PartitionField(3, 1002, TruncateTransform(2), "truncate_str"),
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_hour_to_day(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_field("event_ts", DayTransform(), 
"daily_partitioned").commit()
+    table.update_spec().add_field("event_ts", HourTransform(), 
"hourly_partitioned").commit()
+    _validate_new_partition_fields(
+        table,
+        1001,
+        2,
+        1001,
+        PartitionField(2, 1000, DayTransform(), "daily_partitioned"),
+        PartitionField(2, 1001, HourTransform(), "hourly_partitioned"),
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_add_multiple_buckets(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_field("id", BucketTransform(16)).add_field("id", 
BucketTransform(4)).commit()
+    _validate_new_partition_fields(
+        table,
+        1001,
+        1,
+        1001,
+        PartitionField(1, 1000, BucketTransform(16), "id_bucket_16"),
+        PartitionField(1, 1001, BucketTransform(4), "id_bucket_4"),
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_remove_identity(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_identity("id").commit()
+    table.update_spec().remove_field("id").commit()
+    assert len(table.specs()) == 3
+    assert table.spec().spec_id == 2
+    assert table.spec() == PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), 
name='id'), spec_id=2
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_remove_identity_v2(catalog: Catalog) -> None:
+    table_v2 = _table_v2(catalog)
+    table_v2.update_spec().add_identity("id").commit()
+    table_v2.update_spec().remove_field("id").commit()
+    assert len(table_v2.specs()) == 2
+    assert table_v2.spec().spec_id == 0
+    assert table_v2.spec() == PartitionSpec(spec_id=0)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_remove_bucket(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with table.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table.update_spec() as remove:
+        remove.remove_field("bucketed_id")
+
+    assert len(table.specs()) == 3
+    _validate_new_partition_fields(
+        table,
+        1001,
+        2,
+        1001,
+        PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), 
name='bucketed_id'),
+        PartitionField(source_id=2, field_id=1001, transform=DayTransform(), 
name='day_ts'),
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_remove_bucket_v2(catalog: Catalog) -> None:
+    table_v2 = _table_v2(catalog)
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as remove:
+        remove.remove_field("bucketed_id")
+    assert len(table_v2.specs()) == 3
+    _validate_new_partition_fields(
+        table_v2, 1001, 2, 1001, PartitionField(source_id=2, field_id=1001, 
transform=DayTransform(), name='day_ts')
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_remove_day(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with table.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table.update_spec() as remove:
+        remove.remove_field("day_ts")
+
+    assert len(table.specs()) == 3
+    _validate_new_partition_fields(
+        table,
+        1001,
+        2,
+        1001,
+        PartitionField(source_id=1, field_id=1000, 
transform=BucketTransform(16), name='bucketed_id'),
+        PartitionField(source_id=2, field_id=1001, transform=VoidTransform(), 
name='day_ts'),
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_remove_day_v2(catalog: Catalog) -> None:
+    table_v2 = _table_v2(catalog)
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as remove:
+        remove.remove_field("day_ts")
+    assert len(table_v2.specs()) == 3
+    _validate_new_partition_fields(
+        table_v2, 1000, 2, 1001, PartitionField(source_id=1, field_id=1000, 
transform=BucketTransform(16), name='bucketed_id')
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_rename(catalog: Catalog) -> None:
+    table = _table(catalog)
+    table.update_spec().add_identity("id").commit()
+    table.update_spec().rename_field("id", "sharded_id").commit()
+    assert len(table.specs()) == 3
+    assert table.spec().spec_id == 2
+    _validate_new_partition_fields(table, 1000, 2, 1000, PartitionField(1, 
1000, IdentityTransform(), "sharded_id"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_add_and_remove(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_identity("id").remove_field("id").commit()
+    assert "Cannot delete newly added field id" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_add_redundant_time_partition(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("event_ts", YearTransform(), 
"year_transform").add_field(
+            "event_ts", HourTransform(), "hour_transform"
+        ).commit()
+    assert "Cannot add time partition field: hour_transform conflicts with 
year_transform" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_delete_and_rename(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_identity("id").commit()
+        table.update_spec().remove_field("id").rename_field("id", 
"sharded_id").commit()
+    assert "Cannot delete and rename partition field id" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_rename_and_delete(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_identity("id").commit()
+        table.update_spec().rename_field("id", 
"sharded_id").remove_field("id").commit()
+    assert "Cannot rename and delete field id" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_add_same_tranform_for_same_field(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("str", TruncateTransform(4), 
"truncated_str").add_field(
+            "str", TruncateTransform(4)
+        ).commit()
+    assert "Already added partition" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_add_same_field_multiple_times(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("id", IdentityTransform(), 
"duplicate").add_field(
+            "id", IdentityTransform(), "duplicate"
+        ).commit()
+    assert "Already added partition" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_cannot_add_multiple_specs_same_name(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("id", IdentityTransform(), 
"duplicate").add_field(
+            "event_ts", IdentityTransform(), "duplicate"
+        ).commit()
+    assert "Already added partition" in str(exc_info.value)
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_change_specs_and_schema_transaction(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with table.transaction() as transaction:
+        with transaction.update_spec() as update_spec:
+            update_spec.add_identity("id").add_field("event_ts", 
HourTransform(), "hourly_partitioned").add_field(
+                "str", TruncateTransform(2), "truncate_str"
+            )
+
+        with transaction.update_schema() as update_schema:
+            update_schema.add_column("col_string", StringType())
+
+    _validate_new_partition_fields(
+        table,
+        1002,
+        1,
+        1002,
+        PartitionField(1, 1000, IdentityTransform(), "id"),
+        PartitionField(2, 1001, HourTransform(), "hourly_partitioned"),
+        PartitionField(3, 1002, TruncateTransform(2), "truncate_str"),
+    )
+
+    assert table.schema() == Schema(
+        NestedField(field_id=1, name='id', field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name='event_ts', field_type=TimestampType(), 
required=False),
+        NestedField(field_id=3, name='str', field_type=StringType(), 
required=False),
+        NestedField(field_id=4, name='col_string', field_type=StringType(), 
required=False),
+        schema_id=1,
+        identifier_field_ids=[],
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_multiple_adds_and_remove_v1(catalog: Catalog) -> None:
+    table = _table(catalog)
+    with table.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table.update_spec() as update:
+        update.remove_field("day_ts").remove_field("bucketed_id")
+    with table.update_spec() as update:
+        update.add_field("str", TruncateTransform(2), "truncated_str")
+    _validate_new_partition_fields(
+        table,
+        1002,
+        3,
+        1002,
+        PartitionField(1, 1000, VoidTransform(), "bucketed_id"),
+        PartitionField(2, 1001, VoidTransform(), "day_ts"),
+        PartitionField(3, 1002, TruncateTransform(2), "truncated_str"),
+    )
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_multiple_adds_and_remove_v2(catalog: Catalog) -> None:
+    table_v2 = _table_v2(catalog)
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as update:
+        update.remove_field("day_ts").remove_field("bucketed_id")
+    with table_v2.update_spec() as update:
+        update.add_field("str", TruncateTransform(2), "truncated_str")
+    _validate_new_partition_fields(table_v2, 1002, 2, 1002, PartitionField(3, 
1002, TruncateTransform(2), "truncated_str"))
+
+
[email protected]
[email protected]('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')])
+def test_multiple_remove_and_add_reuses_v2(catalog: Catalog) -> None:
+    table_v2 = _table_v2(catalog)
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as update:
+        update.remove_field("day_ts").remove_field("bucketed_id")
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+    _validate_new_partition_fields(table_v2, 1000, 2, 1001, PartitionField(1, 
1000, BucketTransform(16), "bucketed_id"))
+
+
+def _validate_new_partition_fields(
+    table: Table,
+    expected_spec_last_assigned_field_id: int,
+    expected_spec_id: int,
+    expected_metadata_last_assigned_field_id: int,
+    *expected_partition_fields: PartitionField,
+) -> None:
+    spec = table.spec()
+    assert spec.spec_id == expected_spec_id
+    assert spec.last_assigned_field_id == expected_spec_last_assigned_field_id
+    assert table.last_partition_id() == 
expected_metadata_last_assigned_field_id
+    assert len(spec.fields) == len(expected_partition_fields)
+    for i in range(len(spec.fields)):
+        assert spec.fields[i] == expected_partition_fields[i]


Reply via email to