rdblue commented on code in PR #8374:
URL: https://github.com/apache/iceberg/pull/8374#discussion_r1305003741


##########
python/pyiceberg/table/__init__.py:
##########
@@ -934,206 +963,611 @@ def case_sensitive(self, case_sensitive: bool) -> 
UpdateSchema:
         return self
 
     def add_column(
-        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None, required: bool = False
+        self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc: 
Optional[str] = None, required: bool = False
     ) -> UpdateSchema:
         """Add a new column to a nested struct or Add a new top-level column.
 
         Args:
-            name: Name for the new column.
-            type_var: Type for the new column.
+            path: Name for the new column.
+            field_type: Type for the new column.
             doc: Documentation string for the new column.
-            parent: Name of the parent struct to the column will be added to.
             required: Whether the new column is required.
 
         Returns:
-            This for method chaining
+            This for method chaining.
         """
-        if "." in name:
-            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+        path = (path,) if isinstance(path, str) else path
+
+        if "." in path[-1]:
+            raise ValueError(f"Cannot add column with ambiguous name: 
{path[-1]}, provide a tuple instead")
 
         if required and not self._allow_incompatible_changes:
             # Table format version 1 and 2 cannot add required column because 
there is no initial value
-            raise ValueError(f"Incompatible change: cannot add required 
column: {name}")
+            raise ValueError(f'Incompatible change: cannot add required 
column: {".".join(path)}')
+
+        name = path[-1]
+        parent = path[:-1]
+
+        full_name = ".".join(path)
+        parent_id: int = TABLE_ROOT_ID
+
+        if len(parent) > 0:
+            parent_field = self._schema.find_field(".".join(parent), 
self._case_sensitive)
+            parent_type = parent_field.field_type
+            if isinstance(parent_type, MapType):
+                parent_field = parent_type.value_field
+            elif isinstance(parent_type, ListType):
+                parent_field = parent_type.element_field
+
+            if not parent_field.field_type.is_struct:
+                raise ValueError(f"Cannot add column '{name}' to non-struct 
type: {'.'.join(parent)}")
+
+            parent_id = parent_field.field_id
+
+        exists = False
+        try:
+            exists = self._schema.find_field(full_name, self._case_sensitive) 
is not None
+        except ValueError:
+            pass
+
+        if exists:
+            raise ValueError(f"Cannot add column, name already exists: 
{full_name}")
+
+        # assign new IDs in order
+        new_id = self.assign_new_column_id()
+
+        # update tracking for moves
+        self._added_name_to_id[full_name] = new_id
+
+        new_type = assign_fresh_schema_ids(field_type, 
self.assign_new_column_id)
+        field = NestedField(field_id=new_id, name=name, field_type=new_type, 
required=required, doc=doc)
+
+        self._adds[parent_id] = self._adds.get(parent_id, []) + [field]
 
-        self._internal_add_column(parent, name, not required, type_var, doc)
         return self
 
-    def allow_incompatible_changes(self) -> UpdateSchema:
-        """Allow incompatible changes to the schema.
+    def delete_column(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
+        """Deletes a column from a table.
+
+        Args:
+            path: The path to the column.
 
         Returns:
-            This for method chaining
+            The UpdateSchema with the delete operation staged.
         """
-        self._allow_incompatible_changes = True
+        name = (path,) if isinstance(path, str) else path
+        full_name = ".".join(name)
+
+        field = self._schema.find_field(full_name, self._case_sensitive)
+
+        if field.field_id in self._adds:
+            raise ValueError(f"Cannot delete a column that has additions: 
{full_name}")
+        if field.field_id in self._updates:
+            raise ValueError(f"Cannot delete a column that has updates: 
{full_name}")
+
+        self._deletes.add(field.field_id)
+
         return self
 
-    def commit(self) -> None:
-        """Apply the pending changes and commit."""
-        new_schema = self._apply()
-        updates = [
-            AddSchemaUpdate(schema=new_schema, 
last_column_id=new_schema.highest_field_id),
-            SetCurrentSchemaUpdate(schema_id=-1),
-        ]
-        requirements = 
[AssertCurrentSchemaId(current_schema_id=self._schema.schema_id)]
+    def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: 
str) -> UpdateSchema:
+        """Updates the name of a column.
+
+        Args:
+            path_from: The path to the column to be renamed.
+            new_name: The new name of the column (only the last dot-separated segment is used).
+
+        Returns:
+            The UpdateSchema with the rename operation staged.
+        """
+        name_from = tuple(path_from.split(".")) if isinstance(path_from, str) 
else path_from
+        new_name = new_name.split(".")[-1]
 
-        if self._transaction is not None:
-            self._transaction._append_updates(*updates)  # pylint: 
disable=W0212
-            self._transaction._append_requirements(*requirements)  # pylint: 
disable=W0212
+        parent_path = name_from[:-1]
+        full_name_from = ".".join(name_from)
+
+        from_field = self._schema.find_field(full_name_from, 
self._case_sensitive)
+
+        if from_field.field_id in self._deletes:
+            raise ValueError(f"Cannot rename a column that will be deleted: 
{full_name_from}")
+
+        if updated := self._updates.get(from_field.field_id):
+            self._updates[from_field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=new_name,
+                field_type=updated.field_type,
+                doc=updated.doc,
+                required=updated.required,
+            )
         else:
-            table_update_response = self._table.catalog._commit_table(  # 
pylint: disable=W0212
-                CommitTableRequest(identifier=self._table.identifier[1:], 
updates=updates, requirements=requirements)
+            self._updates[from_field.field_id] = NestedField(
+                field_id=from_field.field_id,
+                name=new_name,
+                field_type=from_field.field_type,
+                doc=from_field.doc,
+                required=from_field.required,
             )
-            self._table.metadata = table_update_response.metadata
-            self._table.metadata_location = 
table_update_response.metadata_location
 
-    def _apply(self) -> Schema:
-        """Apply the pending changes to the original schema and returns the 
result.
+        if path_from in self._identifier_field_names:
+            self._identifier_field_names.remove(full_name_from)
+            
self._identifier_field_names.add(f"{'.'.join(parent_path)}.{new_name}")
+
+        return self
+
+    def require_column(self, path: Union[str, Tuple[str, ...]]) -> 
UpdateSchema:
+        """Makes a column required.
+
+        This is a breaking change since writers have to make sure that
+        this value is not-null.
+
+        Args:
+            path: The path to the field
 
         Returns:
-            the result Schema when all pending updates are applied
+            The UpdateSchema with the requirement change staged.
         """
-        return _apply_changes(self._schema, self._adds, 
self._identifier_field_names)
+        self._set_column_requirement(path, True)
+        return self
 
-    def _internal_add_column(
-        self, parent: Optional[str], name: str, is_optional: bool, type_var: 
IcebergType, doc: Optional[str]
-    ) -> None:
-        full_name: str = name
-        parent_id: int = TABLE_ROOT_ID
+    def make_column_optional(self, path: Union[str, Tuple[str, ...]]) -> 
UpdateSchema:
+        """Makes a column optional.
 
-        exist_field: Optional[NestedField] = None
-        if parent:
-            parent_field = self._schema.find_field(parent, 
self._case_sensitive)
-            parent_type = parent_field.field_type
-            if isinstance(parent_type, MapType):
-                parent_field = parent_type.value_field
-            elif isinstance(parent_type, ListType):
-                parent_field = parent_type.element_field
+        Args:
+            path: The path to the field.
 
-            if not parent_field.field_type.is_struct:
-                raise ValueError(f"Cannot add column to non-struct type: 
{parent}")
+        Returns:
+            The UpdateSchema with the requirement change staged.
+        """
+        self._set_column_requirement(path, False)
+        return self
 
-            parent_id = parent_field.field_id
+    def _set_column_requirement(self, path: Union[str, Tuple[str, ...]], 
required: bool) -> None:
+        path = (path,) if isinstance(path, str) else path
+        name = ".".join(path)
+
+        field = self._schema.find_field(name, self._case_sensitive)
 
-            try:
-                exist_field = self._schema.find_field(parent + "." + name, 
self._case_sensitive)
-            except ValueError:
-                pass
+        if (field.required and required) or (field.optional and not required):
+            # if the change is a noop, allow it even if 
allowIncompatibleChanges is false
+            return
 
-            if exist_field:
-                raise ValueError(f"Cannot add column, name already exists: 
{parent}.{name}")
+        if self._allow_incompatible_changes and not required:
+            raise ValueError(f"Cannot change column nullability: {name}: 
optional -> required")
 
-            full_name = parent_field.name + "." + name
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot update a column that will be deleted: 
{name}")
 
+        if updated := self._updates.get(field.field_id):
+            self._updates[field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=updated.name,
+                field_type=updated.field_type,
+                doc=updated.doc,
+                required=required,
+            )
         else:
-            try:
-                exist_field = self._schema.find_field(name, 
self._case_sensitive)
-            except ValueError:
-                pass
+            self._updates[field.field_id] = NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field.field_type,
+                doc=field.doc,
+                required=required,
+            )
 
-            if exist_field:
-                raise ValueError(f"Cannot add column, name already exists: 
{name}")
+    def update_column(self, path: Union[str, Tuple[str, ...]], field_type: 
IcebergType) -> UpdateSchema:
+        """Update the type of column.
 
-        # assign new IDs in order
-        new_id = self.assign_new_column_id()
+        Args:
+            path: The path to the field.
+            field_type: The new type
 
-        # update tracking for moves
-        self._added_name_to_id[full_name] = new_id
+        Returns:
+            The UpdateSchema with the type update staged.
+        """
+        path = (path,) if isinstance(path, str) else path
+        full_name = ".".join(path)
+
+        field = self._schema.find_field(full_name, self._case_sensitive)
+
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot update a column that will be deleted: 
{full_name}")
+
+        if field.field_type == field_type:
+            # Nothing changed
+            return self
+
+        try:
+            promote(field.field_type, field_type)
+        except ResolveError as e:
+            raise ValidationError(f"Cannot change column type: {full_name}: 
{field.field_type} -> {field_type}") from e
+
+        if updated := self._updates.get(field.field_id):
+            self._updates[field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=updated.name,
+                field_type=field_type,
+                doc=updated.doc,
+                required=updated.required,
+            )
+        else:
+            self._updates[field.field_id] = NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field_type,
+                doc=field.doc,
+                required=field.required,
+            )
 
-        new_type = assign_fresh_schema_ids(type_var, self.assign_new_column_id)
-        field = NestedField(new_id, name, new_type, not is_optional, doc)
+        return self
 
-        self._adds.setdefault(parent_id, []).append(field)
+    def update_column_doc(self, path: Union[str, Tuple[str, ...]], doc: str) 
-> UpdateSchema:
+        """Update the documentation of column.
+
+        Args:
+            path: The path to the field.
+            doc: The new documentation of the column
+
+        Returns:
+            The UpdateSchema with the doc update staged.
+        """
+        path = (path,) if isinstance(path, str) else path
+        full_name = ".".join(path)
+
+        field = self._schema.find_field(full_name, self._case_sensitive)
+
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot update a column that will be deleted: 
{full_name}")
+
+        if field.doc == doc:
+            # Noop
+            return self
+
+        if updated := self._updates.get(field.field_id):
+            self._updates[field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=updated.name,
+                field_type=updated.field_type,
+                doc=doc,
+                required=updated.required,
+            )
+        else:
+            self._updates[field.field_id] = NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field.field_type,
+                doc=doc,
+                required=field.required,
+            )
+
+        return self
+
+    def _find_for_move(self, name: str) -> Optional[int]:
+        try:
+            return self._schema.find_field(name, self._case_sensitive).field_id
+        except ValueError:
+            pass
+
+        return self._added_name_to_id.get(name)
+
+    def _move(self, full_name: str, move: Move) -> None:

Review Comment:
   Why not keep `full_name` in `Move`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to