smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3261053061


##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)

Review Comment:
   Only the `main` branch ref is removed, matching Java's 
[`buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L739),
 which calls `removeRef(SnapshotRef.MAIN_BRANCH)`. All other branches and tags 
survive the replace.



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):

Review Comment:
   This class plays the same role as Java's 
[`Transactions.replaceTableTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/Transactions.java#L39-L46):
 it collects the metadata updates that transform the existing table into the 
replacement and commits them together with the replace-specific requirements.



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)

Review Comment:
   This follows Java's 
[`reuseOrCreateNewSchemaId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1642-L1653):
 it walks all historical schemas and reuses the id of a structurally identical 
one; otherwise it assigns `max(id) + 1`. The `SetCurrentSchemaUpdate` is 
emitted unconditionally (including in the reuse branch), mirroring Java's 
[`RESTSessionCatalog.replaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L1060-L1064),
 which always ensures a `SetCurrentSchema` change is emitted.



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)
+        if existing_schema_id is not None:
+            self._updates += 
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in 
table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": 
next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on 
add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses 
duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if 
new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, 
effective_spec)

Review Comment:
   This follows Java's 
[`reuseOrCreateNewSpecId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1688-L1700).
 `SetDefaultSpecUpdate` is likewise emitted unconditionally, as in the 
[`RESTSessionCatalog.replaceTransaction` 
block](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L1066-L1070).



##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1012,155 @@ def commit_transaction(self) -> Table:
         return self._table
 
 
+class ReplaceTableTransaction(Transaction):
+    """A transaction that replaces an existing table's schema, spec, sort 
order, location, and properties.
+
+    The existing table UUID, snapshots, snapshot log, metadata log, and 
history are preserved.
+    The "main" branch ref is removed (current-snapshot-id set to -1), and new
+    schema/spec/sort-order/location/properties are applied.
+    """
+
+    def __init__(
+        self,
+        table: StagedTable,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        super().__init__(table, autocommit=False)
+        self._initial_changes(table.metadata, new_schema, new_spec, 
new_sort_order, new_location, new_properties)
+
+    def _initial_changes(
+        self,
+        table_metadata: TableMetadata,
+        new_schema: Schema,
+        new_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_location: str,
+        new_properties: Properties,
+    ) -> None:
+        """Set the initial changes that transform the existing table into the 
replacement.
+
+        Always emits `SetCurrentSchema` / `SetDefaultPartitionSpec` / 
`SetDefaultSortOrder`
+        (even when the resulting id is reused) so the request body 
unambiguously signals a
+        replace. Bumps `format-version` when the new properties request it.
+        """
+        # Upgrade format-version if requested via properties.
+        requested_format_version_str = 
new_properties.get(TableProperties.FORMAT_VERSION)
+        if requested_format_version_str is not None:
+            requested_format_version = int(requested_format_version_str)
+            if requested_format_version > table_metadata.format_version:
+                self._updates += 
(UpgradeFormatVersionUpdate(format_version=requested_format_version),)
+
+        # Remove the main branch ref to clear the current snapshot.
+        self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+        # Schema: reuse an existing schema_id if structurally identical, else 
add a new one
+        # with a fresh schema_id (max + 1, matching UpdateSchema's convention).
+        existing_schema_id = self._find_matching_schema_id(table_metadata, 
new_schema)
+        if existing_schema_id is not None:
+            self._updates += 
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+        else:
+            next_schema_id = max((s.schema_id for s in 
table_metadata.schemas), default=-1) + 1
+            schema_with_fresh_id = new_schema.model_copy(update={"schema_id": 
next_schema_id})
+            self._updates += (
+                AddSchemaUpdate(schema_=schema_with_fresh_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+
+        # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on 
add to avoid
+        # collisions with existing specs (AddPartitionSpecUpdate refuses 
duplicate IDs).
+        effective_spec = UNPARTITIONED_PARTITION_SPEC if 
new_spec.is_unpartitioned() else new_spec
+        existing_spec_id = self._find_matching_spec_id(table_metadata, 
effective_spec)
+        if existing_spec_id is not None:
+            self._updates += (SetDefaultSpecUpdate(spec_id=existing_spec_id),)
+        else:
+            next_spec_id = max((s.spec_id for s in 
table_metadata.partition_specs), default=-1) + 1
+            spec_with_fresh_id = PartitionSpec(*effective_spec.fields, 
spec_id=next_spec_id)
+            self._updates += (
+                AddPartitionSpecUpdate(spec=spec_with_fresh_id),
+                SetDefaultSpecUpdate(spec_id=-1),
+            )
+
+        # Sort order: same reuse-or-add pattern with fresh order_id on add.
+        effective_sort_order = UNSORTED_SORT_ORDER if 
new_sort_order.is_unsorted else new_sort_order
+        existing_order_id = self._find_matching_sort_order_id(table_metadata, 
effective_sort_order)

Review Comment:
   This follows Java's 
[`reuseOrCreateNewSortOrderId`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1736-L1752).
 In Java, an unsorted order always reuses id 0; substituting 
`UNSORTED_SORT_ORDER` via `effective_sort_order` above achieves the same here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to