This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e718473dd8 [python] support alterTable (#6952)
e718473dd8 is described below
commit e718473dd830a1636654d0feb83053ed08979c3a
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jan 5 19:31:26 2026 +0800
[python] support alterTable (#6952)
---
paimon-python/pypaimon/api/api_request.py | 8 +
paimon-python/pypaimon/api/rest_api.py | 13 +-
paimon-python/pypaimon/catalog/catalog.py | 10 +
.../pypaimon/catalog/filesystem_catalog.py | 33 ++-
.../pypaimon/catalog/rest/rest_catalog.py | 15 +
paimon-python/pypaimon/schema/schema_change.py | 289 +++++++++++++++++++
paimon-python/pypaimon/schema/schema_manager.py | 307 +++++++++++++++++++++
.../pypaimon/tests/filesystem_catalog_test.py | 86 ++++++
paimon-python/pypaimon/tests/rest/rest_server.py | 153 +++++++++-
.../pypaimon/tests/rest/rest_simple_test.py | 90 ++++++
10 files changed, 991 insertions(+), 13 deletions(-)
diff --git a/paimon-python/pypaimon/api/api_request.py b/paimon-python/pypaimon/api/api_request.py
index d453250757..f2d062f8d0 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -23,6 +23,7 @@ from typing import Dict, List, Optional
from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import json_field
from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
@@ -76,3 +77,10 @@ class CommitTableRequest(RESTRequest):
table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)
+
+
+@dataclass
+class AlterTableRequest(RESTRequest):
+ """REST request body that carries the schema changes for an alter-table call."""
+ # Name handed to json_field for the ``changes`` attribute.
+ FIELD_CHANGES = "changes"
+
+ # Schema changes to apply; RESTApi.alter_table rejects an empty list before
+ # constructing this request.
+ changes: List[SchemaChange] = json_field(FIELD_CHANGES)
diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py
index 25165916a6..806831d954 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -18,7 +18,7 @@
import logging
from typing import Callable, Dict, List, Optional, Union
-from pypaimon.api.api_request import (AlterDatabaseRequest, CommitTableRequest,
+from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest,
CommitTableRequest,
CreateDatabaseRequest,
CreateTableRequest, RenameTableRequest)
from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
@@ -289,6 +289,17 @@ class RESTApi:
request,
self.rest_auth_function)
+    def alter_table(self, identifier: Identifier, changes: List):
+        """Post an alter-table request for ``identifier`` to the REST server.
+
+        Args:
+            identifier: Table to alter; validated into database and table names.
+            changes: Non-empty list of schema changes to send.
+
+        Returns:
+            The result of ``self.client.post`` for the request.
+
+        Raises:
+            ValueError: If ``changes`` is empty or ``None``.
+        """
+        database_name, table_name = self.__validate_identifier(identifier)
+        if changes:
+            payload = AlterTableRequest(changes)
+            endpoint = self.resource_paths.table(database_name, table_name)
+            return self.client.post(endpoint, payload, self.rest_auth_function)
+        raise ValueError("Changes cannot be empty")
+
def load_table_token(self, identifier: Identifier) ->
GetTableTokenResponse:
database_name, table_name = self.__validate_identifier(identifier)
diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py
index a8e7dcd065..fa19355b0b 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -21,6 +21,7 @@ from typing import List, Optional, Union
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
@@ -54,6 +55,15 @@ class Catalog(ABC):
def create_table(self, identifier: Union[str, Identifier], schema: Schema,
ignore_if_exists: bool):
"""Create table with schema."""
+ @abstractmethod
+ def alter_table(
+ self,
+ identifier: Union[str, Identifier],
+ changes: List[SchemaChange],
+ ignore_if_not_exists: bool = False
+ ):
+ """Alter table with schema changes.
+
+ Args:
+ identifier: Target table, given either as an Identifier or as a string
+ that implementations parse with Identifier.from_string.
+ changes: Schema changes to apply to the table.
+ ignore_if_not_exists: When True, the filesystem and REST catalogs in
+ this package return silently if the table does not exist; when
+ False they raise TableNotExistException.
+ """
+
def supports_version_management(self) -> bool:
"""
Whether this catalog supports version management for tables.
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index b2eced586e..8d1b485b74 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -20,16 +20,19 @@ from typing import List, Optional, Union
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_environment import CatalogEnvironment
-from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
- DatabaseNotExistException,
- TableAlreadyExistException,
- TableNotExistException)
+from pypaimon.catalog.catalog_exception import (
+ DatabaseAlreadyExistException,
+ DatabaseNotExistException,
+ TableAlreadyExistException,
+ TableNotExistException
+)
from pypaimon.catalog.database import Database
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
@@ -115,6 +118,28 @@ class FileSystemCatalog(Catalog):
db_path = self.get_database_path(identifier.get_database_name())
return f"{db_path}/{identifier.get_table_name()}"
+    def alter_table(
+        self,
+        identifier: Union[str, Identifier],
+        changes: List[SchemaChange],
+        ignore_if_not_exists: bool = False
+    ):
+        """Apply schema changes to a table managed by this filesystem catalog.
+
+        Args:
+            identifier: Table to alter, as an ``Identifier`` or a string that
+                ``Identifier.from_string`` can parse.
+            changes: Schema changes handed to ``SchemaManager.commit_changes``.
+            ignore_if_not_exists: Return silently instead of raising when the
+                table is missing.
+
+        Raises:
+            TableNotExistException: The table is missing and
+                ``ignore_if_not_exists`` is false.
+            RuntimeError: Committing the changes failed for any reason; the
+                original exception is attached as the cause.
+        """
+        table_id = identifier if isinstance(identifier, Identifier) else Identifier.from_string(identifier)
+
+        # Loading the table doubles as the existence check.
+        try:
+            self.get_table(table_id)
+        except TableNotExistException:
+            if ignore_if_not_exists:
+                return
+            raise
+
+        manager = SchemaManager(self.file_io, self.get_table_path(table_id))
+        try:
+            manager.commit_changes(changes)
+        except Exception as cause:
+            raise RuntimeError(f"Failed to alter table {table_id.get_full_name()}: {cause}") from cause
+
def commit_snapshot(
self,
identifier: Identifier,
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index c32c5bcb39..5a0ca01ce4 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -34,6 +34,7 @@ from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.table_schema import TableSchema
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
@@ -177,6 +178,20 @@ class RESTCatalog(Catalog):
if not ignore_if_not_exists:
raise TableNotExistException(identifier) from e
+    def alter_table(
+        self,
+        identifier: Union[str, Identifier],
+        changes: List[SchemaChange],
+        ignore_if_not_exists: bool = False
+    ):
+        """Forward schema changes for a table to the REST API.
+
+        Args:
+            identifier: Table to alter, as an ``Identifier`` or a parseable string.
+            changes: Schema changes passed to ``rest_api.alter_table``.
+            ignore_if_not_exists: Swallow a missing-resource error instead of
+                raising.
+
+        Raises:
+            TableNotExistException: The server reported no such resource and
+                ``ignore_if_not_exists`` is false.
+        """
+        table_id = identifier if isinstance(identifier, Identifier) else Identifier.from_string(identifier)
+        try:
+            self.rest_api.alter_table(table_id, changes)
+        except NoSuchResourceException as missing:
+            if ignore_if_not_exists:
+                return
+            raise TableNotExistException(table_id) from missing
+
def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
response = self.rest_api.get_table(identifier)
return self.to_table_metadata(identifier.get_database_name(), response)
diff --git a/paimon-python/pypaimon/schema/schema_change.py b/paimon-python/pypaimon/schema/schema_change.py
new file mode 100644
index 0000000000..8e62e7f5bb
--- /dev/null
+++ b/paimon-python/pypaimon/schema/schema_change.py
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC
+from dataclasses import dataclass
+from enum import Enum
+from typing import List, Optional, Union
+
+from pypaimon.common.json_util import json_field
+from pypaimon.schema.data_types import DataType
+
+
+class Actions:
+ """String constants identifying each kind of schema change.
+
+ FIELD_ACTION is the name passed to json_field for every change's ``action``
+ attribute; the remaining constants are the default ``action`` values of the
+ corresponding SchemaChange dataclasses in this module.
+ """
+ FIELD_ACTION = "action"
+ SET_OPTION_ACTION = "setOption"
+ REMOVE_OPTION_ACTION = "removeOption"
+ UPDATE_COMMENT_ACTION = "updateComment"
+ ADD_COLUMN_ACTION = "addColumn"
+ RENAME_COLUMN_ACTION = "renameColumn"
+ DROP_COLUMN_ACTION = "dropColumn"
+ UPDATE_COLUMN_TYPE_ACTION = "updateColumnType"
+ UPDATE_COLUMN_NULLABILITY_ACTION = "updateColumnNullability"
+ UPDATE_COLUMN_COMMENT_ACTION = "updateColumnComment"
+ UPDATE_COLUMN_DEFAULT_VALUE_ACTION = "updateColumnDefaultValue"
+ UPDATE_COLUMN_POSITION_ACTION = "updateColumnPosition"
+
+
+class SchemaChange(ABC):
+    """Base class and factory for the schema changes accepted by ``alter_table``.
+
+    Every column-oriented factory accepts either a single column name or a
+    list of names (a path) and always stores a list on the resulting change.
+    """
+
+    @staticmethod
+    def _to_field_names(field_name: Union[str, List[str]]) -> List[str]:
+        """Wrap a single column name in a list; return a list argument unchanged."""
+        return [field_name] if isinstance(field_name, str) else field_name
+
+    @staticmethod
+    def set_option(key: str, value: str) -> "SetOption":
+        """Create a change that sets table option ``key`` to ``value``."""
+        return SetOption(key=key, value=value)
+
+    @staticmethod
+    def remove_option(key: str) -> "RemoveOption":
+        """Create a change that removes table option ``key``."""
+        return RemoveOption(key)
+
+    @staticmethod
+    def update_comment(comment: Optional[str]) -> "UpdateComment":
+        """Create a change that replaces the table comment."""
+        return UpdateComment(comment)
+
+    @staticmethod
+    def add_column(
+        field_name: Union[str, List[str]],
+        data_type: DataType,
+        comment: Optional[str] = None,
+        move: Optional["Move"] = None
+    ) -> "AddColumn":
+        """Create a change that adds a column, optionally positioned by ``move``."""
+        return AddColumn(
+            field_names=SchemaChange._to_field_names(field_name),
+            data_type=data_type,
+            comment=comment,
+            move=move,
+        )
+
+    @staticmethod
+    def rename_column(field_name: Union[str, List[str]], new_name: str) -> "RenameColumn":
+        """Create a change that renames a column to ``new_name``."""
+        return RenameColumn(SchemaChange._to_field_names(field_name), new_name)
+
+    @staticmethod
+    def drop_column(field_name: Union[str, List[str]]) -> "DropColumn":
+        """Create a change that drops a column."""
+        return DropColumn(SchemaChange._to_field_names(field_name))
+
+    @staticmethod
+    def update_column_type(
+        field_name: Union[str, List[str]],
+        new_data_type: DataType,
+        keep_nullability: bool = False
+    ) -> "UpdateColumnType":
+        """Create a change that replaces a column's data type."""
+        return UpdateColumnType(
+            SchemaChange._to_field_names(field_name), new_data_type, keep_nullability
+        )
+
+    @staticmethod
+    def update_column_nullability(
+        field_name: Union[str, List[str]],
+        new_nullability: bool
+    ) -> "UpdateColumnNullability":
+        """Create a change that sets a column's nullability."""
+        return UpdateColumnNullability(
+            SchemaChange._to_field_names(field_name), new_nullability
+        )
+
+    @staticmethod
+    def update_column_comment(field_name: Union[str, List[str]], comment: str) -> "UpdateColumnComment":
+        """Create a change that replaces a column's comment."""
+        return UpdateColumnComment(SchemaChange._to_field_names(field_name), comment)
+
+    @staticmethod
+    def update_column_default_value(
+        field_names: Union[str, List[str]], default_value: str
+    ) -> "UpdateColumnDefaultValue":
+        """Create a change that replaces a column's default value.
+
+        Like the other column factories, a bare string is accepted and wrapped
+        in a list, so ``field_names`` on the result is never a plain ``str``.
+        """
+        return UpdateColumnDefaultValue(
+            SchemaChange._to_field_names(field_names), default_value
+        )
+
+    @staticmethod
+    def update_column_position(move: "Move") -> "UpdateColumnPosition":
+        """Create a change that repositions an existing column."""
+        return UpdateColumnPosition(move)
+
+
+@dataclass
+class SetOption(SchemaChange):
+    """Schema change that sets table option ``key`` to ``value``."""
+
+    FIELD_KEY = "key"
+    FIELD_VALUE = "value"
+
+    key: str = json_field(FIELD_KEY)
+    value: str = json_field(FIELD_VALUE)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.SET_OPTION_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.SET_OPTION_ACTION
+
+
+@dataclass
+class RemoveOption(SchemaChange):
+    """Schema change that removes table option ``key``."""
+
+    FIELD_KEY = "key"
+
+    key: str = json_field(FIELD_KEY)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.REMOVE_OPTION_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.REMOVE_OPTION_ACTION
+
+
+@dataclass
+class UpdateComment(SchemaChange):
+    """Schema change that replaces the table-level comment (``None`` allowed)."""
+
+    FIELD_COMMENT = "comment"
+
+    comment: Optional[str] = json_field(FIELD_COMMENT)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COMMENT_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.UPDATE_COMMENT_ACTION
+
+
+@dataclass
+class AddColumn(SchemaChange):
+    """Schema change that adds a column, optionally placed by ``move``."""
+
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_DATA_TYPE = "dataType"
+    FIELD_COMMENT = "comment"
+    FIELD_MOVE = "move"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    data_type: DataType = json_field(FIELD_DATA_TYPE)
+    comment: Optional[str] = json_field(FIELD_COMMENT)
+    move: Optional["Move"] = json_field(FIELD_MOVE)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.ADD_COLUMN_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.ADD_COLUMN_ACTION
+
+
+@dataclass
+class RenameColumn(SchemaChange):
+    """Schema change that renames the column at ``field_names`` to ``new_name``."""
+
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_NAME = "newName"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_name: str = json_field(FIELD_NEW_NAME)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.RENAME_COLUMN_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.RENAME_COLUMN_ACTION
+
+
+@dataclass
+class DropColumn(SchemaChange):
+    """Schema change that drops the column at ``field_names``."""
+
+    FIELD_FIELD_NAMES = "fieldNames"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.DROP_COLUMN_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.DROP_COLUMN_ACTION
+
+
+@dataclass
+class UpdateColumnType(SchemaChange):
+    """Schema change that replaces a column's data type.
+
+    ``keep_nullability`` asks the applier to carry over the column's current
+    nullability instead of taking it from ``new_data_type``.
+    """
+
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_DATA_TYPE = "newDataType"
+    FIELD_KEEP_NULLABILITY = "keepNullability"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_data_type: DataType = json_field(FIELD_NEW_DATA_TYPE)
+    keep_nullability: bool = json_field(FIELD_KEEP_NULLABILITY)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_TYPE_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.UPDATE_COLUMN_TYPE_ACTION
+
+
+@dataclass
+class UpdateColumnNullability(SchemaChange):
+    """Schema change that sets whether a column is nullable."""
+
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_NULLABILITY = "newNullability"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_nullability: bool = json_field(FIELD_NEW_NULLABILITY)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_NULLABILITY_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.UPDATE_COLUMN_NULLABILITY_ACTION
+
+
+@dataclass
+class UpdateColumnComment(SchemaChange):
+    """Schema change that replaces a column's comment."""
+
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_COMMENT = "newComment"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_comment: str = json_field(FIELD_NEW_COMMENT)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_COMMENT_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.UPDATE_COLUMN_COMMENT_ACTION
+
+
+@dataclass
+class UpdateColumnDefaultValue(SchemaChange):
+    """Schema change that replaces a column's default value."""
+
+    FIELD_FIELD_NAMES = "fieldNames"
+    FIELD_NEW_DEFAULT_VALUE = "newDefaultValue"
+
+    field_names: List[str] = json_field(FIELD_FIELD_NAMES)
+    new_default_value: str = json_field(FIELD_NEW_DEFAULT_VALUE)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION
+
+
+@dataclass
+class UpdateColumnPosition(SchemaChange):
+    """Schema change that repositions an existing column as described by ``move``."""
+
+    FIELD_MOVE = "move"
+
+    move: "Move" = json_field(FIELD_MOVE)
+    action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_POSITION_ACTION)
+
+    def __post_init__(self):
+        # Restore the action tag when it is absent or was passed as None.
+        if getattr(self, 'action', None) is None:
+            self.action = Actions.UPDATE_COLUMN_POSITION_ACTION
+
+
+class MoveType(Enum):
+ """Where a column is placed by a Move."""
+ # Insert at the beginning of the field list.
+ FIRST = "FIRST"
+ # Insert immediately after the reference column.
+ AFTER = "AFTER"
+ # Insert immediately before the reference column.
+ BEFORE = "BEFORE"
+ # Append at the end of the field list.
+ LAST = "LAST"
+
+
+@dataclass
+class Move:
+    """Placement of column ``field_name`` relative to the table's other columns.
+
+    ``reference_field_name`` is only set by the AFTER and BEFORE factories;
+    FIRST and LAST leave it as ``None``.
+    """
+
+    FIELD_FIELD_NAME = "fieldName"
+    FIELD_REFERENCE_FIELD_NAME = "referenceFieldName"
+    FIELD_TYPE = "type"
+
+    field_name: str = json_field(FIELD_FIELD_NAME)
+    reference_field_name: Optional[str] = json_field(FIELD_REFERENCE_FIELD_NAME)
+    type: MoveType = json_field(FIELD_TYPE)
+
+    @staticmethod
+    def first(field_name: str) -> "Move":
+        """Move ``field_name`` to the first position."""
+        return Move(field_name=field_name, reference_field_name=None, type=MoveType.FIRST)
+
+    @staticmethod
+    def after(field_name: str, reference_field_name: str) -> "Move":
+        """Move ``field_name`` to just after ``reference_field_name``."""
+        return Move(field_name=field_name, reference_field_name=reference_field_name, type=MoveType.AFTER)
+
+    @staticmethod
+    def before(field_name: str, reference_field_name: str) -> "Move":
+        """Move ``field_name`` to just before ``reference_field_name``."""
+        return Move(field_name=field_name, reference_field_name=reference_field_name, type=MoveType.BEFORE)
+
+    @staticmethod
+    def last(field_name: str) -> "Move":
+        """Move ``field_name`` to the last position."""
+        return Move(field_name=field_name, reference_field_name=None, type=MoveType.LAST)
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 86f0f00c35..c8f007ba08 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -17,12 +17,187 @@
################################################################################
from typing import Optional, List
+from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException,
ColumnNotExistException
from pypaimon.common.file_io import FileIO
from pypaimon.common.json_util import JSON
+from pypaimon.schema.data_types import AtomicInteger, DataField
from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import (
+ AddColumn, DropColumn, RemoveOption, RenameColumn,
+ SchemaChange, SetOption, UpdateColumnComment,
+ UpdateColumnNullability, UpdateColumnPosition,
+ UpdateColumnType, UpdateComment
+)
from pypaimon.schema.table_schema import TableSchema
+def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int]:
+    """Return the index of the first field named ``field_name``, or ``None``."""
+    return next(
+        (position for position, candidate in enumerate(fields) if candidate.name == field_name),
+        None,
+    )
+
+
+def _get_rename_mappings(changes: List[SchemaChange]) -> dict:
+    """Collect ``old name -> new name`` for every single-name column rename.
+
+    Renames whose ``field_names`` holds more than one element are ignored.
+    When the same old name is renamed twice, the later change wins.
+    """
+    return {
+        change.field_names[0]: change.new_name
+        for change in changes
+        if isinstance(change, RenameColumn) and len(change.field_names) == 1
+    }
+
+
+def _handle_update_column_comment(
+        change: UpdateColumnComment, new_fields: List[DataField]
+):
+    """Replace, in place, the description of the column targeted by ``change``.
+
+    Only the last element of ``change.field_names`` is used to look the column
+    up in ``new_fields``.
+
+    Raises:
+        ColumnNotExistException: No field with that name is present.
+    """
+    target_name = change.field_names[-1]
+    position = _find_field_index(new_fields, target_name)
+    if position is None:
+        raise ColumnNotExistException(target_name)
+
+    current = new_fields[position]
+    # Rebuild the field with only the description swapped out.
+    new_fields[position] = DataField(
+        current.id, current.name, current.type, change.new_comment, current.default_value
+    )
+
+
+def _handle_update_column_nullability(
+        change: UpdateColumnNullability, new_fields: List[DataField]
+):
+    """Set, in place, the nullability of the column targeted by ``change``.
+
+    Only the last element of ``change.field_names`` is used to look the column
+    up in ``new_fields``.
+
+    Raises:
+        ColumnNotExistException: No field with that name is present.
+    """
+    from pypaimon.schema.data_types import DataTypeParser
+
+    target_name = change.field_names[-1]
+    position = _find_field_index(new_fields, target_name)
+    if position is None:
+        raise ColumnNotExistException(target_name)
+
+    current = new_fields[position]
+    # Re-parse the type from its dict form so the flag is set on a fresh object.
+    retyped = DataTypeParser.parse_data_type(current.type.to_dict())
+    retyped.nullable = change.new_nullability
+    new_fields[position] = DataField(
+        current.id, current.name, retyped, current.description, current.default_value
+    )
+
+
+def _handle_update_column_type(
+        change: UpdateColumnType, new_fields: List[DataField]
+):
+    """Replace, in place, the data type of the column targeted by ``change``.
+
+    Only the last element of ``change.field_names`` is used to look the column
+    up in ``new_fields``. With ``change.keep_nullability`` set, the new type
+    inherits the nullability of the column's current type.
+
+    Raises:
+        ColumnNotExistException: No field with that name is present.
+    """
+    from pypaimon.schema.data_types import DataTypeParser
+
+    target_name = change.field_names[-1]
+    position = _find_field_index(new_fields, target_name)
+    if position is None:
+        raise ColumnNotExistException(target_name)
+
+    current = new_fields[position]
+    # Re-parse from dict form so the caller's DataType instance is never mutated.
+    replacement = DataTypeParser.parse_data_type(change.new_data_type.to_dict())
+    if change.keep_nullability:
+        replacement.nullable = current.type.nullable
+    new_fields[position] = DataField(
+        current.id, current.name, replacement, current.description, current.default_value
+    )
+
+
+def _drop_column_validation(schema: 'TableSchema', change: DropColumn):
+    """Reject dropping a partition-key or primary-key column.
+
+    Changes whose ``field_names`` holds more than one element are not checked.
+
+    Raises:
+        ValueError: The single named column is a partition or primary key.
+    """
+    if len(change.field_names) <= 1:
+        candidate = change.field_names[0]
+        is_key_column = candidate in schema.partition_keys or candidate in schema.primary_keys
+        if is_key_column:
+            raise ValueError(
+                f"Cannot drop partition key or primary key: [{candidate}]"
+            )
+
+
+def _handle_drop_column(change: DropColumn, new_fields: List[DataField]):
+    """Remove, in place, the column targeted by ``change`` from ``new_fields``.
+
+    Only the last element of ``change.field_names`` is used for the lookup.
+    The emptiness check runs after the removal.
+
+    Raises:
+        ColumnNotExistException: No field with that name is present.
+        ValueError: The removal left the table without any field.
+    """
+    target_name = change.field_names[-1]
+    position = _find_field_index(new_fields, target_name)
+    if position is None:
+        raise ColumnNotExistException(target_name)
+    del new_fields[position]
+    if len(new_fields) == 0:
+        raise ValueError("Cannot drop all fields in table")
+
+
+def _assert_not_updating_partition_keys(
+        schema: 'TableSchema', field_names: List[str], operation: str):
+    """Reject ``operation`` on a partition column.
+
+    A ``field_names`` list with more than one element is not checked.
+
+    Raises:
+        ValueError: The single named column is one of ``schema.partition_keys``.
+    """
+    if len(field_names) <= 1:
+        candidate = field_names[0]
+        if candidate in schema.partition_keys:
+            raise ValueError(
+                f"Cannot {operation} partition column: [{candidate}]"
+            )
+
+
+def _assert_not_updating_primary_keys(
+        schema: 'TableSchema', field_names: List[str], operation: str):
+    """Reject ``operation`` on a primary-key column.
+
+    A ``field_names`` list with more than one element is not checked.
+
+    Raises:
+        ValueError: The single named column is one of ``schema.primary_keys``.
+    """
+    if len(field_names) <= 1:
+        candidate = field_names[0]
+        if candidate in schema.primary_keys:
+            raise ValueError(f"Cannot {operation} primary key")
+
+
+def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
+    """Rename, in place, the column targeted by ``change``.
+
+    Only the last element of ``change.field_names`` is used for the lookup.
+
+    Raises:
+        ColumnNotExistException: The column to rename is not present.
+        ColumnAlreadyExistException: A column named ``change.new_name`` exists.
+    """
+    old_name, renamed_to = change.field_names[-1], change.new_name
+    position = _find_field_index(new_fields, old_name)
+    if position is None:
+        raise ColumnNotExistException(old_name)
+    if _find_field_index(new_fields, renamed_to) is not None:
+        raise ColumnAlreadyExistException(renamed_to)
+
+    current = new_fields[position]
+    new_fields[position] = DataField(
+        current.id, renamed_to, current.type, current.description, current.default_value
+    )
+
+
+def _apply_move(fields: List[DataField], new_field: Optional[DataField], move):
+    """Place a column inside ``fields`` according to ``move``.
+
+    When ``new_field`` is given it is inserted as a new column. Otherwise the
+    existing column named ``move.field_name`` is relocated. ``fields`` is
+    updated in place, and only after the whole move has been validated, so a
+    rejected move leaves the list exactly as it was.
+
+    Args:
+        fields: Current top-level fields; mutated in place on success.
+        new_field: Column being added, or ``None`` to relocate an existing one.
+        move: Object with ``field_name``, ``reference_field_name`` and ``type``.
+
+    Raises:
+        ColumnNotExistException: The column to relocate, or the reference
+            column of an AFTER/BEFORE move, is not present.
+        ValueError: ``move.type`` is not a supported ``MoveType``.
+    """
+    from pypaimon.schema.schema_change import MoveType
+
+    # Work on a copy that excludes the moving column; the reference lookup
+    # below must not see it, and the caller's list must survive a failure.
+    if new_field is not None:
+        moving = new_field
+        remaining = list(fields)
+    else:
+        source_index = _find_field_index(fields, move.field_name)
+        if source_index is None:
+            raise ColumnNotExistException(move.field_name)
+        moving = fields[source_index]
+        remaining = fields[:source_index] + fields[source_index + 1:]
+
+    if move.type == MoveType.FIRST:
+        target_index = 0
+    elif move.type == MoveType.LAST:
+        target_index = len(remaining)
+    elif move.type in (MoveType.AFTER, MoveType.BEFORE):
+        reference_index = _find_field_index(remaining, move.reference_field_name)
+        if reference_index is None:
+            raise ColumnNotExistException(move.reference_field_name)
+        target_index = reference_index + 1 if move.type == MoveType.AFTER else reference_index
+    else:
+        raise ValueError(f"Unsupported move type: {move.type}")
+
+    remaining.insert(target_index, moving)
+    # Commit in one step so callers holding ``fields`` see the final order.
+    fields[:] = remaining
+
+
+def _handle_add_column(
+        change: AddColumn, new_fields: List[DataField], highest_field_id: AtomicInteger
+):
+    """Add the column described by ``change`` to ``new_fields`` in place.
+
+    The new field takes the next id from ``highest_field_id`` and is named
+    after the last element of ``change.field_names``. It is positioned by
+    ``change.move`` when one is given and appended otherwise.
+
+    Raises:
+        ValueError: ``change.data_type`` is not nullable.
+        ColumnAlreadyExistException: A field with that name already exists.
+    """
+    if not change.data_type.nullable:
+        qualified_name = '.'.join(change.field_names)
+        raise ValueError(
+            f"Column {qualified_name} cannot specify NOT NULL in the table."
+        )
+
+    # The id is drawn from the counter before the duplicate-name check runs.
+    allocated_id = highest_field_id.increment_and_get()
+    column_name = change.field_names[-1]
+    if _find_field_index(new_fields, column_name) is not None:
+        raise ColumnAlreadyExistException(column_name)
+
+    added = DataField(allocated_id, column_name, change.data_type, change.comment)
+    if not change.move:
+        new_fields.append(added)
+        return
+    _apply_move(new_fields, added, change.move)
+
+
class SchemaManager:
def __init__(self, file_io: FileIO, table_path: str):
@@ -94,3 +269,135 @@ class SchemaManager:
except ValueError:
continue
return versions
+
+    def commit_changes(self, changes: List[SchemaChange]) -> TableSchema:
+        """Apply ``changes`` on top of the latest schema and commit the result.
+
+        The latest schema is re-read and the new schema regenerated on every
+        attempt; the loop repeats until ``self.commit`` reports success.
+
+        Returns:
+            The committed table schema.
+
+        Raises:
+            RuntimeError: No schema exists at the table path, or ``self.commit``
+                raised (the original exception is attached as the cause).
+        """
+        committed = None
+        while committed is None:
+            current = self.latest()
+            if current is None:
+                raise RuntimeError(
+                    f"Table schema does not exist at path: {self.table_path}. "
+                    "This may happen if the table was deleted concurrently."
+                )
+
+            candidate = self._generate_table_schema(current, changes)
+            try:
+                if self.commit(candidate):
+                    committed = candidate
+            except Exception as cause:
+                raise RuntimeError(f"Failed to commit schema changes: {cause}") from cause
+        return committed
+
+    def _generate_table_schema(
+            self, old_table_schema: TableSchema, changes: List[SchemaChange]
+    ) -> TableSchema:
+        """Build the successor of ``old_table_schema`` with ``changes`` applied.
+
+        Changes are applied in order to copies of the old options and fields,
+        so ``old_table_schema`` itself is not modified. Key-column guards are
+        evaluated against the old schema. Afterwards, single-name column
+        renames are propagated to the primary keys and to the bucket-key and
+        ``sequence.field`` options. Partition keys are carried over unchanged.
+
+        Returns:
+            A ``TableSchema`` with the same version and ``id`` incremented by one.
+
+        Raises:
+            NotImplementedError: A change is of a type not handled here.
+        """
+        from pypaimon.schema.data_types import DataTypeParser
+
+        options = dict(old_table_schema.options)
+        # Each field type is rebuilt from its dict form so later edits never
+        # touch the type objects owned by the old schema.
+        fields = [
+            DataField(
+                original.id,
+                original.name,
+                DataTypeParser.parse_data_type(original.type.to_dict()),
+                original.description,
+                original.default_value,
+            )
+            for original in old_table_schema.fields
+        ]
+        field_id_counter = AtomicInteger(old_table_schema.highest_field_id)
+        comment = old_table_schema.comment
+
+        for change in changes:
+            if isinstance(change, SetOption):
+                options[change.key] = change.value
+            elif isinstance(change, RemoveOption):
+                options.pop(change.key, None)
+            elif isinstance(change, UpdateComment):
+                comment = change.comment
+            elif isinstance(change, AddColumn):
+                _handle_add_column(change, fields, field_id_counter)
+            elif isinstance(change, RenameColumn):
+                _assert_not_updating_partition_keys(
+                    old_table_schema, change.field_names, "rename"
+                )
+                _handle_rename_column(change, fields)
+            elif isinstance(change, DropColumn):
+                _drop_column_validation(old_table_schema, change)
+                _handle_drop_column(change, fields)
+            elif isinstance(change, UpdateColumnType):
+                _assert_not_updating_partition_keys(
+                    old_table_schema, change.field_names, "update"
+                )
+                _assert_not_updating_primary_keys(
+                    old_table_schema, change.field_names, "update"
+                )
+                _handle_update_column_type(change, fields)
+            elif isinstance(change, UpdateColumnNullability):
+                # Only making a column nullable is guarded for primary keys.
+                if change.new_nullability:
+                    _assert_not_updating_primary_keys(
+                        old_table_schema, change.field_names, "change nullability of"
+                    )
+                _handle_update_column_nullability(change, fields)
+            elif isinstance(change, UpdateColumnComment):
+                _handle_update_column_comment(change, fields)
+            elif isinstance(change, UpdateColumnPosition):
+                _apply_move(fields, None, change.move)
+            else:
+                raise NotImplementedError(f"Unsupported change: {type(change)}")
+
+        renames = _get_rename_mappings(changes)
+        primary_keys = SchemaManager._apply_not_nested_column_rename(
+            old_table_schema.primary_keys, renames
+        )
+        options = SchemaManager._apply_rename_columns_to_options(options, renames)
+
+        altered = Schema(
+            fields=fields,
+            partition_keys=old_table_schema.partition_keys,
+            primary_keys=primary_keys,
+            options=options,
+            comment=comment
+        )
+
+        return TableSchema(
+            version=old_table_schema.version,
+            id=old_table_schema.id + 1,
+            fields=altered.fields,
+            highest_field_id=field_id_counter.get(),
+            partition_keys=altered.partition_keys,
+            primary_keys=altered.primary_keys,
+            options=altered.options,
+            comment=altered.comment
+        )
+
+    @staticmethod
+    def _apply_not_nested_column_rename(
+            columns: List[str], rename_mappings: dict
+    ) -> List[str]:
+        """Return ``columns`` with every renamed name replaced by its new name.
+
+        Names without an entry in ``rename_mappings`` are kept as they are; the
+        input list is not modified.
+        """
+        renamed = []
+        for column in columns:
+            renamed.append(rename_mappings.get(column, column))
+        return renamed
+
+    @staticmethod
+    def _apply_rename_columns_to_options(
+            options: dict, rename_mappings: dict
+    ) -> dict:
+        """Rewrite column names inside the bucket-key and ``sequence.field`` options.
+
+        With no renames the ``options`` object itself is returned. Otherwise a
+        copy is returned in which each of the two options, when present, has
+        its comma-separated column list renamed entry by entry.
+        """
+        if not rename_mappings:
+            return options
+
+        from pypaimon.common.options.core_options import CoreOptions
+
+        updated = dict(options)
+        for option_key in (CoreOptions.BUCKET_KEY.key(), "sequence.field"):
+            if option_key not in updated:
+                continue
+            columns = updated[option_key].split(",")
+            updated[option_key] = ",".join(
+                SchemaManager._apply_not_nested_column_rename(columns, rename_mappings)
+            )
+        return updated
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 530e33aa78..02c7102ab8 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -25,6 +25,7 @@ from pypaimon.catalog.catalog_exception import
(DatabaseAlreadyExistException,
TableAlreadyExistException,
TableNotExistException)
from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.table.file_store_table import FileStoreTable
@@ -85,3 +86,88 @@ class FileSystemCatalogTest(unittest.TestCase):
self.assertEqual(table.fields[2].name, "f2")
self.assertTrue(isinstance(table.fields[2].type, AtomicType))
self.assertEqual(table.fields[2].type.type, "STRING")
+
+    def test_alter_table(self):
+        """Apply each kind of schema change in turn and check the reloaded table."""
+        catalog = CatalogFactory.create({"warehouse": self.warehouse})
+        catalog.create_database("test_db", False)
+
+        identifier = "test_db.test_table"
+        initial_fields = [
+            DataField.from_dict({"id": 0, "name": "col1", "type": "STRING", "description": "field1"}),
+            DataField.from_dict({"id": 1, "name": "col2", "type": "STRING", "description": "field2"}),
+        ]
+        schema = Schema(
+            fields=initial_fields,
+            partition_keys=[],
+            primary_keys=[],
+            options={},
+            comment="comment"
+        )
+        catalog.create_table(identifier, schema, False)
+
+        def alter_and_reload(change):
+            # One change per alter call, then re-read the table from the catalog.
+            catalog.alter_table(identifier, [change], False)
+            return catalog.get_table(identifier)
+
+        table = alter_and_reload(SchemaChange.add_column("col3", AtomicType("DATE")))
+        self.assertEqual(len(table.fields), 3)
+        self.assertEqual(table.fields[2].name, "col3")
+        self.assertEqual(table.fields[2].type.type, "DATE")
+
+        table = alter_and_reload(SchemaChange.update_comment("new comment"))
+        self.assertEqual(table.table_schema.comment, "new comment")
+
+        table = alter_and_reload(SchemaChange.rename_column("col1", "new_col1"))
+        self.assertEqual(table.fields[0].name, "new_col1")
+
+        table = alter_and_reload(SchemaChange.update_column_type("col2", AtomicType("BIGINT")))
+        self.assertEqual(table.fields[1].type.type, "BIGINT")
+
+        table = alter_and_reload(SchemaChange.update_column_comment("col2", "col2 field"))
+        self.assertEqual(table.fields[1].description, "col2 field")
+
+        table = alter_and_reload(SchemaChange.set_option("write-buffer-size", "256 MB"))
+        self.assertEqual(table.table_schema.options.get("write-buffer-size"), "256 MB")
+
+        table = alter_and_reload(SchemaChange.remove_option("write-buffer-size"))
+        self.assertNotIn("write-buffer-size", table.table_schema.options)
+
+        table = alter_and_reload(SchemaChange.drop_column("col3"))
+        self.assertEqual(len(table.fields), 2)
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py
index b97d015dbe..e0bedeac1b 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -27,7 +27,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
-from pypaimon.api.api_request import (CreateDatabaseRequest,
+from pypaimon.api.api_request import (AlterTableRequest, CreateDatabaseRequest,
CreateTableRequest, RenameTableRequest)
from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
GetTableResponse, ListDatabasesResponse,
@@ -44,6 +44,8 @@ from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import JSON
from pypaimon import Schema
+from pypaimon.schema.schema_change import Actions, SchemaChange
+from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
@@ -91,6 +93,105 @@ FORMAT_TABLE = "FORMAT_TABLE"
OBJECT_TABLE = "OBJECT_TABLE"
+def _dict_to_schema_change(change_dict: dict) -> SchemaChange:
+    """Build the SchemaChange described by one decoded JSON change dict.
+
+    The ``Actions.FIELD_ACTION`` entry selects which SchemaChange subclass is
+    constructed; the remaining entries are read with the literal JSON keys
+    (``fieldNames``, ``newName``, ``dataType``, ...).
+
+    Args:
+        change_dict: one element of an alter-table request's change list.
+
+    Returns:
+        The matching SchemaChange instance.
+
+    Raises:
+        ValueError: if the action is unknown, if a required AddColumn /
+            UpdateColumnPosition / Move entry is missing, or if ``move`` is
+            present but is not a dict.
+        KeyError: if a key that is read by subscript (for example
+            ``fieldNames`` of a rename) is absent.
+    """
+    from pypaimon.schema.data_types import DataTypeParser
+    from pypaimon.schema.schema_change import (
+        SetOption, RemoveOption, UpdateComment, AddColumn, RenameColumn,
+        DropColumn, UpdateColumnType, UpdateColumnNullability,
+        UpdateColumnComment, UpdateColumnDefaultValue, UpdateColumnPosition
+    )
+
+    action = change_dict.get(Actions.FIELD_ACTION)
+
+    if action == Actions.SET_OPTION_ACTION:
+        return SetOption(key=change_dict["key"], value=change_dict["value"])
+
+    if action == Actions.REMOVE_OPTION_ACTION:
+        return RemoveOption(key=change_dict["key"])
+
+    if action == Actions.UPDATE_COMMENT_ACTION:
+        return UpdateComment(comment=change_dict.get("comment"))
+
+    if action == Actions.ADD_COLUMN_ACTION:
+        data_type_value = change_dict.get("dataType") or change_dict.get(AddColumn.FIELD_DATA_TYPE)
+        if data_type_value is None:
+            raise ValueError(f"Missing dataType field in AddColumn change: {change_dict}")
+        data_type = DataTypeParser.parse_data_type(data_type_value)
+
+        # ``move`` is optional here, but when it is supplied it must be
+        # well-formed: dropping a malformed value would add the column at an
+        # unintended position without telling the caller.
+        move = None
+        move_dict = change_dict.get("move")
+        if move_dict is not None:
+            if not isinstance(move_dict, dict):
+                raise ValueError(f"move field must be a dict in AddColumn change: {change_dict}")
+            move = _parse_move(move_dict)
+
+        field_names = change_dict.get("fieldNames") or change_dict.get(AddColumn.FIELD_FIELD_NAMES)
+        if field_names is None:
+            raise ValueError(f"Missing fieldNames field in AddColumn change: {change_dict}")
+        return AddColumn(
+            field_names=field_names,
+            data_type=data_type,
+            comment=change_dict.get("comment") or change_dict.get(AddColumn.FIELD_COMMENT),
+            move=move
+        )
+
+    if action == Actions.RENAME_COLUMN_ACTION:
+        return RenameColumn(field_names=change_dict["fieldNames"], new_name=change_dict["newName"])
+
+    if action == Actions.DROP_COLUMN_ACTION:
+        return DropColumn(field_names=change_dict["fieldNames"])
+
+    if action == Actions.UPDATE_COLUMN_TYPE_ACTION:
+        new_type = DataTypeParser.parse_data_type(change_dict["newDataType"])
+        return UpdateColumnType(
+            field_names=change_dict["fieldNames"],
+            new_data_type=new_type,
+            keep_nullability=change_dict.get("keepNullability", False)
+        )
+
+    if action == Actions.UPDATE_COLUMN_NULLABILITY_ACTION:
+        return UpdateColumnNullability(
+            field_names=change_dict["fieldNames"],
+            new_nullability=change_dict["newNullability"]
+        )
+
+    if action == Actions.UPDATE_COLUMN_COMMENT_ACTION:
+        return UpdateColumnComment(
+            field_names=change_dict["fieldNames"],
+            new_comment=change_dict.get("newComment")
+        )
+
+    if action == Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION:
+        return UpdateColumnDefaultValue(
+            field_names=change_dict["fieldNames"],
+            new_default_value=change_dict["newDefaultValue"]
+        )
+
+    if action == Actions.UPDATE_COLUMN_POSITION_ACTION:
+        move_dict = change_dict.get("move") or change_dict.get(UpdateColumnPosition.FIELD_MOVE)
+        if move_dict is None:
+            raise ValueError(f"Missing move field in UpdateColumnPosition change: {change_dict}")
+        if not isinstance(move_dict, dict):
+            raise ValueError(f"move field must be a dict in UpdateColumnPosition change: {change_dict}")
+        return UpdateColumnPosition(move=_parse_move(move_dict))
+
+    raise ValueError(f"Unknown schema change action: {action}")
+
+
+def _parse_move(move_dict: dict):
+    """Build a ``Move`` from its decoded JSON dict.
+
+    Shared by the AddColumn and UpdateColumnPosition branches of
+    ``_dict_to_schema_change`` so both validate a move the same way.
+
+    Raises:
+        ValueError: if the ``type`` or ``fieldName`` entry is missing, or if
+            ``MoveType(...)`` rejects the given type value.
+    """
+    from pypaimon.schema.schema_change import Move, MoveType
+
+    move_type_str = move_dict.get("type") or move_dict.get(Move.FIELD_TYPE)
+    if move_type_str is None:
+        raise ValueError(f"Missing type field in Move: {move_dict}")
+    move_type = MoveType(move_type_str)
+
+    field_name = move_dict.get("fieldName") or move_dict.get(Move.FIELD_FIELD_NAME)
+    if field_name is None:
+        raise ValueError(f"Missing fieldName field in Move: {move_dict}")
+
+    # The reference field is passed through even when absent (None).
+    reference_field = (
+        move_dict.get("referenceFieldName") or
+        move_dict.get(Move.FIELD_REFERENCE_FIELD_NAME)
+    )
+    return Move(
+        field_name=field_name,
+        reference_field_name=reference_field,
+        type=move_type
+    )
+
+
class RESTCatalogServer:
"""Mock REST server for testing"""
@@ -453,13 +554,11 @@ class RESTCatalogServer:
schema = table_metadata.schema.to_schema()
response = self.mock_table(identifier, table_metadata, table_path,
schema)
return self._mock_response(response, 200)
- #
- # elif method == "POST":
- # # Alter table
- # request_body = JSON.from_json(data, AlterTableRequest)
- # self._alter_table_impl(identifier, request_body.get_changes())
- # return self._mock_response("", 200)
-
+ elif method == "POST":
+ # Alter table
+ request_body = JSON.from_json(data, AlterTableRequest)
+ self._alter_table_impl(identifier, request_body.changes)
+ return self._mock_response("", 200)
elif method == "DELETE":
# Drop table
if identifier.get_full_name() not in self.table_metadata_store:
@@ -665,6 +764,44 @@ class RESTCatalogServer:
return '^' + ''.join(regex) + '$'
+    def _alter_table_impl(self, identifier: Identifier, changes: List) -> None:
+        """Apply ``changes`` to a stored table and record the resulting schema.
+
+        Dict entries in ``changes`` are converted with
+        ``_dict_to_schema_change``; any other entry is used as given. The
+        changes are committed through a SchemaManager rooted at the table's
+        directory, and the returned schema replaces the one held in
+        ``table_metadata_store`` (``is_external`` and ``uuid`` are carried over).
+
+        Raises:
+            TableNotExistException: if the table is not in the metadata store.
+            ValueError: if a dict entry cannot be converted.
+        """
+        full_name = identifier.get_full_name()
+        if full_name not in self.table_metadata_store:
+            raise TableNotExistException(identifier)
+
+        def to_schema_change(raw):
+            # Non-dict entries are passed through unchanged.
+            if not isinstance(raw, dict):
+                return raw
+            try:
+                return _dict_to_schema_change(raw)
+            except (KeyError, TypeError) as e:
+                raise ValueError(f"Failed to convert change dict to SchemaChange: {raw}, error: {e}") from e
+
+        schema_changes = [to_schema_change(raw) for raw in changes]
+
+        previous = self.table_metadata_store[full_name]
+
+        table_dir = Path(self.data_path).joinpath(
+            self.warehouse,
+            identifier.get_database_name(),
+            identifier.get_object_name()
+        )
+        manager = SchemaManager(self._get_file_io(), str(table_dir))
+        committed_schema = manager.commit_changes(schema_changes)
+
+        self.table_metadata_store[full_name] = TableMetadata(
+            schema=committed_schema,
+            is_external=previous.is_external,
+            uuid=previous.uuid
+        )
+
+    def _get_file_io(self):
+        """Return a FileIO for the warehouse directory, for use by SchemaManager.
+
+        The warehouse directory is ``data_path / warehouse``; the same path is
+        passed to FileIO and set as the ``warehouse`` option.
+        """
+        from pypaimon.common.file_io import FileIO
+        from pypaimon.common.options import Options
+        root = str(Path(self.data_path).joinpath(self.warehouse))
+        return FileIO(root, Options({"warehouse": root}))
+
def _create_table_metadata(self, identifier: Identifier, schema_id: int,
schema: Schema, uuid_str: str, is_external:
bool) -> TableMetadata:
"""Create table metadata"""
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index c5be4ad473..2adce40496 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -21,6 +21,8 @@ import pyarrow as pa
from pypaimon import Schema
from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException,
TableAlreadyExistException, \
DatabaseNotExistException, TableNotExistException
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.schema_change import SchemaChange
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor,
DynamicBucketRowKeyExtractor, \
UnawareBucketRowKeyExtractor
@@ -710,3 +712,91 @@ class RESTSimpleTest(RESTBaseTest):
self.rest_catalog.drop_database("db1", True)
except DatabaseNotExistException:
self.fail("drop_database with ignore_if_not_exists=True should not
raise DatabaseNotExistException")
+
+    def test_alter_table(self):
+        """Apply each supported schema change through the REST catalog and check the result.
+
+        Every change is sent on its own, after which the table is fetched
+        again and the affected attribute is asserted. The test ends by
+        altering a missing table, with and without ignore_if_not_exists.
+        """
+        catalog = self.rest_catalog
+        catalog.create_database("test_db_alter", True)
+
+        identifier = "test_db_alter.test_table"
+        first_column = DataField.from_dict(
+            {"id": 0, "name": "col1", "type": "STRING", "description": "field1"})
+        second_column = DataField.from_dict(
+            {"id": 1, "name": "col2", "type": "STRING", "description": "field2"})
+        schema = Schema(
+            fields=[first_column, second_column],
+            partition_keys=[],
+            primary_keys=[],
+            options={},
+            comment="comment"
+        )
+        catalog.create_table(identifier, schema, False)
+
+        def alter_and_reload(change):
+            # Send a single change, then return the freshly loaded table.
+            catalog.alter_table(identifier, [change], False)
+            return catalog.get_table(identifier)
+
+        table = alter_and_reload(SchemaChange.add_column("col3", AtomicType("DATE")))
+        self.assertEqual(len(table.fields), 3)
+        self.assertEqual(table.fields[2].name, "col3")
+        self.assertEqual(table.fields[2].type.type, "DATE")
+
+        table = alter_and_reload(SchemaChange.update_comment("new comment"))
+        self.assertEqual(table.table_schema.comment, "new comment")
+
+        table = alter_and_reload(SchemaChange.rename_column("col1", "new_col1"))
+        self.assertEqual(table.fields[0].name, "new_col1")
+
+        table = alter_and_reload(SchemaChange.update_column_type("col2", AtomicType("BIGINT")))
+        self.assertEqual(table.fields[1].type.type, "BIGINT")
+
+        table = alter_and_reload(SchemaChange.update_column_comment("col2", "col2 field"))
+        self.assertEqual(table.fields[1].description, "col2 field")
+
+        table = alter_and_reload(SchemaChange.set_option("write-buffer-size", "256 MB"))
+        self.assertEqual(table.table_schema.options.get("write-buffer-size"), "256 MB")
+
+        table = alter_and_reload(SchemaChange.remove_option("write-buffer-size"))
+        self.assertNotIn("write-buffer-size", table.table_schema.options)
+
+        missing_table = "test_db_alter.non_existing_table"
+        with self.assertRaises(TableNotExistException):
+            catalog.alter_table(
+                missing_table,
+                [SchemaChange.add_column("col2", AtomicType("INT"))],
+                False
+            )
+
+        # With ignore_if_not_exists=True the same call must return without raising.
+        catalog.alter_table(
+            missing_table,
+            [SchemaChange.add_column("col2", AtomicType("INT"))],
+            True
+        )