This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 4ea66af1d6 [python] Support schema manager for PyPaimon (#5974)
4ea66af1d6 is described below

commit 4ea66af1d6694fcd8fbffdc5a70d51c9074664d7
Author: ChengHui Chen <27797326+chenghuic...@users.noreply.github.com>
AuthorDate: Tue Jul 29 20:54:40 2025 +0800

    [python] Support schema manager for PyPaimon (#5974)
---
 paimon-python/dev/lint-python.sh                 |   2 +-
 paimon-python/pypaimon/api/api_response.py       |   2 +-
 paimon-python/pypaimon/api/data_types.py         |  86 ++++++++++
 paimon-python/pypaimon/api/table_schema.py       |  83 ----------
 paimon-python/pypaimon/catalog/table_metadata.py |   2 +-
 paimon-python/pypaimon/common/file_io.py         |  14 +-
 paimon-python/pypaimon/rest/rest_catalog.py      |   4 +-
 paimon-python/pypaimon/{api => schema}/schema.py |   0
 paimon-python/pypaimon/schema/schema_manager.py  |  83 ++++++++++
 paimon-python/pypaimon/schema/table_schema.py    | 174 +++++++++++++++++++++
 paimon-python/pypaimon/table/file_store_table.py |   2 +-
 .../pypaimon/table/file_store_table_factory.py   |   2 +-
 paimon-python/pypaimon/tests/api_test.py         |   8 +-
 paimon-python/pypaimon/tests/pvfs_test.py        |   5 +-
 paimon-python/pypaimon/tests/rest_server.py      |   3 +-
 15 files changed, 373 insertions(+), 97 deletions(-)

diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index 9c53310399..ae214d35f6 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -133,7 +133,7 @@ function check_stage() {
 #########################
 # Flake8 check
 function flake8_check() {
-    local PYTHON_SOURCE="$(find . \( -path ./dev -o -path ./.tox \) -prune -o -type f -name "*.py" -print )"
+    local PYTHON_SOURCE="$(find . \( -path ./dev -o -path ./.tox -o -path ./.venv \) -prune -o -type f -name "*.py" -print )"
 
     print_function "STAGE" "flake8 checks"
     if [ ! -f "$FLAKE8_PATH" ]; then
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index b8d559d611..b3ec148e94 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -21,7 +21,7 @@ from typing import Dict, Optional, Generic, List
 from dataclasses import dataclass
 
 from .rest_json import json_field
-from .schema import Schema
+from pypaimon.schema.schema import Schema
 from .typedef import T
 
 
diff --git a/paimon-python/pypaimon/api/data_types.py b/paimon-python/pypaimon/api/data_types.py
index 6e6ea97456..c6cf516ad5 100644
--- a/paimon-python/pypaimon/api/data_types.py
+++ b/paimon-python/pypaimon/api/data_types.py
@@ -16,12 +16,15 @@
 # under the License.
 
 import json
+import re
 import threading
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from enum import Enum
 from typing import Dict, Any, Optional, List, Union
 
+import pyarrow
+
 
 class AtomicInteger:
@@ -192,6 +195,43 @@ class DataField:
 
         return result
 
+    def to_pyarrow_field(self):
+        data_type = self.type
+        if not isinstance(data_type, AtomicType):
+            raise ValueError(f"Unsupported data type: {data_type.__class__}")
+        type_name = data_type.type.upper()
+        if type_name == 'INT':
+            type_name = pyarrow.int32()
+        elif type_name == 'BIGINT':
+            type_name = pyarrow.int64()
+        elif type_name == 'FLOAT':
+            type_name = pyarrow.float32()
+        elif type_name == 'DOUBLE':
+            type_name = pyarrow.float64()
+        elif type_name == 'BOOLEAN':
+            type_name = pyarrow.bool_()
+        elif type_name == 'STRING':
+            type_name = pyarrow.string()
+        elif type_name == 'BINARY':
+            type_name = pyarrow.binary()
+        elif type_name == 'DATE':
+            type_name = pyarrow.date32()
+        elif type_name == 'TIMESTAMP':
+            type_name = pyarrow.timestamp('ms')
+        elif type_name.startswith('DECIMAL'):
+            match = re.match(r'DECIMAL\((\d+),\s*(\d+)\)', type_name)
+            if match:
+                precision, scale = map(int, match.groups())
+                type_name = pyarrow.decimal128(precision, scale)
+            else:
+                type_name = pyarrow.decimal128(38, 18)
+        else:
+            raise ValueError(f"Unsupported data type: {type_name}")
+        metadata = {}
+        if self.description:
+            metadata[b'description'] = self.description.encode('utf-8')
+        return pyarrow.field(self.name, type_name, nullable=data_type.nullable, metadata=metadata)
+
 
 @dataclass
 class RowType(DataType):
@@ -266,6 +306,36 @@ class DataTypeParser:
         except ValueError:
             raise Exception(f"Unknown type: {base_type}")
 
+    @staticmethod
+    def parse_atomic_type_pyarrow_field(field: pyarrow.Field) -> DataType:
+        type_name = str(field.type)
+        if type_name.startswith('int') or type_name.startswith('uint'):
+            type_name = 'INT'
+        elif type_name.startswith('float'):
+            type_name = 'FLOAT'
+        elif type_name.startswith('double'):
+            type_name = 'DOUBLE'
+        elif type_name.startswith('bool'):
+            type_name = 'BOOLEAN'
+        elif type_name.startswith('string'):
+            type_name = 'STRING'
+        elif type_name.startswith('binary'):
+            type_name = 'BINARY'
+        elif type_name.startswith('date'):
+            type_name = 'DATE'
+        elif type_name.startswith('timestamp'):
+            type_name = 'TIMESTAMP'
+        elif type_name.startswith('decimal'):
+            match = re.match(r'decimal\((\d+),\s*(\d+)\)', type_name)
+            if match:
+                precision, scale = map(int, match.groups())
+                type_name = f'DECIMAL({precision},{scale})'
+            else:
+                type_name = 'DECIMAL(38,18)'
+        else:
+            raise ValueError(f"Unknown type: {type_name}")
+        return AtomicType(type_name, field.nullable)
+
     @staticmethod
     def parse_data_type(
             json_data: Union[Dict[str, Any], str], field_id: Optional[AtomicInteger] = None
@@ -370,3 +440,19 @@ def parse_data_field_from_json(
 ) -> DataField:
     json_data = json.loads(json_str)
     return DataTypeParser.parse_data_field(json_data, field_id)
+
+
+def parse_data_fields_from_pyarrow_schema(pa_schema: pyarrow.Schema) -> list[DataField]:
+    fields = []
+    for i, pa_field in enumerate(pa_schema):
+        pa_field: pyarrow.Field
+        data_type = DataTypeParser.parse_atomic_type_pyarrow_field(pa_field)
+        data_field = DataField(
+            id=i,
+            name=pa_field.name,
+            type=data_type,
+            description=pa_field.metadata.get(b'description', b'').decode('utf-8')
+            if pa_field.metadata and b'description' in pa_field.metadata else None
+        )
+        fields.append(data_field)
+    return fields
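
The two conversions above are designed to round-trip: parse_data_fields_from_pyarrow_schema turns a flat pyarrow.Schema into Paimon DataFields, and DataField.to_pyarrow_field maps each one back. A minimal sketch of that round trip (illustrative only, not part of this commit; it assumes pypaimon and pyarrow are importable):

    import pyarrow as pa

    from pypaimon.api.data_types import parse_data_fields_from_pyarrow_schema

    # Flat schema of atomic types only; nested list/map/struct types
    # raise ValueError in both conversion directions.
    pa_schema = pa.schema([
        pa.field("id", pa.int32(), nullable=False),  # -> AtomicType('INT'), non-nullable
        pa.field("name", pa.string()),               # -> AtomicType('STRING')
    ])
    fields = parse_data_fields_from_pyarrow_schema(pa_schema)
    # ...and back again; nullability survives both legs.
    round_tripped = pa.schema([f.to_pyarrow_field() for f in fields])
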
diff --git a/paimon-python/pypaimon/api/table_schema.py b/paimon-python/pypaimon/api/table_schema.py
deleted file mode 100644
index ac0c2b7a11..0000000000
--- a/paimon-python/pypaimon/api/table_schema.py
+++ /dev/null
@@ -1,83 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import time
-from typing import List, Dict, Optional
-
-from pypaimon.api.schema import Schema
-from pypaimon.api.data_types import DataField
-
-
-class TableSchema:
-
-    def __init__(self, id: int, fields: List[DataField], highest_field_id: int,
-                 partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
-                 comment: Optional[str] = None, time_millis: Optional[int] = None):
-        self.id = id
-        self.fields = fields
-        self.highest_field_id = highest_field_id
-        self.partition_keys = partition_keys or []
-        self.primary_keys = primary_keys or []
-        self.options = options or {}
-        self.comment = comment
-        self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
-
-    def to_schema(self) -> Schema:
-        # pa_schema = schema_util.convert_data_fields_to_pa_schema(self.fields)
-        return Schema(
-            fields=self.fields,
-            partition_keys=self.partition_keys,
-            primary_keys=self.primary_keys,
-            options=self.options,
-            comment=self.comment
-        )
-
-    @staticmethod
-    def create(schema_id: int, schema: Schema) -> "TableSchema":
-        fields: List[DataField] = schema.fields
-
-        partition_keys: List[str] = schema.partition_keys
-
-        primary_keys: List[str] = schema.primary_keys
-
-        options: Dict[str, str] = schema.options
-
-        highest_field_id: int = None
-
-        return TableSchema(
-            schema_id,
-            fields,
-            highest_field_id,
-            partition_keys,
-            primary_keys,
-            options,
-            schema.comment,
-            int(time.time())
-        )
-
-    def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
-        return TableSchema(
-            id=self.id,
-            fields=self.fields,
-            highest_field_id=self.highest_field_id,
-            partition_keys=self.partition_keys,
-            primary_keys=self.primary_keys,
-            options=new_options,
-            comment=self.comment,
-            time_millis=self.time_millis
-        )
diff --git a/paimon-python/pypaimon/catalog/table_metadata.py b/paimon-python/pypaimon/catalog/table_metadata.py
index db79ac522a..02297efaa8 100644
--- a/paimon-python/pypaimon/catalog/table_metadata.py
+++ b/paimon-python/pypaimon/catalog/table_metadata.py
@@ -17,7 +17,7 @@ limitations under the License.
""" from typing import Optional -from pypaimon.api.table_schema import TableSchema +from pypaimon.schema.table_schema import TableSchema class TableMetadata: diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 438b73ce1d..9e7418966f 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -22,4 +22,16 @@ from pathlib import Path class FileIO(ABC): @abstractmethod def exists(self, path: Path) -> bool: - raise NotImplementedError("Method 'exists' must be implemented by subclasses.") + """""" + + @abstractmethod + def read_file_utf8(self, path: Path) -> str: + """""" + + @abstractmethod + def try_to_write_atomic(self, path: Path, content: str) -> bool: + """""" + + @abstractmethod + def list_status(self, path: Path): + """""" diff --git a/paimon-python/pypaimon/rest/rest_catalog.py b/paimon-python/pypaimon/rest/rest_catalog.py index 364fec8247..c1ba37cd78 100644 --- a/paimon-python/pypaimon/rest/rest_catalog.py +++ b/paimon-python/pypaimon/rest/rest_catalog.py @@ -24,7 +24,7 @@ from pypaimon.api.core_options import CoreOptions from pypaimon.api.identifier import Identifier from pypaimon.api.options import Options -from pypaimon.api.table_schema import TableSchema +from pypaimon.schema.table_schema import TableSchema from pypaimon.catalog.catalog import Catalog from pypaimon.catalog.catalog_context import CatalogContext @@ -98,7 +98,7 @@ class RESTCatalog(Catalog): return self.to_table_metadata(identifier.get_database_name(), response) def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadata: - schema = TableSchema.create(response.get_schema_id(), response.get_schema()) + schema = TableSchema.from_schema(response.get_schema_id(), response.get_schema()) options: Dict[str, str] = dict(schema.options) options[CoreOptions.PATH] = response.get_path() response.put_audit_options_to(options) diff --git a/paimon-python/pypaimon/api/schema.py b/paimon-python/pypaimon/schema/schema.py similarity index 100% rename from paimon-python/pypaimon/api/schema.py rename to paimon-python/pypaimon/schema/schema.py diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index c87240062b..43e773c46e 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -16,8 +16,11 @@ # limitations under the License. 
diff --git a/paimon-python/pypaimon/rest/rest_catalog.py b/paimon-python/pypaimon/rest/rest_catalog.py
index 364fec8247..c1ba37cd78 100644
--- a/paimon-python/pypaimon/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/rest/rest_catalog.py
@@ -24,7 +24,7 @@ from pypaimon.api.core_options import CoreOptions
 from pypaimon.api.identifier import Identifier
 from pypaimon.api.options import Options
 
-from pypaimon.api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
@@ -98,7 +98,7 @@ class RESTCatalog(Catalog):
         return self.to_table_metadata(identifier.get_database_name(), response)
 
     def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadata:
-        schema = TableSchema.create(response.get_schema_id(), response.get_schema())
+        schema = TableSchema.from_schema(response.get_schema_id(), response.get_schema())
         options: Dict[str, str] = dict(schema.options)
         options[CoreOptions.PATH] = response.get_path()
         response.put_audit_options_to(options)
diff --git a/paimon-python/pypaimon/api/schema.py b/paimon-python/pypaimon/schema/schema.py
similarity index 100%
rename from paimon-python/pypaimon/api/schema.py
rename to paimon-python/pypaimon/schema/schema.py
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index c87240062b..43e773c46e 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -16,8 +16,11 @@
 # limitations under the License.
 ################################################################################
 from pathlib import Path
+from typing import Optional
 
 from pypaimon.common.file_io import FileIO
+from pypaimon.schema.schema import Schema
+from pypaimon.schema.table_schema import TableSchema
 
 
 class SchemaManager:
@@ -27,3 +30,83 @@ class SchemaManager:
         self.file_io = file_io
         self.table_path = table_path
         self.schema_path = table_path / "schema"
+
+    def latest(self) -> Optional['TableSchema']:
+        try:
+            versions = self._list_versioned_files()
+            if not versions:
+                return None
+
+            max_version = max(versions)
+            return self._read_schema(max_version)
+        except Exception as e:
+            raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e
+
+    def create_table(self, schema: Schema, external_table: bool = False) -> TableSchema:
+        while True:
+            latest = self.latest()
+            if latest is not None:
+                if external_table:
+                    self._check_schema_for_external_table(latest.to_schema(), schema)
+                    return latest
+                else:
+                    raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
+
+            table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
+            success = self.commit(table_schema)
+            if success:
+                return table_schema
+
+    def commit(self, new_schema: TableSchema) -> bool:
+        schema_path = self._to_schema_path(new_schema.id)
+        try:
+            return self.file_io.try_to_write_atomic(schema_path, new_schema.to_json())
+        except Exception as e:
+            raise RuntimeError(f"Failed to commit schema: {e}") from e
+
+    def _to_schema_path(self, schema_id: int) -> Path:
+        return self.schema_path / f"{self.schema_prefix}{schema_id}"
+
+    def _read_schema(self, schema_id: int) -> Optional['TableSchema']:
+        schema_path = self._to_schema_path(schema_id)
+        if not self.file_io.exists(schema_path):
+            return None
+
+        return TableSchema.from_path(self.file_io, schema_path)
+
+    def _list_versioned_files(self) -> list[int]:
+        if not self.file_io.exists(self.schema_path):
+            return []
+
+        statuses = self.file_io.list_status(self.schema_path)
+        if statuses is None:
+            return []
+
+        versions = []
+        for status in statuses:
+            name = Path(status.path).name
+            if name.startswith(self.schema_prefix):
+                try:
+                    version = int(name[len(self.schema_prefix):])
+                    versions.append(version)
+                except ValueError:
+                    continue
+        return versions
+
+    @staticmethod
+    def _check_schema_for_external_table(exists_schema: Schema, new_schema: Schema):
+        if ((not new_schema.pa_schema or new_schema.pa_schema.equals(exists_schema.pa_schema))
+                and (not new_schema.partition_keys or new_schema.partition_keys == exists_schema.partition_keys)
+                and (not new_schema.primary_keys or new_schema.primary_keys == exists_schema.primary_keys)):
+            exists_options = exists_schema.options
+            new_options = new_schema.options
+            for key, value in new_options.items():
+                if (key != 'owner' and key != 'path'
+                        and (key not in exists_options or exists_options[key] != value)):
+                    raise ValueError(
+                        f"New schema's options are not equal to the exists schema's, "
+                        f"new schema: {new_options}, exists schema: {exists_options}")
+        else:
+            raise ValueError(
+                f"New schema is not equal to the exists schema, "
+                f"new schema: {new_schema}, exists schema: {exists_schema}")
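
A usage sketch tying the manager to the pieces above (LocalFileIO is the invented implementation from the earlier sketch; `schema` stands for any pypaimon Schema instance):

    from pathlib import Path

    from pypaimon.schema.schema_manager import SchemaManager

    manager = SchemaManager(LocalFileIO(), Path("/tmp/warehouse/db.db/t"))
    table_schema = manager.create_table(schema)  # writes schema file 0 under .../t/schema/
    assert manager.latest().id == 0              # reads back the highest-numbered file

The while-loop in create_table is the concurrency story: if try_to_write_atomic loses a race and returns False, the next iteration sees the winner via latest() and either raises (managed table) or validates the new schema against it (external table).
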
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
new file mode 100644
index 0000000000..11ee24ae98
--- /dev/null
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -0,0 +1,174 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import json
+import time
+from pathlib import Path
+from typing import List, Dict, Optional
+
+import pyarrow
+
+from pypaimon.api import data_types
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.file_io import FileIO
+from pypaimon.schema.schema import Schema
+from pypaimon.api.data_types import DataField
+
+
+class TableSchema:
+    PAIMON_07_VERSION = 1
+    PAIMON_08_VERSION = 2
+    CURRENT_VERSION = 3
+
+    def __init__(self, version: int, id: int, fields: List[DataField], highest_field_id: int,
+                 partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
+                 comment: Optional[str] = None, time_millis: Optional[int] = None):
+        self.version = version
+        self.id = id
+        self.fields = fields
+        self.highest_field_id = highest_field_id
+        self.partition_keys = partition_keys or []
+        self.primary_keys = primary_keys or []
+        self.options = options or {}
+        self.comment = comment
+        self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
+
+    def to_schema(self) -> Schema:
+        try:
+            pa_fields = []
+            for field in self.fields:
+                pa_fields.append(field.to_pyarrow_field())
+            pyarrow.schema(pa_fields)
+        except Exception as e:
+            print(e)
+        return Schema(
+            fields=self.fields,
+            partition_keys=self.partition_keys,
+            primary_keys=self.primary_keys,
+            options=self.options,
+            comment=self.comment
+        )
+
+    @staticmethod
+    def from_path(file_io: FileIO, schema_path: Path):
+        try:
+            json_str = file_io.read_file_utf8(schema_path)
+            return TableSchema.from_json(json_str)
+        except FileNotFoundError as e:
+            raise RuntimeError(f"Schema file not found: {schema_path}") from e
+        except Exception as e:
+            raise RuntimeError(f"Failed to read schema from {schema_path}") from e
+
+    @staticmethod
+    def from_json(json_str: str):
+        try:
+            data = json.loads(json_str)
+
+            version = data.get("version", TableSchema.PAIMON_07_VERSION)
+            options = data["options"]
+            if version <= TableSchema.PAIMON_07_VERSION and CoreOptions.BUCKET not in options:
+                options[CoreOptions.BUCKET] = "1"
+            if version <= TableSchema.PAIMON_08_VERSION and CoreOptions.FILE_FORMAT not in options:
+                options[CoreOptions.FILE_FORMAT] = "orc"
+            fields = [DataField.from_dict(field) for field in data["fields"]]
+
+            return TableSchema(
+                version=version,
+                id=data["id"],
+                fields=fields,
+                highest_field_id=data["highestFieldId"],
+                partition_keys=data["partitionKeys"],
+                primary_keys=data["primaryKeys"],
+                options=options,
+                comment=data.get("comment"),
+                time_millis=data.get("timeMillis")
+            )
+        except json.JSONDecodeError as e:
+            raise RuntimeError(f"Invalid JSON format: {json_str}") from e
+        except KeyError as e:
+            raise RuntimeError(f"Missing required field in schema JSON: {e}") from e
+        except Exception as e:
+            raise RuntimeError(f"Failed to parse schema from JSON: {e}") from e
+
+    @staticmethod
+    def from_schema(schema_id: int, schema: Schema) -> "TableSchema":
+        fields: List[DataField] = schema.fields
+        if not schema.fields:
+            fields = data_types.parse_data_fields_from_pyarrow_schema(schema.pa_schema)
+        partition_keys: List[str] = schema.partition_keys
+        primary_keys: List[str] = schema.primary_keys
+        options: Dict[str, str] = schema.options
+        highest_field_id: int = None  # max(field.id for field in fields)
+
+        return TableSchema(
+            TableSchema.CURRENT_VERSION,
+            schema_id,
+            fields,
+            highest_field_id,
+            partition_keys,
+            primary_keys,
+            options,
+            schema.comment,
+            int(time.time())
+        )
+
+    def to_json(self) -> str:
+        data = {
+            "version": self.version,
+            "id": self.id,
+            "fields": [field.to_dict() for field in self.fields],
+            "highestFieldId": self.highest_field_id,
+            "partitionKeys": self.partition_keys,
+            "primaryKeys": self.primary_keys,
+            "options": self.options,
+            "timeMillis": self.time_millis
+        }
+        if self.comment is not None:
+            data["comment"] = self.comment
+        return json.dumps(data, indent=2, ensure_ascii=False)
+
+    def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
+        return TableSchema(
+            version=self.version,
+            id=self.id,
+            fields=self.fields,
+            highest_field_id=self.highest_field_id,
+            partition_keys=self.partition_keys,
+            primary_keys=self.primary_keys,
+            options=new_options,
+            comment=self.comment,
+            time_millis=self.time_millis
+        )
+
+    def get_primary_key_fields(self) -> List[DataField]:
+        if not self.primary_keys:
+            return []
+        field_map = {field.name: field for field in self.fields}
+        return [field_map[name] for name in self.primary_keys if name in field_map]
+
+    def get_partition_key_fields(self) -> List[DataField]:
+        if not self.partition_keys:
+            return []
+        field_map = {field.name: field for field in self.fields}
+        return [field_map[name] for name in self.partition_keys if name in field_map]
+
+    def get_trimmed_primary_key_fields(self) -> List[DataField]:
+        if not self.primary_keys or not self.partition_keys:
+            return self.get_primary_key_fields()
+        adjusted = [pk for pk in self.primary_keys if pk not in self.partition_keys]
+        field_map = {field.name: field for field in self.fields}
+        return [field_map[name] for name in adjusted if name in field_map]
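
The version constants drive the backward-compatibility defaults in from_json: schema files written before the bucket or file-format options existed get those options backfilled on read. A quick illustration (editor's sketch; it assumes CoreOptions.BUCKET and CoreOptions.FILE_FORMAT resolve to the usual "bucket" and "file.format" keys):

    from pypaimon.schema.table_schema import TableSchema

    # A Paimon 0.7-era schema file: no "version" key, no bucket option.
    old_json = '''{
      "id": 0,
      "fields": [],
      "highestFieldId": -1,
      "partitionKeys": [],
      "primaryKeys": [],
      "options": {}
    }'''

    schema = TableSchema.from_json(old_json)
    assert schema.version == TableSchema.PAIMON_07_VERSION
    assert schema.options["bucket"] == "1"         # backfilled for version <= 1
    assert schema.options["file.format"] == "orc"  # backfilled for version <= 2
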
""" from pathlib import Path -from pypaimon.api.table_schema import TableSchema +from pypaimon.schema.table_schema import TableSchema from pypaimon.common.file_io import FileIO from pypaimon.table.catalog_environment import CatalogEnvironment from pypaimon.table.file_store_table import FileStoreTable diff --git a/paimon-python/pypaimon/tests/api_test.py b/paimon-python/pypaimon/tests/api_test.py index d32c5c201c..8012a5a364 100644 --- a/paimon-python/pypaimon/tests/api_test.py +++ b/paimon-python/pypaimon/tests/api_test.py @@ -27,7 +27,7 @@ from ..api.auth import BearTokenAuthProvider from ..api.identifier import Identifier from ..api.options import Options from ..api.rest_json import JSON -from ..api.table_schema import TableSchema +from pypaimon.schema.table_schema import TableSchema from ..api.token_loader import DLFTokenLoaderFactory, DLFToken from ..api.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType, DataField @@ -157,7 +157,8 @@ class ApiTestCase(unittest.TestCase): MapType(False, AtomicType('INT'), AtomicType('INT'))), 'desc arr11'), ] - schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "") + schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, len(data_fields), + [], [], {}, "") test_tables = { "default.user": TableMetadata(uuid=str(uuid.uuid4()), is_external=True, schema=schema), } @@ -213,7 +214,8 @@ class ApiTestCase(unittest.TestCase): MapType(False, AtomicType('INT'), AtomicType('INT'))), 'desc arr11'), ] - schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "") + schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, len(data_fields), + [], [], {}, "") test_tables = { "default.user": TableMetadata(uuid=str(uuid.uuid4()), is_external=True, schema=schema), } diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py index c78fcf94a0..c20f337302 100644 --- a/paimon-python/pypaimon/tests/pvfs_test.py +++ b/paimon-python/pypaimon/tests/pvfs_test.py @@ -25,7 +25,7 @@ from pathlib import Path from pypaimon.api import ConfigResponse from pypaimon.api.auth import BearTokenAuthProvider from pypaimon.api.data_types import DataField, AtomicType -from pypaimon.api.table_schema import TableSchema +from pypaimon.schema.table_schema import TableSchema from pypaimon.catalog.table_metadata import TableMetadata from pypaimon.pvfs import PaimonVirtualFileSystem from pypaimon.tests.api_test import RESTCatalogServer @@ -70,7 +70,8 @@ class PVFSTestCase(unittest.TestCase): DataField(0, "id", AtomicType('INT'), 'id'), DataField(1, "name", AtomicType('STRING'), 'name') ] - schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "") + schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, len(data_fields), + [], [], {}, "") self.server.database_store.update(self.test_databases) self.test_tables = { f"{self.database}.{self.table}": TableMetadata(uuid=str(uuid.uuid4()), is_external=True, schema=schema), diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py index 0bcf5d1dd3..27ba27907c 100644 --- a/paimon-python/pypaimon/tests/rest_server.py +++ b/paimon-python/pypaimon/tests/rest_server.py @@ -33,7 +33,7 @@ from ..api.api_response import (ConfigResponse, ListDatabasesResponse, GetDataba Schema, GetTableResponse, ListTablesResponse, RESTResponse, PagedList) from ..api.rest_json import JSON -from 
+from pypaimon.schema.table_schema import TableSchema
 from ..catalog.table_metadata import TableMetadata
 
@@ -642,6 +642,7 @@ class RESTCatalogServer:
         """Create table metadata"""
         options = schema.options.copy()
         table_schema = TableSchema(
+            version=TableSchema.CURRENT_VERSION,
             id=schema_id,
             fields=schema.fields,
             highest_field_id=len(schema.fields) - 1,