smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3261049094
##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +463,136 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)
+ def replace_table(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> Table:
+ """Atomically replace a table's schema, spec, sort order, location,
and properties.
+
+ The table UUID and history (snapshots, schemas, specs, sort orders)
are preserved.
+ The current snapshot is cleared (main branch ref is removed).
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ Table: the replaced table instance.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ TableAlreadyExistsError: If a view exists at the same identifier.
+ """
+ return self.replace_table_transaction(
+ identifier, schema, location, partition_spec, sort_order,
properties
+ ).commit_transaction()
+
+ def replace_table_transaction(
Review Comment:
This is a stub on the base class (rather than an `@abstractmethod`) so that
external `Catalog` subclasses that don't override it can still be
instantiated — only callers of `replace_table*` hit the `NotImplementedError`.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -323,6 +328,20 @@ def delete_data_files(io: FileIO, manifests_to_delete:
list[ManifestFile]) -> No
deleted_files[path] = True
+def _raise_if_view_exists(catalog: Catalog, identifier: str | Identifier) ->
None:
Review Comment:
Mirrors Java's [view-collision pre-flight in
`RESTSessionCatalog.replaceTransaction`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L1035-L1037).
`NotImplementedError` from `view_exists` is treated as "no view" for catalogs
that don't model views.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +463,136 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)
+ def replace_table(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> Table:
+ """Atomically replace a table's schema, spec, sort order, location,
and properties.
+
+ The table UUID and history (snapshots, schemas, specs, sort orders)
are preserved.
+ The current snapshot is cleared (main branch ref is removed).
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ Table: the replaced table instance.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ TableAlreadyExistsError: If a view exists at the same identifier.
+ """
+ return self.replace_table_transaction(
+ identifier, schema, location, partition_spec, sort_order,
properties
+ ).commit_transaction()
+
+ def replace_table_transaction(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> ReplaceTableTransaction:
+ """Create a ReplaceTableTransaction.
+
+ The transaction can be used to stage additional changes (schema
evolution,
+ partition evolution, etc.) before committing.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ ReplaceTableTransaction: A transaction for the replace operation.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ TableAlreadyExistsError: If a view exists at the same identifier.
+ """
+ raise NotImplementedError("replace_table_transaction is not supported
for this catalog type")
+
+ def _replace_staged_table(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None,
+ partition_spec: PartitionSpec,
+ sort_order: SortOrder,
+ properties: Properties,
+ ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
+ """Load the existing table and build fresh schema/spec/sort-order for
replacement.
+
+ - reuses existing field IDs by name (from the current schema)
+ - reuses partition field IDs by `(source, transform)` across all specs
(v2+),
+ or carries forward the current spec with `VoidTransform`s (v1)
+ - reassigns sort field IDs against the fresh schema
+ - resolves `location` to the existing table's location when omitted
+
+ Returns:
+ A tuple `(staged_table, fresh_schema, fresh_partition_spec,
fresh_sort_order, resolved_location)`.
+ """
+ existing_table = self.load_table(identifier)
+ existing_metadata = existing_table.metadata
+
+ requested_format_version =
properties.get(TableProperties.FORMAT_VERSION)
+ if requested_format_version is not None and
int(requested_format_version) < existing_metadata.format_version:
+ raise ValueError(
+ f"Cannot downgrade format-version from
{existing_metadata.format_version} to {requested_format_version}"
Review Comment:
Java's
[`buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L733-L738)
reads `format-version` from properties and only ever upgrades. A downgrade is
rejected explicitly here — otherwise `_convert_schema_if_needed` would run with
v1 semantics while the requested format-version change is silently dropped,
producing a confusing mismatch.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -444,6 +463,136 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)
+ def replace_table(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> Table:
+ """Atomically replace a table's schema, spec, sort order, location,
and properties.
+
+ The table UUID and history (snapshots, schemas, specs, sort orders)
are preserved.
+ The current snapshot is cleared (main branch ref is removed).
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ Table: the replaced table instance.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ TableAlreadyExistsError: If a view exists at the same identifier.
+ """
+ return self.replace_table_transaction(
+ identifier, schema, location, partition_spec, sort_order,
properties
+ ).commit_transaction()
+
+ def replace_table_transaction(
+ self,
+ identifier: str | Identifier,
+ schema: Schema | pa.Schema,
+ location: str | None = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> ReplaceTableTransaction:
+ """Create a ReplaceTableTransaction.
+
+ The transaction can be used to stage additional changes (schema
evolution,
+ partition evolution, etc.) before committing.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): New table schema.
+ location (str | None): New table location. Defaults to the
existing location.
+ partition_spec (PartitionSpec): New partition spec.
+ sort_order (SortOrder): New sort order.
+ properties (Properties): Properties to apply. Merged on top of the
existing
+ table properties: keys present here override existing values;
existing keys
+ not present here are preserved. To remove a property, follow
up with a
+ transaction that removes it explicitly.
+
+ Returns:
+ ReplaceTableTransaction: A transaction for the replace operation.
+
+ Raises:
+ NoSuchTableError: If the table does not exist.
+ TableAlreadyExistsError: If a view exists at the same identifier.
+ """
+ raise NotImplementedError("replace_table_transaction is not supported
for this catalog type")
+
+ def _replace_staged_table(
Review Comment:
Maps to Java's
[`TableMetadata.buildReplacement`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/TableMetadata.java#L706-L746).
All the bookkeeping (fresh schema, partition spec, sort order, location
resolution, `StagedTable` construction) lives here so `MetastoreCatalog` and
`RestCatalog` share it — analogous to how `_create_staged_table` is factored.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]