Fokko commented on code in PR #8374:
URL: https://github.com/apache/iceberg/pull/8374#discussion_r1306714078
##########
python/pyiceberg/table/__init__.py:
##########
@@ -934,206 +963,611 @@ def case_sensitive(self, case_sensitive: bool) ->
UpdateSchema:
return self
def add_column(
- self, name: str, type_var: IcebergType, doc: Optional[str] = None,
parent: Optional[str] = None, required: bool = False
+ self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc:
Optional[str] = None, required: bool = False
) -> UpdateSchema:
"""Add a new column to a nested struct or Add a new top-level column.
Args:
- name: Name for the new column.
- type_var: Type for the new column.
+ path: Name for the new column.
+ field_type: Type for the new column.
doc: Documentation string for the new column.
- parent: Name of the parent struct to the column will be added to.
required: Whether the new column is required.
Returns:
- This for method chaining
+ This for method chaining.
"""
- if "." in name:
- raise ValueError(f"Cannot add column with ambiguous name: {name}")
+ path = (path,) if isinstance(path, str) else path
+
+ if "." in path[-1]:
+ raise ValueError(f"Cannot add column with ambiguous name:
{path[-1]}, provide a tuple instead")
if required and not self._allow_incompatible_changes:
# Table format version 1 and 2 cannot add required column because
there is no initial value
- raise ValueError(f"Incompatible change: cannot add required
column: {name}")
+ raise ValueError(f'Incompatible change: cannot add required
column: {".".join(path)}')
+
+ name = path[-1]
+ parent = path[:-1]
+
+ full_name = ".".join(path)
+ parent_id: int = TABLE_ROOT_ID
+
+ if len(parent) > 0:
+ parent_field = self._schema.find_field(".".join(parent),
self._case_sensitive)
+ parent_type = parent_field.field_type
+ if isinstance(parent_type, MapType):
+ parent_field = parent_type.value_field
+ elif isinstance(parent_type, ListType):
+ parent_field = parent_type.element_field
+
+ if not parent_field.field_type.is_struct:
+ raise ValueError(f"Cannot add column '{name}' to non-struct
type: {'.'.join(parent)}")
+
+ parent_id = parent_field.field_id
+
+ exists = False
+ try:
+ exists = self._schema.find_field(full_name, self._case_sensitive)
is not None
+ except ValueError:
+ pass
+
+ if exists:
+ raise ValueError(f"Cannot add column, name already exists:
{full_name}")
+
+ # assign new IDs in order
+ new_id = self.assign_new_column_id()
+
+ # update tracking for moves
+ self._added_name_to_id[full_name] = new_id
+
+ new_type = assign_fresh_schema_ids(field_type,
self.assign_new_column_id)
+ field = NestedField(field_id=new_id, name=name, field_type=new_type,
required=required, doc=doc)
+
+ self._adds[parent_id] = self._adds.get(parent_id, []) + [field]
- self._internal_add_column(parent, name, not required, type_var, doc)
return self
- def allow_incompatible_changes(self) -> UpdateSchema:
- """Allow incompatible changes to the schema.
+ def delete_column(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
+ """Deletes a column from a table.
+
+ Args:
+ path: The path to the column.
Returns:
- This for method chaining
+ The UpdateSchema with the delete operation staged.
"""
- self._allow_incompatible_changes = True
+ name = (path,) if isinstance(path, str) else path
+ full_name = ".".join(name)
+
+ field = self._schema.find_field(full_name, self._case_sensitive)
+
+ if field.field_id in self._adds:
+ raise ValueError(f"Cannot delete a column that has additions:
{full_name}")
+ if field.field_id in self._updates:
+ raise ValueError(f"Cannot delete a column that has updates:
{full_name}")
+
+ self._deletes.add(field.field_id)
+
return self
- def commit(self) -> None:
- """Apply the pending changes and commit."""
- new_schema = self._apply()
- updates = [
- AddSchemaUpdate(schema=new_schema,
last_column_id=new_schema.highest_field_id),
- SetCurrentSchemaUpdate(schema_id=-1),
- ]
- requirements =
[AssertCurrentSchemaId(current_schema_id=self._schema.schema_id)]
+ def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name:
str) -> UpdateSchema:
+ """Updates the name of a column.
+
+ Args:
+ path_from: The path to the column to be renamed.
+ new_name: The new path of the column.
+
+ Returns:
+ The UpdateSchema with the rename operation staged.
+ """
+ name_from = tuple(path_from.split(".")) if isinstance(path_from, str)
else path_from
Review Comment:
This was for updating the identifier fields. I've updated that to:
```python
if path_from in self._identifier_field_names:
    self._identifier_field_names.remove(path_from)
    self._identifier_field_names.add(f"{path_from[:-len(field_from.name)]}{new_name}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]