Fokko commented on code in PR #6323:
URL: https://github.com/apache/iceberg/pull/6323#discussion_r1206031076
##########
python/pyiceberg/table/__init__.py:
##########
@@ -69,21 +71,268 @@
import ray
from duckdb import DuckDBPyConnection
+ from pyiceberg.catalog import Catalog
# Module-level AlwaysTrue() instance shared by this module.
# NOTE(review): presumably used as the default "match everything" filter; its usages are outside this excerpt — confirm.
ALWAYS_TRUE = AlwaysTrue()
class AlterTable:
    """Immutable builder that stages table-metadata updates and commits them together.

    Every staging method (``set_*``, ``unset_properties``, ``update_location``)
    returns a *new* ``AlterTable``; the receiver is never modified. Nothing is
    sent to the catalog until ``commit`` is called.
    """

    # The table the staged updates will be applied to.
    _table: Table
    # Updates staged so far, in insertion order.
    _updates: Tuple[BaseTableUpdate, ...]

    def __init__(self, table: Table, actions: Optional[Tuple[BaseTableUpdate, ...]] = None):
        """Create a builder for ``table``, optionally seeded with already-staged updates.

        Args:
            table: The table to alter.
            actions: Updates that are already staged. Any iterable is accepted
                and normalized to a tuple so that later concatenation works.
        """
        self._table = table
        # Normalize to a tuple: _append_updates concatenates with a tuple, which
        # would raise TypeError if a list had been passed in.
        self._updates = tuple(actions) if actions else ()

    def _append_updates(self, *new_updates: BaseTableUpdate) -> AlterTable:
        """Return a new builder with ``new_updates`` appended to the staged updates.

        Each update type may appear at most once per commit. Uniqueness is
        enforced against the already-staged updates AND within ``new_updates``
        itself.

        Args:
            *new_updates: The updates to stage.

        Raises:
            ValueError: When an update's type is already staged, or occurs more
                than once in ``new_updates``.

        Returns:
            A new AlterTable object with the new updates appended.
        """
        seen_types = {type(update) for update in self._updates}
        for new_update in new_updates:
            type_new_update = type(new_update)
            if type_new_update in seen_types:
                raise ValueError(f"Updates in a single commit need to be unique, duplicate: {type_new_update}")
            # Track types within this batch too, so (A(), A()) in one call is rejected.
            seen_types.add(type_new_update)
        return AlterTable(self._table, self._updates + new_updates)

    def set_table_version(self, format_version: int) -> AlterTable:
        """Stage an upgrade of the table's format version.

        Args:
            format_version: The new format version; only 1 and 2 are accepted.

        Raises:
            ValueError: When ``format_version`` is not 1 or 2, or when a format
                version update is already staged.

        Returns:
            The alter table builder.
        """
        if format_version not in {1, 2}:
            raise ValueError(f"Format version not (yet) supported: {format_version}")
        return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))

    def set_schema(self, new_schema: Schema) -> AlterTable:
        """Stage adding ``new_schema`` and making it the current schema.

        Args:
            new_schema: The new schema.

        Returns:
            The alter table builder.

        Raises:
            ValueError: When a schema with the same fields already exists.
        """
        # NOTE(review): this only considers the *current* schema's highest field id;
        # an older schema could hold a higher id. Confirm whether the table metadata's
        # last-column-id should be used here instead.
        last_column_id = max(self._table.schema().highest_field_id, new_schema.highest_field_id)

        existing = next(
            (schema for schema in self._table.schemas().values() if new_schema.fields == schema.fields),
            None,
        )
        if existing is not None:
            raise ValueError(f"Schema already exists, schema-id {existing.schema_id}")

        return self._append_updates(AddSchemaUpdate(schema_=new_schema, last_column_id=last_column_id), SetCurrentSchemaUpdate())

    def set_partition_spec(self, spec: PartitionSpec) -> AlterTable:
        """Stage adding ``spec`` and making it the default partition spec.

        Args:
            spec: The new partition spec.

        Returns:
            The alter table builder.
        """
        return self._append_updates(AddPartitionSpecUpdate(spec=spec), SetDefaultSpecUpdate())

    def set_sort_order(self, sort_order: SortOrder) -> AlterTable:
        """Stage adding ``sort_order`` and making it the default sort order.

        Args:
            sort_order: The new sort order.

        Returns:
            The alter table builder.
        """
        return self._append_updates(AddSortOrderUpdate(sort_order=sort_order), SetDefaultSortOrderUpdate())

    def set_properties(self, **updates: str) -> AlterTable:
        """Stage setting table properties; properties that are already set are overwritten.

        Args:
            updates: The properties to set on the table.

        Returns:
            The alter table builder.
        """
        return self._append_updates(SetPropertiesUpdate(updates=updates))

    def unset_properties(self, *removals: str) -> AlterTable:
        """Stage removal of table properties.

        Args:
            removals: Names of the properties to remove.

        Returns:
            The alter table builder.
        """
        return self._append_updates(RemovePropertiesUpdate(removals=removals))

    def update_location(self, location: str) -> AlterTable:
        """Stage a change of the table location.

        Args:
            location: The new location of the table.

        Returns:
            The alter table builder.
        """
        return self._append_updates(SetLocationUpdate(location=location))

    def commit(self) -> Table:
        """Send all staged updates to the catalog in a single ``alter_table`` call.

        Returns:
            A new Table built from the catalog's response, or the original table
            unchanged when nothing was staged.
        """
        if not self._updates:
            # Nothing staged: skip the catalog round-trip entirely.
            return self._table

        # Strip the catalog name (first identifier element) before calling the catalog.
        table_response = self._table.catalog.alter_table(self._table.identifier[1:], self._updates)
        return Table(
            self._table.identifier,
            metadata=table_response.metadata,
            metadata_location=table_response.metadata_location,
            io=self._table.io,
            catalog=self._table.catalog,
        )
+
+class TableUpdateAction(Enum):
Review Comment:
I think keeping this in `__init__.py` makes more sense because otherwise we
would have a lot of circular imports; they are very closely related. Happy to
move this if you think it should be separated into another file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]