Fokko commented on code in PR #8174:
URL: https://github.com/apache/iceberg/pull/8174#discussion_r1293477183


##########
python/mkdocs/docs/api.md:
##########
@@ -146,6 +146,29 @@ catalog.create_table(
 )
 ```
 
+### Update table schema
+
+Add new columns through the `Transaction` or `UpdateSchema` API:
+
+Use the Transaction API:
+
+```python
+table.transaction().update_schema().add_column("x", IntegerType(), 
"doc").commit()

Review Comment:
   Should we suggest using the transaction?
   ```suggestion
   with table.transaction() as transaction:
       transaction.update_schema().add_column("x", IntegerType(), 
"doc").commit()
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,239 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class UpdateSchema:
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):
+        self._table = table
+        self._schema = schema
+        if last_column_id:
+            self._last_column_id = itertools.count(last_column_id + 1)
+        else:
+            self._last_column_id = itertools.count(schema.highest_field_id + 1)
+
+        self._identifier_field_names = schema.column_names
+        self._adds: Dict[int, List[NestedField]] = {}
+        self._added_name_to_id: Dict[str, int] = {}
+        self._id_to_parent: Dict[int, str] = {}
+        self._allow_incompatible_changes: bool = False
+        self._case_sensitive: bool = True
+
+    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:

Review Comment:
   I think think this should also return the updated table, similar to the 
transaction.
   ```suggestion
       def __exit__(self, _: Any, value: Any, traceback: Any) -> Table:
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,210 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class _SchemaUpdate(UpdateSchema):
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):

Review Comment:
   Should we change the actual code to accommodate the tests?



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,239 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class UpdateSchema:
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):
+        self._table = table
+        self._schema = schema
+        if last_column_id:
+            self._last_column_id = itertools.count(last_column_id + 1)
+        else:
+            self._last_column_id = itertools.count(schema.highest_field_id + 1)
+
+        self._identifier_field_names = schema.column_names
+        self._adds: Dict[int, List[NestedField]] = {}
+        self._added_name_to_id: Dict[str, int] = {}
+        self._id_to_parent: Dict[int, str] = {}
+        self._allow_incompatible_changes: bool = False
+        self._case_sensitive: bool = True
+
+    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
+        """Closes and commits the change."""
+        return self.commit()
+
+    def __enter__(self) -> UpdateSchema:
+        """Update the table."""
+        return self
+
+    def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
+        """Determines if the case of schema needs to be considered when 
comparing column names.
+
+        Args:
+            case_sensitive: When false case is not considered in column name 
comparisons.
+
+        Returns:
+            This for method chaining
+        """
+        self._case_sensitive = case_sensitive
+        return self
+
+    def add_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None, required: bool = False
+    ) -> UpdateSchema:
+        """Add a new column to a nested struct or Add a new top-level column.
+
+        Args:
+            name: Name for the new column.
+            type_var: Type for the new column.
+            doc: Documentation string for the new column.
+            parent: Name of the parent struct to the column will be added to.
+            required: Whether the new column is required.
+
+        Returns:
+            This for method chaining
+        """
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        if required and not self._allow_incompatible_changes:
+            # Table format version 1 and 2 cannot add required column because 
there is no initial value
+            raise ValueError(f"Incompatible change: cannot add required 
column: {name}")
+
+        self._internal_add_column(parent, name, not required, type_var, doc)
+        return self
+
+    def allow_incompatible_changes(self) -> UpdateSchema:
+        """Allow incompatible changes to the schema.
+
+        Returns:
+            This for method chaining
+        """
+        self._allow_incompatible_changes = True
+        return self
+
+    def commit(self) -> None:
+        """Apply the pending changes and commit."""
+        if self._table is None:
+            raise ValueError("Cannot commit schema update, table is not set")
+
+        # Strip the catalog name
+        self._table.catalog._commit_table(  # pylint: disable=W0212
+            CommitTableRequest(
+                identifier=self._table.identifier[1:],
+                updates=[AddSchemaUpdate(schema=self._apply())],
+            )
+        )

Review Comment:
   ```suggestion
       def commit(self) -> Table:
           """Apply the pending changes and commit."""
           if self._table is None:
               raise ValueError("Cannot commit schema update, table is not set")
   
           # Strip the catalog name
           table_update_response = self._table.catalog._commit_table(  # 
pylint: disable=W0212
               CommitTableRequest(
                   identifier=self._table.identifier[1:],
                   updates=[AddSchemaUpdate(schema=self._apply())],
               )
           )
           
           self._table.metadata = table_update_response.metadata
           self._table.metadata_location = 
table_update_response.metadata_location
           
           return self._table
   ```



##########
python/pyiceberg/schema.py:
##########
@@ -1082,45 +1100,23 @@ def build_position_accessors(schema_or_type: 
Union[Schema, IcebergType]) -> Dict
     return visit(schema_or_type, _BuildPositionAccessors())
 
 
-class _FindLastFieldId(SchemaVisitor[int]):
-    """Traverses the schema to get the highest field-id."""
-
-    def schema(self, schema: Schema, struct_result: int) -> int:
-        return struct_result
-
-    def struct(self, struct: StructType, field_results: List[int]) -> int:
-        return max(field_results)
-
-    def field(self, field: NestedField, field_result: int) -> int:
-        return max(field.field_id, field_result)
-
-    def list(self, list_type: ListType, element_result: int) -> int:
-        return element_result
-
-    def map(self, map_type: MapType, key_result: int, value_result: int) -> 
int:
-        return max(key_result, value_result)
-
-    def primitive(self, primitive: PrimitiveType) -> int:
-        return 0
-
-
-def assign_fresh_schema_ids(schema: Schema) -> Schema:
+def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], 
next_id: Optional[Callable[[], int]] = None) -> Schema:
     """Traverses the schema, and sets new IDs."""
-    return pre_order_visit(schema, _SetFreshIDs())
+    return pre_order_visit(schema_or_type, _SetFreshIDs(next_id_func=next_id))
 
 
 class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
     """Traverses the schema and assigns monotonically increasing ids."""
 
-    counter: itertools.count  # type: ignore
     reserved_ids: Dict[int, int]
 
-    def __init__(self, start: int = 1) -> None:
+    def __init__(self, start: int = 1, next_id_func: Optional[Callable[[], 
int]] = None) -> None:
         self.counter = itertools.count(start)
         self.reserved_ids = {}
+        self.next_id_func = next_id_func if next_id_func is not None else 
lambda: next(self.counter)

Review Comment:
   Looks like this one wasn't adressed?
   ```suggestion
           self.reserved_ids = {}
           self.next_id_func = next_id_func if next_id_func is not None else 
lambda: next(itertools.count(start))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to