This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e718473dd8 [python] support alterTable (#6952)
e718473dd8 is described below

commit e718473dd830a1636654d0feb83053ed08979c3a
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jan 5 19:31:26 2026 +0800

    [python] support alterTable (#6952)
---
 paimon-python/pypaimon/api/api_request.py          |   8 +
 paimon-python/pypaimon/api/rest_api.py             |  13 +-
 paimon-python/pypaimon/catalog/catalog.py          |  10 +
 .../pypaimon/catalog/filesystem_catalog.py         |  33 ++-
 .../pypaimon/catalog/rest/rest_catalog.py          |  15 +
 paimon-python/pypaimon/schema/schema_change.py     | 289 +++++++++++++++++++
 paimon-python/pypaimon/schema/schema_manager.py    | 307 +++++++++++++++++++++
 .../pypaimon/tests/filesystem_catalog_test.py      |  86 ++++++
 paimon-python/pypaimon/tests/rest/rest_server.py   | 153 +++++++++-
 .../pypaimon/tests/rest/rest_simple_test.py        |  90 ++++++
 10 files changed, 991 insertions(+), 13 deletions(-)

diff --git a/paimon-python/pypaimon/api/api_request.py 
b/paimon-python/pypaimon/api/api_request.py
index d453250757..f2d062f8d0 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -23,6 +23,7 @@ from typing import Dict, List, Optional
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.json_util import json_field
 from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
 
@@ -76,3 +77,10 @@ class CommitTableRequest(RESTRequest):
     table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
     snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
     statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)
+
+
+@dataclass
+class AlterTableRequest(RESTRequest):
+    FIELD_CHANGES = "changes"
+
+    changes: List[SchemaChange] = json_field(FIELD_CHANGES)
diff --git a/paimon-python/pypaimon/api/rest_api.py 
b/paimon-python/pypaimon/api/rest_api.py
index 25165916a6..806831d954 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -18,7 +18,7 @@
 import logging
 from typing import Callable, Dict, List, Optional, Union
 
-from pypaimon.api.api_request import (AlterDatabaseRequest, CommitTableRequest,
+from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, 
CommitTableRequest,
                                       CreateDatabaseRequest,
                                       CreateTableRequest, RenameTableRequest)
 from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
@@ -289,6 +289,17 @@ class RESTApi:
             request,
             self.rest_auth_function)
 
+    def alter_table(self, identifier: Identifier, changes: List):
+        database_name, table_name = self.__validate_identifier(identifier)
+        if not changes:
+            raise ValueError("Changes cannot be empty")
+
+        request = AlterTableRequest(changes)
+        return self.client.post(
+            self.resource_paths.table(database_name, table_name),
+            request,
+            self.rest_auth_function)
+
     def load_table_token(self, identifier: Identifier) -> 
GetTableTokenResponse:
         database_name, table_name = self.__validate_identifier(identifier)
 
diff --git a/paimon-python/pypaimon/catalog/catalog.py 
b/paimon-python/pypaimon/catalog/catalog.py
index a8e7dcd065..fa19355b0b 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -21,6 +21,7 @@ from typing import List, Optional, Union
 
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
 
@@ -54,6 +55,15 @@ class Catalog(ABC):
     def create_table(self, identifier: Union[str, Identifier], schema: Schema, 
ignore_if_exists: bool):
         """Create table with schema."""
 
