Fokko commented on code in PR #8374:
URL: https://github.com/apache/iceberg/pull/8374#discussion_r1314249586


##########
python/pyiceberg/table/__init__.py:
##########
@@ -934,206 +976,630 @@ def case_sensitive(self, case_sensitive: bool) -> 
UpdateSchema:
         return self
 
     def add_column(
-        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None, required: bool = False
+        self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc: 
Optional[str] = None, required: bool = False
     ) -> UpdateSchema:
         """Add a new column to a nested struct or Add a new top-level column.
 
+        Because "." may be interpreted as a column path separator or may be 
used in field names, it
+        is not allowed to add nested column by passing in a string. To add to 
nested structures or
+        to add fields with names that contain "." use a tuple instead to 
indicate the path.
+
+        If type is a nested type, its field IDs are reassigned when added to 
the existing schema.
+
         Args:
-            name: Name for the new column.
-            type_var: Type for the new column.
+            path: Name for the new column.
+            field_type: Type for the new column.
             doc: Documentation string for the new column.
-            parent: Name of the parent struct to the column will be added to.
             required: Whether the new column is required.
 
         Returns:
-            This for method chaining
+            This for method chaining.
         """
-        if "." in name:
-            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+        if isinstance(path, str):
+            if "." in path:
+                raise ValueError(f"Cannot add column with ambiguous name: 
{path}, provide a tuple instead")
+            path = (path,)
 
         if required and not self._allow_incompatible_changes:
             # Table format version 1 and 2 cannot add required column because 
there is no initial value
-            raise ValueError(f"Incompatible change: cannot add required 
column: {name}")
+            raise ValueError(f'Incompatible change: cannot add required 
column: {".".join(path)}')
+
+        name = path[-1]
+        parent = path[:-1]
+
+        full_name = ".".join(path)
+        parent_full_path = ".".join(parent)
+        parent_id: int = TABLE_ROOT_ID
+
+        if len(parent) > 0:
+            parent_field = self._schema.find_field(parent_full_path, 
self._case_sensitive)
+            parent_type = parent_field.field_type
+            if isinstance(parent_type, MapType):
+                parent_field = parent_type.value_field
+            elif isinstance(parent_type, ListType):
+                parent_field = parent_type.element_field
+
+            if not parent_field.field_type.is_struct:
+                raise ValueError(f"Cannot add column '{name}' to non-struct 
type: {'.'.join(parent)}")
+
+            parent_id = parent_field.field_id
+
+        existing_field = None
+        try:
+            existing_field = self._schema.find_field(full_name, 
self._case_sensitive)
+        except ValueError:
+            pass
+
+        if existing_field is not None and existing_field.field_id not in 
self._deletes:
+            raise ValueError(f"Cannot add column, name already exists: 
{full_name}")
+
+        # assign new IDs in order
+        new_id = self.assign_new_column_id()
+
+        # update tracking for moves
+        self._added_name_to_id[full_name] = new_id
+        self._id_to_parent[new_id] = parent_full_path
+
+        new_type = assign_fresh_schema_ids(field_type, 
self.assign_new_column_id)
+        field = NestedField(field_id=new_id, name=name, field_type=new_type, 
required=required, doc=doc)
+
+        if parent_id in self._adds:
+            self._adds[parent_id].append(field)
+        else:
+            self._adds[parent_id] = [field]
 
-        self._internal_add_column(parent, name, not required, type_var, doc)
         return self
 
-    def allow_incompatible_changes(self) -> UpdateSchema:
-        """Allow incompatible changes to the schema.
+    def delete_column(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
+        """Delete a column from a table.
+
+        Args:
+            path: The path to the column.
 
         Returns:
-            This for method chaining
+            The UpdateSchema with the delete operation staged.
         """
-        self._allow_incompatible_changes = True
+        name = (path,) if isinstance(path, str) else path
+        full_name = ".".join(name)
+
+        field = self._schema.find_field(full_name, 
case_sensitive=self._case_sensitive)
+
+        if field.field_id in self._adds:
+            raise ValueError(f"Cannot delete a column that has additions: 
{full_name}")
+        if field.field_id in self._updates:
+            raise ValueError(f"Cannot delete a column that has updates: 
{full_name}")
+
+        self._deletes.add(field.field_id)
+
         return self
 
-    def commit(self) -> None:
-        """Apply the pending changes and commit."""
-        new_schema = self._apply()
-        updates = [
-            AddSchemaUpdate(schema=new_schema, 
last_column_id=new_schema.highest_field_id),
-            SetCurrentSchemaUpdate(schema_id=-1),
-        ]
-        requirements = 
[AssertCurrentSchemaId(current_schema_id=self._schema.schema_id)]
+    def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: 
str) -> UpdateSchema:
+        """Update the name of a column.
+
+        Args:
+            path_from: The path to the column to be renamed.
+            new_name: The new path of the column.
 
-        if self._transaction is not None:
-            self._transaction._append_updates(*updates)  # pylint: 
disable=W0212
-            self._transaction._append_requirements(*requirements)  # pylint: 
disable=W0212
+        Returns:
+            The UpdateSchema with the rename operation staged.
+        """
+        path_from = ".".join(path_from) if isinstance(path_from, tuple) else 
path_from
+        field_from = self._schema.find_field(path_from, self._case_sensitive)
+
+        if field_from.field_id in self._deletes:
+            raise ValueError(f"Cannot rename a column that will be deleted: 
{path_from}")
+
+        if updated := self._updates.get(field_from.field_id):
+            self._updates[field_from.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=new_name,
+                field_type=updated.field_type,
+                doc=updated.doc,
+                required=updated.required,
+            )
         else:
-            table_update_response = self._table.catalog._commit_table(  # 
pylint: disable=W0212
-                CommitTableRequest(identifier=self._table.identifier[1:], 
updates=updates, requirements=requirements)
+            self._updates[field_from.field_id] = NestedField(
+                field_id=field_from.field_id,
+                name=new_name,
+                field_type=field_from.field_type,
+                doc=field_from.doc,
+                required=field_from.required,
             )
-            self._table.metadata = table_update_response.metadata
-            self._table.metadata_location = 
table_update_response.metadata_location
 
-    def _apply(self) -> Schema:
-        """Apply the pending changes to the original schema and returns the 
result.
+        if path_from in self._identifier_field_names:
+            self._identifier_field_names.remove(path_from)
+            
self._identifier_field_names.add(f"{path_from[:-len(field_from.name)]}{new_name}")
+
+        return self
+
+    def require_column(self, path: Union[str, Tuple[str, ...]]) -> 
UpdateSchema:
+        """Make a column required.
+
+        This is a breaking change since writers have to make sure that
+        this value is not-null.
+
+        Args:
+            path: The path to the field
 
         Returns:
-            the result Schema when all pending updates are applied
+            The UpdateSchema with the requirement change staged.
         """
-        return _apply_changes(self._schema, self._adds, 
self._identifier_field_names)
+        self._set_column_requirement(path, required=True)
+        return self
 
-    def _internal_add_column(
-        self, parent: Optional[str], name: str, is_optional: bool, type_var: 
IcebergType, doc: Optional[str]
-    ) -> None:
-        full_name: str = name
-        parent_id: int = TABLE_ROOT_ID
+    def make_column_optional(self, path: Union[str, Tuple[str, ...]]) -> 
UpdateSchema:
+        """Make a column optional.
 
-        exist_field: Optional[NestedField] = None
-        if parent:
-            parent_field = self._schema.find_field(parent, 
self._case_sensitive)
-            parent_type = parent_field.field_type
-            if isinstance(parent_type, MapType):
-                parent_field = parent_type.value_field
-            elif isinstance(parent_type, ListType):
-                parent_field = parent_type.element_field
+        Args:
+            path: The path to the field.
 
-            if not parent_field.field_type.is_struct:
-                raise ValueError(f"Cannot add column to non-struct type: 
{parent}")
+        Returns:
+            The UpdateSchema with the requirement change staged.
+        """
+        self._set_column_requirement(path, False)
+        return self
 
-            parent_id = parent_field.field_id
+    def set_identifier_fields(self, *fields: str) -> None:
+        self._identifier_field_names = set(fields)
 
-            try:
-                exist_field = self._schema.find_field(parent + "." + name, 
self._case_sensitive)
-            except ValueError:
-                pass
+    def _set_column_requirement(self, path: Union[str, Tuple[str, ...]], 
required: bool) -> None:
+        path = (path,) if isinstance(path, str) else path
+        name = ".".join(path)
 
-            if exist_field:
-                raise ValueError(f"Cannot add column, name already exists: 
{parent}.{name}")
+        field = self._schema.find_field(name, self._case_sensitive)
 
-            full_name = parent_field.name + "." + name
+        if (field.required and required) or (field.optional and not required):
+            # if the change is a noop, allow it even if 
allowIncompatibleChanges is false
+            return
 
+        if not self._allow_incompatible_changes and required:
+            raise ValueError(f"Cannot change column nullability: {name}: 
optional -> required")
+
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot update a column that will be deleted: 
{name}")
+
+        if updated := self._updates.get(field.field_id):
+            self._updates[field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=updated.name,
+                field_type=updated.field_type,
+                doc=updated.doc,
+                required=required,
+            )
         else:
-            try:
-                exist_field = self._schema.find_field(name, 
self._case_sensitive)
-            except ValueError:
-                pass
+            self._updates[field.field_id] = NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field.field_type,
+                doc=field.doc,
+                required=required,
+            )
+
+    def update_column(
+        self,
+        path: Union[str, Tuple[str, ...]],
+        field_type: Optional[IcebergType] = None,
+        required: Optional[bool] = None,
+        doc: Optional[str] = None,
+    ) -> UpdateSchema:
+        """Update the type of column.
 
-            if exist_field:
-                raise ValueError(f"Cannot add column, name already exists: 
{name}")
+        Args:
+            path: The path to the field.
+            field_type: The new type
+            required: If the field should be required
+            doc: Documentation describing the column
 
-        # assign new IDs in order
-        new_id = self.assign_new_column_id()
+        Returns:
+            The UpdateSchema with the type update staged.
+        """
+        path = (path,) if isinstance(path, str) else path
+        full_name = ".".join(path)
+
+        field = self._schema.find_field(full_name, self._case_sensitive)
+
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot update a column that will be deleted: 
{full_name}")
+
+        if field_type is not None:
+            if not self._allow_incompatible_changes and field.field_type != 
field_type:
+                try:
+                    promote(field.field_type, field_type)
+                except ResolveError as e:
+                    raise ValidationError(f"Cannot change column type: 
{full_name}: {field.field_type} -> {field_type}") from e
+
+        if updated := self._updates.get(field.field_id):
+            self._updates[field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=updated.name,
+                field_type=field_type or updated.field_type,
+                doc=doc or updated.doc,
+                required=updated.required,
+            )
+        else:
+            self._updates[field.field_id] = NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field_type or field.field_type,
+                doc=doc or field.doc,
+                required=field.required,
+            )
 
-        # update tracking for moves
-        self._added_name_to_id[full_name] = new_id
+        if required is not None:
+            self._set_column_requirement(path, required=required)
 
-        new_type = assign_fresh_schema_ids(type_var, self.assign_new_column_id)
-        field = NestedField(new_id, name, new_type, not is_optional, doc)
+        return self
+
+    def update_column_doc(self, path: Union[str, Tuple[str, ...]], doc: str) 
-> UpdateSchema:
+        """Update the documentation of column.
 
-        self._adds.setdefault(parent_id, []).append(field)
+        Args:
+            path: The path to the field.
+            doc: The new documentation of the column
+
+        Returns:
+            The UpdateSchema with the doc update staged.
+        """
+        path = (path,) if isinstance(path, str) else path
+        full_name = ".".join(path)
+
+        field = self._schema.find_field(full_name, self._case_sensitive)
+
+        if field.field_id in self._deletes:
+            raise ValueError(f"Cannot update a column that will be deleted: 
{full_name}")
+
+        if field.doc == doc:
+            # Noop
+            return self
+
+        if updated := self._updates.get(field.field_id):
+            self._updates[field.field_id] = NestedField(
+                field_id=updated.field_id,
+                name=updated.name,
+                field_type=updated.field_type,
+                doc=doc,
+                required=updated.required,
+            )
+        else:
+            self._updates[field.field_id] = NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                field_type=field.field_type,
+                doc=doc,
+                required=field.required,
+            )
+
+        return self
+
+    def _find_for_move(self, name: str) -> Optional[int]:
+        try:
+            return self._schema.find_field(name, self._case_sensitive).field_id
+        except ValueError:
+            pass
+
+        return self._added_name_to_id.get(name)
+
+    def _move(self, move: Move) -> None:
+        if parent_name := self._id_to_parent.get(move.field_id):
+            parent_field = self._schema.find_field(parent_name, 
case_sensitive=self._case_sensitive)
+            if not parent_field.field_type.is_struct:
+                raise ValueError(f"Cannot move fields in non-struct type: 
{parent_field.field_type}")
+
+            if move.op == MoveOperation.After or move.op == 
MoveOperation.Before:
+                if move.other_field_id is None:
+                    raise ValueError("Expected other field when performing 
before/after move")
+
+                if self._id_to_parent.get(move.field_id) != 
self._id_to_parent.get(move.other_field_id):
+                    raise ValueError(f"Cannot move field {move.full_name} to a 
different struct")
+
+            self._moves[parent_field.field_id] = 
self._moves.get(parent_field.field_id, []) + [move]
+        else:
+            if move.op == MoveOperation.After or move.op == 
MoveOperation.Before:
+                if move.other_field_id is None:
+                    raise ValueError("Expected other field when performing 
before/after move")
+
+                if other_struct := self._id_to_parent.get(move.other_field_id):
+                    raise ValueError(f"Cannot move field {move.full_name} to a 
different struct: {other_struct}")
+
+            self._moves[TABLE_ROOT_ID] = self._moves.get(TABLE_ROOT_ID, []) + 
[move]
+
+    def move_first(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
+        """Move the field to the first position of the parent struct.
+
+        Args:
+            path: The path to the field.
+
+        Returns:
+            The UpdateSchema with the move operation staged.
+        """
+        full_name = ".".join(path) if isinstance(path, tuple) else path
+
+        field_id = self._find_for_move(full_name)
+
+        if field_id is None:
+            raise ValueError(f"Cannot move missing column: {full_name}")
+
+        self._move(Move(field_id=field_id, full_name=full_name, 
op=MoveOperation.First))
+
+        return self
+
+    def move_before(self, path: Union[str, Tuple[str, ...]], before_path: 
Union[str, Tuple[str, ...]]) -> UpdateSchema:
+        """Move the field to before another field.
+
+        Args:
+            path: The path to the field.
+
+        Returns:
+            The UpdateSchema with the move operation staged.
+        """
+        full_name = ".".join(path) if isinstance(path, tuple) else path
+        field_id = self._find_for_move(full_name)
+
+        if field_id is None:
+            raise ValueError(f"Cannot move missing column: {full_name}")
+
+        before_full_name = (
+            ".".join(
+                before_path,
+            )
+            if isinstance(before_path, tuple)
+            else before_path
+        )
+        before_field_id = self._find_for_move(before_full_name)
+
+        if before_field_id is None:
+            raise ValueError(f"Cannot move {full_name} before missing column: 
{before_full_name}")
+
+        if field_id == before_field_id:
+            raise ValueError(f"Cannot move {full_name} before itself")
+
+        self._move(Move(field_id=field_id, full_name=full_name, 
other_field_id=before_field_id, op=MoveOperation.Before))
+
+        return self
+
+    def move_after(self, path: Union[str, Tuple[str, ...]], after_name: 
Union[str, Tuple[str, ...]]) -> UpdateSchema:
+        """Move the field to after another field.
+
+        Args:
+            path: The path to the field.
+
+        Returns:
+            The UpdateSchema with the move operation staged.
+        """
+        full_name = ".".join(path) if isinstance(path, tuple) else path
+
+        field_id = self._find_for_move(full_name)
+
+        if field_id is None:
+            raise ValueError(f"Cannot move missing column: {full_name}")
+
+        after_full_name = ".".join(after_name) if isinstance(after_name, 
tuple) else after_name
+        after_field_id = self._find_for_move(after_full_name)
+
+        if after_field_id is None:
+            raise ValueError(f"Cannot move {full_name} after missing column: 
{after_full_name}")
+
+        if field_id == after_field_id:
+            raise ValueError(f"Cannot move {full_name} after itself")
+
+        self._move(Move(field_id=field_id, full_name=full_name, 
other_field_id=after_field_id, op=MoveOperation.After))
+
+        return self
+
+    def commit(self) -> None:
+        """Apply the pending changes and commit."""
+        new_schema = self._apply()
+
+        if new_schema != self._schema:
+            last_column_id = max(self._table.metadata.last_column_id, 
new_schema.highest_field_id)
+            updates = (
+                AddSchemaUpdate(schema=new_schema, 
last_column_id=last_column_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+            )
+            requirements = 
(AssertCurrentSchemaId(current_schema_id=self._schema.schema_id),)
+
+            if self._transaction is not None:
+                self._transaction._append_updates(*updates)  # pylint: 
disable=W0212
+                self._transaction._append_requirements(*requirements)  # 
pylint: disable=W0212
+            else:
+                self._table._do_commit(updates=updates, 
requirements=requirements)  # pylint: disable=W0212
+
+    def _apply(self) -> Schema:
+        """Apply the pending changes to the original schema and returns the 
result.
+
+        Returns:
+            the result Schema when all pending updates are applied
+        """
+        struct = visit(self._schema, _ApplyChanges(self._adds, self._updates, 
self._deletes, self._moves))
+        if struct is None:
+            # Should never happen
+            raise ValueError("Could not apply changes")
+
+        new_schema = Schema(*struct.fields)
+
+        field_ids = set()
+        for name in self._identifier_field_names:
+            try:
+                field = new_schema.find_field(name, self._case_sensitive)
+            except ValueError as e:
+                raise ValueError(
+                    f"Cannot find identifier field {name}. In case of 
deletion, update the identifier fields first."
+                ) from e
+
+            field_ids.add(field.field_id)
+
+        return Schema(*struct.fields, identifier_field_ids=field_ids)
 
     def assign_new_column_id(self) -> int:
         return next(self._last_column_id)
 
 
-def _apply_changes(schema_: Schema, adds: Dict[int, List[NestedField]], 
identifier_field_names: List[str]) -> Schema:
-    struct = visit(schema_, _ApplyChanges(adds))
-    name_to_id: Dict[str, int] = index_by_name(struct)
-    for name in identifier_field_names:
-        if name not in name_to_id:
-            raise ValueError(f"Cannot add field {name} as an identifier field: 
not found in current schema or added columns")
+class _ApplyChanges(SchemaVisitor[Optional[IcebergType]]):
+    _adds: Dict[int, List[NestedField]]
+    _updates: Dict[int, NestedField]
+    _deletes: Set[int]
+    _moves: Dict[int, List[Move]]
 
-    return Schema(*struct.fields)
+    def __init__(
+        self, adds: Dict[int, List[NestedField]], updates: Dict[int, 
NestedField], deletes: Set[int], moves: Dict[int, List[Move]]
+    ) -> None:
+        self._adds = adds
+        self._updates = updates
+        self._deletes = deletes
+        self._moves = moves
 
+    def schema(self, schema: Schema, struct_result: Optional[IcebergType]) -> 
Optional[IcebergType]:
+        added = self._adds.get(TABLE_ROOT_ID)
+        moves = self._moves.get(TABLE_ROOT_ID)
 
-class _ApplyChanges(SchemaVisitor[IcebergType]):
-    def __init__(self, adds: Dict[int, List[NestedField]]):
-        self.adds = adds
+        if added is not None or moves is not None:
+            if not isinstance(struct_result, StructType):
+                raise ValueError(f"Cannot add fields to non-struct: 
{struct_result}")
 
-    def schema(self, schema: Schema, struct_result: IcebergType) -> 
IcebergType:
-        fields = _ApplyChanges.add_fields(schema.as_struct().fields, 
self.adds.get(TABLE_ROOT_ID))
-        if len(fields) > 0:
-            return StructType(*fields)
+            if new_fields := _add_and_move_fields(struct_result.fields, added 
or [], moves or []):
+                return StructType(*new_fields)
 
         return struct_result
 
-    def struct(self, struct: StructType, field_results: List[IcebergType]) -> 
IcebergType:
-        has_change = False
-        new_fields: List[NestedField] = []
-        for i in range(len(field_results)):
-            type_: Optional[IcebergType] = field_results[i]
-            if type_ is None:
-                has_change = True
+    def struct(self, struct: StructType, field_results: 
List[Optional[IcebergType]]) -> Optional[IcebergType]:
+        has_changes = False
+        new_fields = []
+
+        for idx, result_type in enumerate(field_results):
+            result_type = field_results[idx]
+
+            # Has been deleted
+            if result_type is None:
+                has_changes = True
                 continue
 
-            field: NestedField = struct.fields[i]
-            new_fields.append(field)
+            field = struct.fields[idx]
+
+            name = field.name
+            doc = field.doc
+            required = field.required
+
+            # There is an update
+            if update := self._updates.get(field.field_id):
+                name = update.name
+                doc = update.doc
+                required = update.required
+
+            if field.name == name and field.field_type == result_type and 
field.required == required and field.doc == doc:
+                new_fields.append(field)
+            else:
+                has_changes = True
+                new_fields.append(
+                    NestedField(field_id=field.field_id, name=name, 
field_type=result_type, required=required, doc=doc)
+                )
 
-        if has_change:
+        if has_changes:
             return StructType(*new_fields)
 
         return struct
 
-    def field(self, field: NestedField, field_result: IcebergType) -> 
IcebergType:
-        field_id: int = field.field_id
-        if field_id in self.adds:
-            new_fields = self.adds[field_id]
-            if len(new_fields) > 0:
-                fields = _ApplyChanges.add_fields(field_result.fields, 
new_fields)
-                if len(fields) > 0:
-                    return StructType(*fields)
+    def field(self, field: NestedField, field_result: Optional[IcebergType]) 
-> Optional[IcebergType]:
+        # the API validates deletes, updates, and additions don't conflict 
handle deletes
+        if field.field_id in self._deletes:
+            return None
+
+        # handle updates
+        if (update := self._updates.get(field.field_id)) and field.field_type 
!= update.field_type:
+            return update.field_type
+
+        if isinstance(field_result, StructType):
+            # handle add & moves
+            added = self._adds.get(field.field_id)
+            moves = self._moves.get(field.field_id)
+            if added is not None or moves is not None:
+                if not isinstance(field.field_type, StructType):
+                    raise ValueError(f"Cannot add fields to non-struct: 
{field}")
+
+                if new_fields := _add_and_move_fields(field_result.fields, 
added or [], moves or []):
+                    return StructType(*new_fields)
 
         return field_result
 
-    def list(self, list_type: ListType, element_result: IcebergType) -> 
IcebergType:
-        element_field: NestedField = list_type.element_field
-        element_type = self.field(element_field, element_result)
+    def list(self, list_type: ListType, element_result: Optional[IcebergType]) 
-> Optional[IcebergType]:
+        element_type = self.field(list_type.element_field, element_result)
         if element_type is None:
-            raise ValueError(f"Cannot delete element type from list: 
{element_field}")
+            raise ValueError(f"Cannot delete element type from list: 
{element_result}")
+
+        return ListType(element_id=list_type.element_id, element=element_type, 
element_required=list_type.element_required)
 
-        is_element_optional = not list_type.element_required
+    def map(
+        self, map_type: MapType, key_result: Optional[IcebergType], 
value_result: Optional[IcebergType]
+    ) -> Optional[IcebergType]:
+        key_id: int = map_type.key_field.field_id
 
-        if is_element_optional == element_field.required and 
list_type.element_type == element_type:
-            return list_type
+        if key_id in self._deletes:
+            raise ValueError(f"Cannot delete map keys: {map_type}")
 
-        return ListType(list_type.element_id, element_type, 
is_element_optional)
+        if key_id in self._updates:
+            raise ValueError(f"Cannot update map keys: {map_type}")
 
-    def map(self, map_type: MapType, key_result: IcebergType, value_result: 
IcebergType) -> IcebergType:
-        key_id: int = map_type.key_field.field_id
-        if key_id in self.adds:
+        if key_id in self._adds:
             raise ValueError(f"Cannot add fields to map keys: {map_type}")
 
+        if map_type.key_type != key_result:
+            raise ValueError(f"Cannot alter map keys: {map_type}")
+
         value_field: NestedField = map_type.value_field
         value_type = self.field(value_field, value_result)
         if value_type is None:
             raise ValueError(f"Cannot delete value type from map: 
{value_field}")
 
-        is_value_optional = not map_type.value_required
+        return MapType(
+            key_id=map_type.key_id,
+            key_type=map_type.key_type,
+            value_id=map_type.value_id,
+            value_type=value_type,
+            value_required=map_type.value_required,
+        )
 
-        if is_value_optional != value_field.required and map_type.value_type 
== value_type:
-            return map_type
+    def primitive(self, primitive: PrimitiveType) -> Optional[IcebergType]:
+        return primitive
 
-        return MapType(map_type.key_id, map_type.key_field, map_type.value_id, 
value_type, not is_value_optional)
 
-    def primitive(self, primitive: PrimitiveType) -> IcebergType:
-        return primitive
+def _add_fields(fields: Tuple[NestedField, ...], adds: 
Optional[List[NestedField]]) -> Tuple[NestedField, ...]:
+    adds = adds or []
+    return fields + tuple(adds)
+
 
-    @staticmethod
-    def add_fields(fields: Tuple[NestedField, ...], adds: 
Optional[List[NestedField]]) -> List[NestedField]:
-        new_fields: List[NestedField] = []
-        new_fields.extend(fields)
-        if adds:
-            new_fields.extend(adds)
-        return new_fields
+def _move_fields(fields: Tuple[NestedField, ...], moves: List[Move]) -> 
Tuple[NestedField, ...]:
+    reordered = list(copy(fields))
+    for move in moves:
+        # Find the field that we're about to move
+        field = next(field for field in reordered if field.field_id == 
move.field_id)
+        # Remove the field that we're about to move from the list
+        reordered = [field for field in reordered if field.field_id != 
move.field_id]
+
+        if move.op == MoveOperation.First:
+            reordered = [field] + reordered
+        elif move.op == MoveOperation.Before or move.op == MoveOperation.After:
+            other_field_id = move.other_field_id
+            other_field_pos = next(i for i, field in enumerate(reordered) if 
field.field_id == other_field_id)
+            if move.op == MoveOperation.Before:
+                reordered.insert(other_field_pos, field)
+            else:
+                reordered.insert(other_field_pos + 1, field)
+        else:
+            raise ValueError(f"Unknown operation: {move.op}")
+
+    return tuple(reordered)
+
+
+def _add_and_move_fields(
+    fields: Tuple[NestedField, ...], adds: List[NestedField], moves: List[Move]
+) -> Optional[Tuple[NestedField, ...]]:
+    if adds:

Review Comment:
   `if adds:` relies on truthiness, so it effectively checks the length of the 
list. I prefer passing in empty lists instead of `None`s, but I think we should 
change `if adds:` to `if len(adds) > 0:` to make the check more explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to