+    @abstractmethod
+    def alter_table(
+        self,
+        identifier: Union[str, Identifier],
+        changes: List[SchemaChange],
+        ignore_if_not_exists: bool = False
+    ):
+        """Alter table with schema changes."""
+
     def supports_version_management(self) -> bool:
         """
         Whether this catalog supports version management for tables.
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py 
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index b2eced586e..8d1b485b74 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -20,16 +20,19 @@ from typing import List, Optional, Union
 
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
-from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
-                                                DatabaseNotExistException,
-                                                TableAlreadyExistException,
-                                                TableNotExistException)
+from pypaimon.catalog.catalog_exception import (
+    DatabaseAlreadyExistException,
+    DatabaseNotExistException,
+    TableAlreadyExistException,
+    TableNotExistException
+)
 from pypaimon.catalog.database import Database
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import CatalogOptions
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
@@ -115,6 +118,28 @@ class FileSystemCatalog(Catalog):
         db_path = self.get_database_path(identifier.get_database_name())
         return f"{db_path}/{identifier.get_table_name()}"
 
+    def alter_table(
+        self,
+        identifier: Union[str, Identifier],
+        changes: List[SchemaChange],
+        ignore_if_not_exists: bool = False
+    ):
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            self.get_table(identifier)
+        except TableNotExistException:
+            if not ignore_if_not_exists:
+                raise
+            return
+
+        table_path = self.get_table_path(identifier)
+        schema_manager = SchemaManager(self.file_io, table_path)
+        try:
+            schema_manager.commit_changes(changes)
+        except Exception as e:
+            raise RuntimeError(f"Failed to alter table 
{identifier.get_full_name()}: {e}") from e
+
     def commit_snapshot(
             self,
             identifier: Identifier,
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index c32c5bcb39..5a0ca01ce4 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -34,6 +34,7 @@ from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
@@ -177,6 +178,20 @@ class RESTCatalog(Catalog):
             if not ignore_if_not_exists:
                 raise TableNotExistException(identifier) from e
 
+    def alter_table(
+        self,
+        identifier: Union[str, Identifier],
+        changes: List[SchemaChange],
+        ignore_if_not_exists: bool = False
+    ):
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            self.rest_api.alter_table(identifier, changes)
+        except NoSuchResourceException as e:
+            if not ignore_if_not_exists:
+                raise TableNotExistException(identifier) from e
+
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         response = self.rest_api.get_table(identifier)
         return self.to_table_metadata(identifier.get_database_name(), response)
diff --git a/paimon-python/pypaimon/schema/schema_change.py 
b/paimon-python/pypaimon/schema/schema_change.py
new file mode 100644
index 0000000000..8e62e7f5bb
--- /dev/null
+++ b/paimon-python/pypaimon/schema/schema_change.py
@@ -0,0 +1,289 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from abc import ABC
+from dataclasses import dataclass
+from enum import Enum
+from typing import List, Optional, Union
+
+from pypaimon.common.json_util import json_field
+from pypaimon.schema.data_types import DataType
+
+
+class Actions:
+    FIELD_ACTION = "action"
+    SET_OPTION_ACTION = "setOption"
+    REMOVE_OPTION_ACTION = "removeOption"
+    UPDATE_COMMENT_ACTION = "updateComment"
+    ADD_COLUMN_ACTION = "addColumn"
+    RENAME_COLUMN_ACTION = "renameColumn"
+    DROP_COLUMN_ACTION = "dropColumn"
+    UPDATE_COLUMN_TYPE_ACTION = "updateColumnType"
+    UPDATE_COLUMN_NULLABILITY_ACTION = "updateColumnNullability"
+    UPDATE_COLUMN_COMMENT_ACTION = "updateColumnComment"
+    UPDATE_COLUMN_DEFAULT_VALUE_ACTION = "updateColumnDefaultValue"
+    UPDATE_COLUMN_POSITION_ACTION = "updateColumnPosition"
+
+
+class SchemaChange(ABC):
+    @staticmethod
+    def set_option(key: str, value: str) -> "SetOption":
+        return SetOption(key=key, value=value)
+
+    @staticmethod
+    def remove_option(key: str) -> "RemoveOption":
+        return RemoveOption(key)
+
+    @staticmethod
+    def update_comment(comment: Optional[str]) -> "UpdateComment":
+        return UpdateComment(comment)
+
+    @staticmethod
+    def add_column(
+            field_name: Union[str, List[str]],
+            data_type: DataType,
+            comment: Optional[str] = None,
+            move: Optional["Move"] = None
+    ) -> "AddColumn":
+        if isinstance(field_name, str):
+            return AddColumn(field_names=[field_name], data_type=data_type, 
comment=comment, move=move)
+        else:
+            return AddColumn(field_names=field_name, data_type=data_type, 
comment=comment, move=move)
+
+    @staticmethod
+    def rename_column(field_name: Union[str, List[str]], new_name: str) -> 
"RenameColumn":
+        if isinstance(field_name, str):
+            return RenameColumn([field_name], new_name)
+        else:
+            return RenameColumn(field_name, new_name)
+
+    @staticmethod
+    def drop_column(field_name: Union[str, List[str]]) -> "DropColumn":
+        if isinstance(field_name, str):
+            return DropColumn([field_name])
+        else:
+            return DropColumn(field_name)
+
+    @staticmethod
+    def update_column_type(
+            field_name: Union[str, List[str]],
+            new_data_type: DataType,
+            keep_nullability: bool = False
+    ) -> "UpdateColumnType":
+        if isinstance(field_name, str):
+            return UpdateColumnType([field_name], new_data_type, 
keep_nullability)
+        else:
+            return UpdateColumnType(field_name, new_data_type, 
keep_nullability)
+
+    @staticmethod
+    def update_column_nullability(
+            field_name: Union[str, List[str]],
+            new_nullability: bool
+    ) -> "UpdateColumnNullability":
+        if isinstance(field_name, str):
+            return UpdateColumnNullability([field_name], new_nullability)
+        else:
+            return UpdateColumnNullability(field_name, new_nullability)
+
+    @staticmethod
+    def update_column_comment(field_name: Union[str, List[str]], comment: str) 
-> "UpdateColumnComment":
+        if isinstance(field_name, str):
+            return UpdateColumnComment([field_name], comment)
+        else:
+            return UpdateColumnComment(field_name, comment)
+
+    @staticmethod
+    def update_column_default_value(field_names: List[str], default_value: 
str) -> "UpdateColumnDefaultValue":
+        return UpdateColumnDefaultValue(field_names, default_value)
+
+    @staticmethod
+    def update_column_position(move: "Move") -> "UpdateColumnPosition":
+        return UpdateColumnPosition(move)
+
+
+@dataclass
+class SetOption(SchemaChange):
+    FIELD_KEY = "key"
+    FIELD_VALUE = "value"
+    key: str = json_field(FIELD_KEY)
+    value: str = json_field(FIELD_VALUE)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.SET_OPTION_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.SET_OPTION_ACTION
+
+
+@dataclass
+class RemoveOption(SchemaChange):
+    FIELD_KEY = "key"
+    key: str = json_field(FIELD_KEY)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.REMOVE_OPTION_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.REMOVE_OPTION_ACTION
+
+
+@dataclass
+class UpdateComment(SchemaChange):
+    FIELD_COMMENT = "comment"
+    comment: Optional[str] = json_field(FIELD_COMMENT)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.UPDATE_COMMENT_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.UPDATE_COMMENT_ACTION
+
+
+@dataclass
+class AddColumn(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_DATA_TYPE = "dataType"
+    FIELD_COMMENT = "comment"
+    FIELD_MOVE = "move"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    data_type: DataType = json_field(FIELD_DATA_TYPE)
+    comment: Optional[str] = json_field(FIELD_COMMENT)
+    move: Optional["Move"] = json_field(FIELD_MOVE)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.ADD_COLUMN_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.ADD_COLUMN_ACTION
+
+
+@dataclass
+class RenameColumn(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_NAME = "newName"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_name: str = json_field(FIELD_NEW_NAME)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.RENAME_COLUMN_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.RENAME_COLUMN_ACTION
+
+
+@dataclass
+class DropColumn(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.DROP_COLUMN_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.DROP_COLUMN_ACTION
+
+
+@dataclass
+class UpdateColumnType(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_DATA_TYPE = "newDataType"
+    FIELD_KEEP_NULLABILITY = "keepNullability"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_data_type: DataType = json_field(FIELD_NEW_DATA_TYPE)
+    keep_nullability: bool = json_field(FIELD_KEEP_NULLABILITY)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.UPDATE_COLUMN_TYPE_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.UPDATE_COLUMN_TYPE_ACTION
+
+
+@dataclass
+class UpdateColumnNullability(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_NULLABILITY = "newNullability"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_nullability: bool = json_field(FIELD_NEW_NULLABILITY)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.UPDATE_COLUMN_NULLABILITY_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.UPDATE_COLUMN_NULLABILITY_ACTION
+
+
+@dataclass
+class UpdateColumnComment(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_COMMENT = "newComment"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_comment: str = json_field(FIELD_NEW_COMMENT)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.UPDATE_COLUMN_COMMENT_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.UPDATE_COLUMN_COMMENT_ACTION
+
+
+@dataclass
+class UpdateColumnDefaultValue(SchemaChange):
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_DEFAULT_VALUE = "newDefaultValue"
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_default_value: str = json_field(FIELD_NEW_DEFAULT_VALUE)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION
+
+
+@dataclass
+class UpdateColumnPosition(SchemaChange):
+    FIELD_MOVE = "move"
+    move: "Move" = json_field(FIELD_MOVE)
+    action: str = json_field(Actions.FIELD_ACTION, 
default=Actions.UPDATE_COLUMN_POSITION_ACTION)
+
+    def __post_init__(self):
+        if not hasattr(self, 'action') or self.action is None:
+            self.action = Actions.UPDATE_COLUMN_POSITION_ACTION
+
+
+class MoveType(Enum):
+    FIRST = "FIRST"
+    AFTER = "AFTER"
+    BEFORE = "BEFORE"
+    LAST = "LAST"
+
+
+@dataclass
+class Move:
+    FIELD_FIELD_NAME = "fieldName"
+    FIELD_REFERENCE_FIELD_NAME = "referenceFieldName"
+    FIELD_TYPE = "type"
+
+    @staticmethod
+    def first(field_name: str) -> "Move":
+        return Move(field_name, None, MoveType.FIRST)
+
+    @staticmethod
+    def after(field_name: str, reference_field_name: str) -> "Move":
+        return Move(field_name, reference_field_name, MoveType.AFTER)
+
+    @staticmethod
+    def before(field_name: str, reference_field_name: str) -> "Move":
+        return Move(field_name, reference_field_name, MoveType.BEFORE)
+
+    @staticmethod
+    def last(field_name: str) -> "Move":
+        return Move(field_name, None, MoveType.LAST)
+
+    field_name: str = json_field(FIELD_FIELD_NAME)
+    reference_field_name: Optional[str] = 
json_field(FIELD_REFERENCE_FIELD_NAME)
+    type: MoveType = json_field(FIELD_TYPE)
diff --git a/paimon-python/pypaimon/schema/schema_manager.py 
b/paimon-python/pypaimon/schema/schema_manager.py
index 86f0f00c35..c8f007ba08 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -17,12 +17,187 @@
 
################################################################################
 from typing import Optional, List
 
+from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, 
ColumnNotExistException
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.json_util import JSON
+from pypaimon.schema.data_types import AtomicInteger, DataField
 from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import (
+    AddColumn, DropColumn, RemoveOption, RenameColumn,
+    SchemaChange, SetOption, UpdateColumnComment,
+    UpdateColumnNullability, UpdateColumnPosition,
+    UpdateColumnType, UpdateComment
+)
 from pypaimon.schema.table_schema import TableSchema
 
 
+def _find_field_index(fields: List[DataField], field_name: str) -> 
Optional[int]:
+    for i, field in enumerate(fields):
+        if field.name == field_name:
+            return i
+    return None
+
+
+def _get_rename_mappings(changes: List[SchemaChange]) -> dict:
+    rename_mappings = {}
+    for change in changes:
+        if isinstance(change, RenameColumn) and len(change.field_names) == 1:
+            rename_mappings[change.field_names[0]] = change.new_name
+    return rename_mappings
+
+
+def _handle_update_column_comment(
+        change: UpdateColumnComment, new_fields: List[DataField]
+):
+    field_name = change.field_names[-1]
+    field_index = _find_field_index(new_fields, field_name)
+    if field_index is None:
+        raise ColumnNotExistException(field_name)
+    field = new_fields[field_index]
+    new_fields[field_index] = DataField(
+        field.id, field.name, field.type, change.new_comment, 
field.default_value
+    )
+
+
+def _handle_update_column_nullability(
+        change: UpdateColumnNullability, new_fields: List[DataField]
+):
+    field_name = change.field_names[-1]
+    field_index = _find_field_index(new_fields, field_name)
+    if field_index is None:
+        raise ColumnNotExistException(field_name)
+    field = new_fields[field_index]
+    from pypaimon.schema.data_types import DataTypeParser
+    field_type_dict = field.type.to_dict()
+    new_type = DataTypeParser.parse_data_type(field_type_dict)
+    new_type.nullable = change.new_nullability
+    new_fields[field_index] = DataField(
+        field.id, field.name, new_type, field.description, field.default_value
+    )
+
+
+def _handle_update_column_type(
+        change: UpdateColumnType, new_fields: List[DataField]
+):
+    field_name = change.field_names[-1]
+    field_index = _find_field_index(new_fields, field_name)
+    if field_index is None:
+        raise ColumnNotExistException(field_name)
+    field = new_fields[field_index]
+    from pypaimon.schema.data_types import DataTypeParser
+    new_type_dict = change.new_data_type.to_dict()
+    new_type = DataTypeParser.parse_data_type(new_type_dict)
+    if change.keep_nullability:
+        new_type.nullable = field.type.nullable
+    new_fields[field_index] = DataField(
+        field.id, field.name, new_type, field.description, field.default_value
+    )
+
+
+def _drop_column_validation(schema: 'TableSchema', change: DropColumn):
+    if len(change.field_names) > 1:
+        return
+    column_to_drop = change.field_names[0]
+    if column_to_drop in schema.partition_keys or column_to_drop in 
schema.primary_keys:
+        raise ValueError(
+            f"Cannot drop partition key or primary key: [{column_to_drop}]"
+        )
+
+
+def _handle_drop_column(change: DropColumn, new_fields: List[DataField]):
+    field_name = change.field_names[-1]
+    field_index = _find_field_index(new_fields, field_name)
+    if field_index is None:
+        raise ColumnNotExistException(field_name)
+    new_fields.pop(field_index)
+    if not new_fields:
+        raise ValueError("Cannot drop all fields in table")
+
+
+def _assert_not_updating_partition_keys(
+        schema: 'TableSchema', field_names: List[str], operation: str):
+    if len(field_names) > 1:
+        return
+    field_name = field_names[0]
+    if field_name in schema.partition_keys:
+        raise ValueError(
+            f"Cannot {operation} partition column: [{field_name}]"
+        )
+
+
+def _assert_not_updating_primary_keys(
+        schema: 'TableSchema', field_names: List[str], operation: str):
+    if len(field_names) > 1:
+        return
+    field_name = field_names[0]
+    if field_name in schema.primary_keys:
+        raise ValueError(f"Cannot {operation} primary key")
+
+
+def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
+    field_name = change.field_names[-1]
+    new_name = change.new_name
+    field_index = _find_field_index(new_fields, field_name)
+    if field_index is None:
+        raise ColumnNotExistException(field_name)
+    if _find_field_index(new_fields, new_name) is not None:
+        raise ColumnAlreadyExistException(new_name)
+    field = new_fields[field_index]
+    new_fields[field_index] = DataField(
+        field.id, new_name, field.type, field.description, field.default_value
+    )
+
+
+def _apply_move(fields: List[DataField], new_field: Optional[DataField], move):
+    from pypaimon.schema.schema_change import MoveType
+
+    if new_field:
+        pass
+    else:
+        field_name = move.field_name
+        new_field = None
+        for i, field in enumerate(fields):
+            if field.name == field_name:
+                new_field = fields.pop(i)
+                break
+        if new_field is None:
+            raise ColumnNotExistException(field_name)
+
+    field_map = {field.name: i for i, field in enumerate(fields)}
+    if move.type == MoveType.FIRST:
+        fields.insert(0, new_field)
+    elif move.type == MoveType.AFTER:
+        if move.reference_field_name not in field_map:
+            raise ColumnNotExistException(move.reference_field_name)
+        fields.insert(field_map[move.reference_field_name] + 1, new_field)
+    elif move.type == MoveType.BEFORE:
+        if move.reference_field_name not in field_map:
+            raise ColumnNotExistException(move.reference_field_name)
+        fields.insert(field_map[move.reference_field_name], new_field)
+    elif move.type == MoveType.LAST:
+        fields.append(new_field)
+    else:
+        raise ValueError(f"Unsupported move type: {move.type}")
+
+
+def _handle_add_column(
+        change: AddColumn, new_fields: List[DataField], highest_field_id: 
AtomicInteger
+):
+    if not change.data_type.nullable:
+        raise ValueError(
+            f"Column {'.'.join(change.field_names)} cannot specify NOT NULL in 
the table."
+        )
+    field_id = highest_field_id.increment_and_get()
+    field_name = change.field_names[-1]
+    if _find_field_index(new_fields, field_name) is not None:
+        raise ColumnAlreadyExistException(field_name)
+    new_field = DataField(field_id, field_name, change.data_type, 
change.comment)
+    if change.move:
+        _apply_move(new_fields, new_field, change.move)
+    else:
+        new_fields.append(new_field)
+
+
 class SchemaManager:
 
     def __init__(self, file_io: FileIO, table_path: str):
@@ -94,3 +269,135 @@ class SchemaManager:
                 except ValueError:
                     continue
         return versions
+
+    def commit_changes(self, changes: List[SchemaChange]) -> TableSchema:
+        while True:
+            old_table_schema = self.latest()
+            if old_table_schema is None:
+                raise RuntimeError(
+                    f"Table schema does not exist at path: {self.table_path}. "
+                    "This may happen if the table was deleted concurrently."
+                )
+            
+            new_table_schema = self._generate_table_schema(old_table_schema, 
changes)
+            try:
+                success = self.commit(new_table_schema)
+                if success:
+                    return new_table_schema
+            except Exception as e:
+                raise RuntimeError(f"Failed to commit schema changes: {e}") 
from e
+
+    def _generate_table_schema(
+        self, old_table_schema: TableSchema, changes: List[SchemaChange]
+    ) -> TableSchema:
+        new_options = dict(old_table_schema.options)
+        new_fields = []
+        for field in old_table_schema.fields:
+            from pypaimon.schema.data_types import DataTypeParser
+            field_type_dict = field.type.to_dict()
+            copied_type = DataTypeParser.parse_data_type(field_type_dict)
+            new_fields.append(DataField(
+                field.id,
+                field.name,
+                copied_type,
+                field.description,
+                field.default_value
+            ))
+        highest_field_id = AtomicInteger(old_table_schema.highest_field_id)
+        new_comment = old_table_schema.comment
+
+        for change in changes:
+            if isinstance(change, SetOption):
+                new_options[change.key] = change.value
+            elif isinstance(change, RemoveOption):
+                new_options.pop(change.key, None)
+            elif isinstance(change, UpdateComment):
+                new_comment = change.comment
+            elif isinstance(change, AddColumn):
+                _handle_add_column(change, new_fields, highest_field_id)
+            elif isinstance(change, RenameColumn):
+                _assert_not_updating_partition_keys(
+                    old_table_schema, change.field_names, "rename"
+                )
+                _handle_rename_column(change, new_fields)
+            elif isinstance(change, DropColumn):
+                _drop_column_validation(old_table_schema, change)
+                _handle_drop_column(change, new_fields)
+            elif isinstance(change, UpdateColumnType):
+                _assert_not_updating_partition_keys(
+                    old_table_schema, change.field_names, "update"
+                )
+                _assert_not_updating_primary_keys(
+                    old_table_schema, change.field_names, "update"
+                )
+                _handle_update_column_type(change, new_fields)
+            elif isinstance(change, UpdateColumnNullability):
+                if change.new_nullability:
+                    _assert_not_updating_primary_keys(
+                        old_table_schema, change.field_names, "change 
nullability of"
+                    )
+                _handle_update_column_nullability(change, new_fields)
+            elif isinstance(change, UpdateColumnComment):
+                _handle_update_column_comment(change, new_fields)
+            elif isinstance(change, UpdateColumnPosition):
+                _apply_move(new_fields, None, change.move)
+            else:
+                raise NotImplementedError(f"Unsupported change: 
{type(change)}")
+
+        rename_mappings = _get_rename_mappings(changes)
+        new_primary_keys = SchemaManager._apply_not_nested_column_rename(
+            old_table_schema.primary_keys, rename_mappings
+        )
+        new_options = 
SchemaManager._apply_rename_columns_to_options(new_options, rename_mappings)
+
+        new_schema = Schema(
+            fields=new_fields,
+            partition_keys=old_table_schema.partition_keys,
+            primary_keys=new_primary_keys,
+            options=new_options,
+            comment=new_comment
+        )
+
+        return TableSchema(
+            version=old_table_schema.version,
+            id=old_table_schema.id + 1,
+            fields=new_schema.fields,
+            highest_field_id=highest_field_id.get(),
+            partition_keys=new_schema.partition_keys,
+            primary_keys=new_schema.primary_keys,
+            options=new_schema.options,
+            comment=new_schema.comment
+        )
+
+    @staticmethod
+    def _apply_not_nested_column_rename(
+        columns: List[str], rename_mappings: dict
+    ) -> List[str]:
+        return [rename_mappings.get(col, col) for col in columns]
+
+    @staticmethod
+    def _apply_rename_columns_to_options(
+        options: dict, rename_mappings: dict
+    ) -> dict:
+        if not rename_mappings:
+            return options
+        new_options = dict(options)
+        from pypaimon.common.options.core_options import CoreOptions
+
+        bucket_key = CoreOptions.BUCKET_KEY.key()
+        if bucket_key in new_options:
+            bucket_columns = new_options[bucket_key].split(",")
+            new_bucket_columns = SchemaManager._apply_not_nested_column_rename(
+                bucket_columns, rename_mappings
+            )
+            new_options[bucket_key] = ",".join(new_bucket_columns)
+
+        sequence_field = "sequence.field"
+        if sequence_field in new_options:
+            sequence_fields = new_options[sequence_field].split(",")
+            new_sequence_fields = 
SchemaManager._apply_not_nested_column_rename(
+                sequence_fields, rename_mappings
+            )
+            new_options[sequence_field] = ",".join(new_sequence_fields)
+
+        return new_options
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py 
b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 530e33aa78..02c7102ab8 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -25,6 +25,7 @@ from pypaimon.catalog.catalog_exception import 
(DatabaseAlreadyExistException,
                                                 TableAlreadyExistException,
                                                 TableNotExistException)
 from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.table.file_store_table import FileStoreTable
 
 
@@ -85,3 +86,88 @@ class FileSystemCatalogTest(unittest.TestCase):
         self.assertEqual(table.fields[2].name, "f2")
         self.assertTrue(isinstance(table.fields[2].type, AtomicType))
         self.assertEqual(table.fields[2].type.type, "STRING")
+
def test_alter_table(self):
    """Exercise every supported SchemaChange variant end-to-end against a
    filesystem-catalog table, reloading the table after each change."""
    catalog = CatalogFactory.create({"warehouse": self.warehouse})
    catalog.create_database("test_db", False)

    identifier = "test_db.test_table"
    catalog.create_table(
        identifier,
        Schema(
            fields=[
                DataField.from_dict(
                    {"id": 0, "name": "col1", "type": "STRING", "description": "field1"}),
                DataField.from_dict(
                    {"id": 1, "name": "col2", "type": "STRING", "description": "field2"})
            ],
            partition_keys=[],
            primary_keys=[],
            options={},
            comment="comment"
        ),
        False
    )

    def alter_and_reload(change):
        # Apply a single schema change and return the freshly loaded table.
        catalog.alter_table(identifier, [change], False)
        return catalog.get_table(identifier)

    table = alter_and_reload(SchemaChange.add_column("col3", AtomicType("DATE")))
    self.assertEqual(len(table.fields), 3)
    self.assertEqual(table.fields[2].name, "col3")
    self.assertEqual(table.fields[2].type.type, "DATE")

    table = alter_and_reload(SchemaChange.update_comment("new comment"))
    self.assertEqual(table.table_schema.comment, "new comment")

    table = alter_and_reload(SchemaChange.rename_column("col1", "new_col1"))
    self.assertEqual(table.fields[0].name, "new_col1")

    table = alter_and_reload(SchemaChange.update_column_type("col2", AtomicType("BIGINT")))
    self.assertEqual(table.fields[1].type.type, "BIGINT")

    table = alter_and_reload(SchemaChange.update_column_comment("col2", "col2 field"))
    self.assertEqual(table.fields[1].description, "col2 field")

    table = alter_and_reload(SchemaChange.set_option("write-buffer-size", "256 MB"))
    self.assertEqual(table.table_schema.options.get("write-buffer-size"), "256 MB")

    table = alter_and_reload(SchemaChange.remove_option("write-buffer-size"))
    self.assertNotIn("write-buffer-size", table.table_schema.options)

    table = alter_and_reload(SchemaChange.drop_column("col3"))
    self.assertEqual(len(table.fields), 2)
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py 
b/paimon-python/pypaimon/tests/rest/rest_server.py
index b97d015dbe..e0bedeac1b 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -27,7 +27,7 @@ from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union
 from urllib.parse import urlparse
 
-from pypaimon.api.api_request import (CreateDatabaseRequest,
+from pypaimon.api.api_request import (AlterTableRequest, CreateDatabaseRequest,
                                       CreateTableRequest, RenameTableRequest)
 from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
                                        GetTableResponse, ListDatabasesResponse,
@@ -44,6 +44,8 @@ from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.json_util import JSON
 from pypaimon import Schema
+from pypaimon.schema.schema_change import Actions, SchemaChange
+from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.schema.table_schema import TableSchema
 
 
@@ -91,6 +93,105 @@ FORMAT_TABLE = "FORMAT_TABLE"
 OBJECT_TABLE = "OBJECT_TABLE"
 
 
def _dict_to_schema_change(change_dict: dict) -> SchemaChange:
    """Deserialize a JSON schema-change dict into a concrete SchemaChange.

    Each field is looked up both by its camelCase wire name and by the
    constant declared on the corresponding change class, so either key
    style sent by a client is accepted.

    Args:
        change_dict: JSON-decoded dict with an ``action`` discriminator.

    Returns:
        The matching SchemaChange subclass instance.

    Raises:
        ValueError: if a required field is missing or the action is unknown.
    """
    from pypaimon.schema.schema_change import (
        SetOption, RemoveOption, UpdateComment, AddColumn, RenameColumn,
        DropColumn, UpdateColumnType, UpdateColumnNullability,
        UpdateColumnComment, UpdateColumnDefaultValue, UpdateColumnPosition,
        Move, MoveType
    )

    def _parse_move(move_dict: dict) -> Move:
        # Shared Move parsing for the ADD_COLUMN and UPDATE_COLUMN_POSITION
        # branches (previously duplicated verbatim in both).
        move_type_str = move_dict.get("type") or move_dict.get(Move.FIELD_TYPE)
        if move_type_str is None:
            raise ValueError(f"Missing type field in Move: {move_dict}")
        field_name = move_dict.get("fieldName") or move_dict.get(Move.FIELD_FIELD_NAME)
        if field_name is None:
            raise ValueError(f"Missing fieldName field in Move: {move_dict}")
        reference_field = (
            move_dict.get("referenceFieldName") or
            move_dict.get(Move.FIELD_REFERENCE_FIELD_NAME)
        )
        return Move(
            field_name=field_name,
            reference_field_name=reference_field,
            type=MoveType(move_type_str)
        )

    action = change_dict.get(Actions.FIELD_ACTION)
    if action == Actions.SET_OPTION_ACTION:
        return SetOption(key=change_dict["key"], value=change_dict["value"])
    elif action == Actions.REMOVE_OPTION_ACTION:
        return RemoveOption(key=change_dict["key"])
    elif action == Actions.UPDATE_COMMENT_ACTION:
        return UpdateComment(comment=change_dict.get("comment"))
    elif action == Actions.ADD_COLUMN_ACTION:
        from pypaimon.schema.data_types import DataTypeParser
        data_type_value = change_dict.get("dataType") or change_dict.get(AddColumn.FIELD_DATA_TYPE)
        if data_type_value is None:
            raise ValueError(f"Missing dataType field in AddColumn change: {change_dict}")
        data_type = DataTypeParser.parse_data_type(data_type_value)
        # ADD_COLUMN is lenient: a missing or non-dict "move" entry is
        # silently treated as "no move" rather than rejected.
        raw_move = change_dict.get("move")
        move = _parse_move(raw_move) if isinstance(raw_move, dict) else None
        field_names = change_dict.get("fieldNames") or change_dict.get(AddColumn.FIELD_FIELD_NAMES)
        if field_names is None:
            raise ValueError(f"Missing fieldNames field in AddColumn change: {change_dict}")
        return AddColumn(
            field_names=field_names,
            data_type=data_type,
            comment=change_dict.get("comment") or change_dict.get(AddColumn.FIELD_COMMENT),
            move=move
        )
    elif action == Actions.RENAME_COLUMN_ACTION:
        return RenameColumn(field_names=change_dict["fieldNames"], new_name=change_dict["newName"])
    elif action == Actions.DROP_COLUMN_ACTION:
        return DropColumn(field_names=change_dict["fieldNames"])
    elif action == Actions.UPDATE_COLUMN_TYPE_ACTION:
        from pypaimon.schema.data_types import DataTypeParser
        new_type = DataTypeParser.parse_data_type(change_dict["newDataType"])
        return UpdateColumnType(
            field_names=change_dict["fieldNames"],
            new_data_type=new_type,
            keep_nullability=change_dict.get("keepNullability", False)
        )
    elif action == Actions.UPDATE_COLUMN_NULLABILITY_ACTION:
        return UpdateColumnNullability(
            field_names=change_dict["fieldNames"],
            new_nullability=change_dict["newNullability"]
        )
    elif action == Actions.UPDATE_COLUMN_COMMENT_ACTION:
        return UpdateColumnComment(
            field_names=change_dict["fieldNames"],
            new_comment=change_dict.get("newComment")
        )
    elif action == Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION:
        return UpdateColumnDefaultValue(
            field_names=change_dict["fieldNames"],
            new_default_value=change_dict["newDefaultValue"]
        )
    elif action == Actions.UPDATE_COLUMN_POSITION_ACTION:
        move_dict = change_dict.get("move") or change_dict.get(UpdateColumnPosition.FIELD_MOVE)
        if move_dict is None:
            raise ValueError(f"Missing move field in UpdateColumnPosition change: {change_dict}")
        if not isinstance(move_dict, dict):
            raise ValueError(f"move field must be a dict in UpdateColumnPosition change: {change_dict}")
        # UPDATE_COLUMN_POSITION is strict: a malformed move is an error.
        return UpdateColumnPosition(move=_parse_move(move_dict))
    else:
        raise ValueError(f"Unknown schema change action: {action}")
+
+
 class RESTCatalogServer:
     """Mock REST server for testing"""
 
@@ -453,13 +554,11 @@ class RESTCatalogServer:
             schema = table_metadata.schema.to_schema()
             response = self.mock_table(identifier, table_metadata, table_path, 
schema)
             return self._mock_response(response, 200)
-        #
-        # elif method == "POST":
-        #     # Alter table
-        #     request_body = JSON.from_json(data, AlterTableRequest)
-        #     self._alter_table_impl(identifier, request_body.get_changes())
-        #     return self._mock_response("", 200)
-
+        elif method == "POST":
+            # Alter table
+            request_body = JSON.from_json(data, AlterTableRequest)
+            self._alter_table_impl(identifier, request_body.changes)
+            return self._mock_response("", 200)
         elif method == "DELETE":
             # Drop table
             if identifier.get_full_name() not in self.table_metadata_store:
@@ -665,6 +764,44 @@ class RESTCatalogServer:
 
         return '^' + ''.join(regex) + '$'
 
def _alter_table_impl(self, identifier: Identifier, changes: List) -> None:
    """Apply schema changes to a stored table and refresh its metadata.

    Raw dict entries in ``changes`` are first deserialized into
    SchemaChange objects; already-typed entries pass through untouched.
    """
    full_name = identifier.get_full_name()
    if full_name not in self.table_metadata_store:
        raise TableNotExistException(identifier)

    parsed_changes = []
    for raw_change in changes:
        if not isinstance(raw_change, dict):
            parsed_changes.append(raw_change)
            continue
        try:
            parsed_changes.append(_dict_to_schema_change(raw_change))
        except (KeyError, TypeError) as e:
            raise ValueError(
                f"Failed to convert change dict to SchemaChange: {raw_change}, error: {e}"
            ) from e

    old_metadata = self.table_metadata_store[full_name]

    table_path = (
        Path(self.data_path) / self.warehouse /
        identifier.get_database_name() / identifier.get_object_name()
    )
    # Persist the schema evolution on disk, then mirror it in the store.
    new_schema = SchemaManager(
        self._get_file_io(), str(table_path)
    ).commit_changes(parsed_changes)

    self.table_metadata_store[full_name] = TableMetadata(
        schema=new_schema,
        is_external=old_metadata.is_external,
        uuid=old_metadata.uuid
    )
+
def _get_file_io(self):
    """Build a FileIO rooted at this server's warehouse directory, for
    use by SchemaManager when persisting schema files."""
    from pypaimon.common.file_io import FileIO
    from pypaimon.common.options import Options

    warehouse_root = str(Path(self.data_path) / self.warehouse)
    return FileIO(warehouse_root, Options({"warehouse": warehouse_root}))
+
     def _create_table_metadata(self, identifier: Identifier, schema_id: int,
                                schema: Schema, uuid_str: str, is_external: 
bool) -> TableMetadata:
         """Create table metadata"""
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py 
b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index c5be4ad473..2adce40496 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -21,6 +21,8 @@ import pyarrow as pa
 from pypaimon import Schema
 from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, 
TableAlreadyExistException, \
     DatabaseNotExistException, TableNotExistException
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor, 
DynamicBucketRowKeyExtractor, \
     UnawareBucketRowKeyExtractor
@@ -710,3 +712,91 @@ class RESTSimpleTest(RESTBaseTest):
             self.rest_catalog.drop_database("db1", True)
         except DatabaseNotExistException:
             self.fail("drop_database with ignore_if_not_exists=True should not 
raise DatabaseNotExistException")
+
def test_alter_table(self):
    """Exercise SchemaChange variants through the REST catalog, including
    the missing-table error path and its ignore_if_not_exists escape."""
    catalog = self.rest_catalog
    catalog.create_database("test_db_alter", True)

    identifier = "test_db_alter.test_table"
    catalog.create_table(
        identifier,
        Schema(
            fields=[
                DataField.from_dict(
                    {"id": 0, "name": "col1", "type": "STRING", "description": "field1"}),
                DataField.from_dict(
                    {"id": 1, "name": "col2", "type": "STRING", "description": "field2"})
            ],
            partition_keys=[],
            primary_keys=[],
            options={},
            comment="comment"
        ),
        False
    )

    def alter_and_reload(change):
        # Apply a single schema change and return the freshly loaded table.
        catalog.alter_table(identifier, [change], False)
        return catalog.get_table(identifier)

    table = alter_and_reload(SchemaChange.add_column("col3", AtomicType("DATE")))
    self.assertEqual(len(table.fields), 3)
    self.assertEqual(table.fields[2].name, "col3")
    self.assertEqual(table.fields[2].type.type, "DATE")

    table = alter_and_reload(SchemaChange.update_comment("new comment"))
    self.assertEqual(table.table_schema.comment, "new comment")

    table = alter_and_reload(SchemaChange.rename_column("col1", "new_col1"))
    self.assertEqual(table.fields[0].name, "new_col1")

    table = alter_and_reload(SchemaChange.update_column_type("col2", AtomicType("BIGINT")))
    self.assertEqual(table.fields[1].type.type, "BIGINT")

    table = alter_and_reload(SchemaChange.update_column_comment("col2", "col2 field"))
    self.assertEqual(table.fields[1].description, "col2 field")

    table = alter_and_reload(SchemaChange.set_option("write-buffer-size", "256 MB"))
    self.assertEqual(table.table_schema.options.get("write-buffer-size"), "256 MB")

    table = alter_and_reload(SchemaChange.remove_option("write-buffer-size"))
    self.assertNotIn("write-buffer-size", table.table_schema.options)

    with self.assertRaises(TableNotExistException):
        catalog.alter_table(
            "test_db_alter.non_existing_table",
            [SchemaChange.add_column("col2", AtomicType("INT"))],
            False
        )

    # With ignore_if_not_exists=True the missing table must not raise.
    catalog.alter_table(
        "test_db_alter.non_existing_table",
        [SchemaChange.add_column("col2", AtomicType("INT"))],
        True
    )

Reply via email to