This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f0317ee9d4 [core] python: add test for rest api (#5882)
f0317ee9d4 is described below
commit f0317ee9d405c843afea0bf99fb6a39019b3f3ac
Author: jerry <[email protected]>
AuthorDate: Tue Jul 15 11:13:42 2025 +0800
[core] python: add test for rest api (#5882)
---
.gitignore | 2 +-
{python => pypaimon}/README.md | 0
pypaimon/__init__.py | 16 +
{python => pypaimon}/api/__init__.py | 26 +-
pypaimon/api/api_response.py | 493 +++++++++++++++++++
{python => pypaimon}/api/api_resquest.py | 8 +-
{python => pypaimon}/api/auth.py | 0
{python => pypaimon}/api/client.py | 39 +-
pypaimon/api/rest_json.py | 51 ++
pypaimon/api/typedef.py | 61 +++
pypaimon/tests/__init__.py | 16 +
pypaimon/tests/api_test.py | 805 +++++++++++++++++++++++++++++++
python/api/api_response.py | 301 ------------
13 files changed, 1483 insertions(+), 335 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8d59f6faa3..2a8e3fb02b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,7 +19,7 @@ target
.java-version
dependency-reduced-pom.xml
metastore_db/
-python/.idea/
+pypaimon/.idea/
### VS Code ###
.vscode/
diff --git a/python/README.md b/pypaimon/README.md
similarity index 100%
rename from python/README.md
rename to pypaimon/README.md
diff --git a/pypaimon/__init__.py b/pypaimon/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/pypaimon/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/api/__init__.py b/pypaimon/api/__init__.py
similarity index 94%
rename from python/api/__init__.py
rename to pypaimon/api/__init__.py
index 29c50f85f2..573fd889b9 100644
--- a/python/api/__init__.py
+++ b/pypaimon/api/__init__.py
@@ -17,14 +17,19 @@
import logging
from typing import Dict, List, Optional, Callable
-from auth import RESTAuthFunction
-from api_response import PagedList, GetTableResponse, ListDatabasesResponse, ListTablesResponse, GetDatabaseResponse, \
- ConfigResponse, PagedResponse, T
-from api_resquest import Identifier, CreateDatabaseRequest
+from urllib.parse import unquote
+import api
+from api.auth import RESTAuthFunction
+from api.api_response import PagedList, GetTableResponse, ListDatabasesResponse, ListTablesResponse, \
+ GetDatabaseResponse, ConfigResponse, PagedResponse
+from api.api_resquest import CreateDatabaseRequest
+from api.typedef import Identifier
+from api.client import HttpClient
+from api.auth import DLFAuthProvider, DLFToken
+from api.typedef import T
class RESTCatalogOptions:
-
URI = "uri"
WAREHOUSE = "warehouse"
TOKEN_PROVIDER = "token.provider"
@@ -56,6 +61,11 @@ class RESTUtil:
import urllib.parse
return urllib.parse.quote(value)
+ @staticmethod
+ def decode_string(encoded: str) -> str:
+ """Decode URL-encoded string"""
+ return unquote(encoded)
+
@staticmethod
+ def extract_prefix_map(options: Dict[str, str], prefix: str) -> Dict[str, str]:
result = {}
@@ -68,7 +78,6 @@ class RESTUtil:
class ResourcePaths:
-
V1 = "v1"
DATABASES = "databases"
TABLES = "tables"
@@ -104,7 +113,6 @@ class ResourcePaths:
class RESTApi:
-
HEADER_PREFIX = "header."
MAX_RESULTS = "maxResults"
PAGE_TOKEN = "pageToken"
@@ -113,10 +121,6 @@ class RESTApi:
def __init__(self, options: Dict[str, str], config_required: bool = True):
self.logger = logging.getLogger(self.__class__.__name__)
-
- from client import HttpClient
- from auth import DLFAuthProvider, DLFToken
-
self.client = HttpClient(options.get(RESTCatalogOptions.URI))
auth_provider = DLFAuthProvider(
DLFToken(options), options.get(RESTCatalogOptions.DLF_REGION)
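
As a quick illustration of the decode helper added above, a minimal sketch (not part of this commit's diff; it assumes the pypaimon directory is on PYTHONPATH so that `api` resolves as it does in the new tests):

    # Sketch only: RESTUtil.decode_string reverses percent-encoding via urllib.parse.unquote.
    from urllib.parse import quote
    from api import RESTUtil

    encoded = quote("my db")                 # "my%20db"
    print(RESTUtil.decode_string(encoded))   # "my db"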
diff --git a/pypaimon/api/api_response.py b/pypaimon/api/api_response.py
new file mode 100644
index 0000000000..4f2361c6b8
--- /dev/null
+++ b/pypaimon/api/api_response.py
@@ -0,0 +1,493 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Any, Generic, List
+from dataclasses import dataclass, field
+from api.typedef import T
+
+
+@dataclass
+class PagedList(Generic[T]):
+ elements: List[T]
+ next_page_token: Optional[str] = None
+
+
+class RESTResponse(ABC):
+ pass
+
+
+@dataclass
+class ErrorResponse(RESTResponse):
+ FIELD_RESOURCE_TYPE = "resourceType"
+ FIELD_RESOURCE_NAME = "resourceName"
+ FIELD_MESSAGE = "message"
+ FIELD_CODE = "code"
+
+ resource_type: Optional[str] = None
+ resource_name: Optional[str] = None
+ message: Optional[str] = None
+ code: Optional[int] = None
+
+ def __init__(self,
+ resource_type: Optional[str] = None,
+ resource_name: Optional[str] = None,
+ message: Optional[str] = None,
+ code: Optional[int] = None):
+ self.resource_type = resource_type
+ self.resource_name = resource_name
+ self.message = message
+ self.code = code
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
+ return cls(
+ resource_type=data.get(cls.FIELD_RESOURCE_TYPE),
+ resource_name=data.get(cls.FIELD_RESOURCE_NAME),
+ message=data.get(cls.FIELD_MESSAGE),
+ code=data.get(cls.FIELD_CODE),
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ self.FIELD_RESOURCE_TYPE: self.resource_type,
+ self.FIELD_RESOURCE_NAME: self.resource_name,
+ self.FIELD_MESSAGE: self.message,
+ self.FIELD_CODE: self.code
+ }
+
+
+@dataclass
+class AuditRESTResponse(RESTResponse):
+ FIELD_OWNER = "owner"
+ FIELD_CREATED_AT = "createdAt"
+ FIELD_CREATED_BY = "createdBy"
+ FIELD_UPDATED_AT = "updatedAt"
+ FIELD_UPDATED_BY = "updatedBy"
+
+ owner: Optional[str] = None
+ created_at: Optional[int] = None
+ created_by: Optional[str] = None
+ updated_at: Optional[int] = None
+ updated_by: Optional[str] = None
+
+ def get_owner(self) -> Optional[str]:
+ return self.owner
+
+ def get_created_at(self) -> Optional[int]:
+ return self.created_at
+
+ def get_created_by(self) -> Optional[str]:
+ return self.created_by
+
+ def get_updated_at(self) -> Optional[int]:
+ return self.updated_at
+
+ def get_updated_by(self) -> Optional[str]:
+ return self.updated_by
+
+
+class PagedResponse(RESTResponse, Generic[T]):
+ FIELD_NEXT_PAGE_TOKEN = "nextPageToken"
+
+ @abstractmethod
+ def data(self) -> List[T]:
+ pass
+
+ @abstractmethod
+ def get_next_page_token(self) -> str:
+ pass
+
+
+@dataclass
+class ListDatabasesResponse(PagedResponse[str]):
+ FIELD_DATABASES = "databases"
+
+ databases: List[str]
+ next_page_token: str
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
+ return cls(
+ databases=data.get(cls.FIELD_DATABASES),
+ next_page_token=data.get(cls.FIELD_NEXT_PAGE_TOKEN)
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ self.FIELD_DATABASES: self.databases,
+ self.FIELD_NEXT_PAGE_TOKEN: self.next_page_token
+ }
+
+ def data(self) -> List[str]:
+ return self.databases
+
+ def get_next_page_token(self) -> str:
+ return self.next_page_token
+
+
+@dataclass
+class ListTablesResponse(PagedResponse[str]):
+ FIELD_TABLES = "tables"
+
+ tables: Optional[List[str]]
+ next_page_token: Optional[str]
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
+ return cls(
+ tables=data.get(cls.FIELD_TABLES),
+ next_page_token=data.get(cls.FIELD_NEXT_PAGE_TOKEN)
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ self.FIELD_TABLES: self.tables,
+ self.FIELD_NEXT_PAGE_TOKEN: self.next_page_token
+ }
+
+ def data(self) -> Optional[List[str]]:
+ return self.tables
+
+ def get_next_page_token(self) -> Optional[str]:
+ return self.next_page_token
+
+
+@dataclass
+class PaimonDataType:
+ FIELD_TYPE = "type"
+ FIELD_ELEMENT = "element"
+ FIELD_FIELDS = "fields"
+ FIELD_KEY = "key"
+ FIELD_VALUE = "value"
+
+ type: str
+ element: Optional['PaimonDataType'] = None
+ fields: List['DataField'] = field(default_factory=list)
+ key: Optional['PaimonDataType'] = None
+ value: Optional['PaimonDataType'] = None
+
+ @classmethod
+ def from_dict(cls, data: Any) -> 'PaimonDataType':
+ if isinstance(data, dict):
+ element = data.get(cls.FIELD_ELEMENT, None)
+ fields = data.get(cls.FIELD_FIELDS, None)
+ key = data.get(cls.FIELD_KEY, None)
+ value = data.get(cls.FIELD_VALUE, None)
+ if element is not None:
+ element = PaimonDataType.from_dict(data.get(cls.FIELD_ELEMENT))
+ if fields is not None:
+ fields = list(map(lambda f: DataField.from_dict(f), fields))
+ if key is not None:
+ key = PaimonDataType.from_dict(key)
+ if value is not None:
+ value = PaimonDataType.from_dict(value)
+ return cls(
+ type=data.get(cls.FIELD_TYPE),
+ element=element,
+ fields=fields,
+ key=key,
+ value=value,
+ )
+ else:
+ return cls(type=data)
+
+ def to_dict(self) -> Any:
+ if self.element is None and not self.fields and self.key is None and self.value is None:
+ return self.type
+ if self.element is not None:
+ return {self.FIELD_TYPE: self.type, self.FIELD_ELEMENT: self.element}
+ elif self.fields is not None:
+ return {self.FIELD_TYPE: self.type, self.FIELD_FIELDS: self.fields}
+ elif self.value is not None:
+ return {self.FIELD_TYPE: self.type, self.FIELD_KEY: self.key, self.FIELD_VALUE: self.value}
+ elif self.key is not None and self.value is None:
+ return {self.FIELD_TYPE: self.type, self.FIELD_KEY: self.key}
+
+
+@dataclass
+class DataField:
+ FIELD_ID = "id"
+ FIELD_NAME = "name"
+ FIELD_TYPE = "type"
+ FIELD_DESCRIPTION = "description"
+
+ description: str
+ id: int
+ name: str
+ type: PaimonDataType
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'DataField':
+ return cls(
+ id=data.get(cls.FIELD_ID),
+ name=data.get(cls.FIELD_NAME),
+ type=PaimonDataType.from_dict(data.get(cls.FIELD_TYPE)),
+ description=data.get(cls.FIELD_DESCRIPTION),
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ self.FIELD_ID: self.id,
+ self.FIELD_NAME: self.name,
+ self.FIELD_TYPE: PaimonDataType.to_dict(self.type),
+ self.FIELD_DESCRIPTION: self.description
+ }
+
+
+@dataclass
+class Schema:
+ FIELD_FIELDS = "fields"
+ FIELD_PARTITION_KEYS = "partitionKeys"
+ FIELD_PRIMARY_KEYS = "primaryKeys"
+ FIELD_OPTIONS = "options"
+ FIELD_COMMENT = "comment"
+
+ fields: List[DataField] = field(default_factory=list)
+ partition_keys: List[str] = field(default_factory=list)
+ primary_keys: List[str] = field(default_factory=list)
+ options: Dict[str, str] = field(default_factory=dict)
+ comment: Optional[str] = None
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'Schema':
+ return cls(
+ fields=list(map(lambda f: DataField.from_dict(f), data.get(cls.FIELD_FIELDS))),
+ partition_keys=data.get(cls.FIELD_PARTITION_KEYS),
+ primary_keys=data.get(cls.FIELD_PRIMARY_KEYS),
+ options=data.get(cls.FIELD_OPTIONS),
+ comment=data.get(cls.FIELD_COMMENT)
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ self.FIELD_FIELDS: list(map(lambda f: DataField.to_dict(f), self.fields)),
+ self.FIELD_PARTITION_KEYS: self.partition_keys,
+ self.FIELD_PRIMARY_KEYS: self.primary_keys,
+ self.FIELD_OPTIONS: self.options,
+ self.FIELD_COMMENT: self.comment
+ }
+
+
+@dataclass
+class TableSchema:
+ """Table schema with ID"""
+ id: int
+ fields: List[DataField]
+ highest_field_id: int
+ partition_keys: List[str]
+ primary_keys: List[str]
+ options: Dict[str, str]
+ comment: Optional[str]
+
+ def to_schema(self) -> Schema:
+ return Schema(
+ fields=self.fields,
+ partition_keys=self.partition_keys,
+ primary_keys=self.primary_keys,
+ options=self.options,
+ comment=self.comment
+ )
+
+
+@dataclass
+class TableMetadata:
+ """Table metadata"""
+ schema: TableSchema
+ is_external: bool
+ uuid: str
+
+
+@dataclass
+class RESTToken:
+ """REST authentication token"""
+ token: Dict[str, str]
+ expire_at_millis: int
+
+
+@dataclass
+class GetTableResponse(AuditRESTResponse):
+ """Response for getting table"""
+
+ # Field constants for JSON serialization
+ FIELD_ID = "id"
+ FIELD_NAME = "name"
+ FIELD_PATH = "path"
+ FIELD_IS_EXTERNAL = "isExternal"
+ FIELD_SCHEMA_ID = "schemaId"
+ FIELD_SCHEMA = "schema"
+
+ id: Optional[str] = None
+ name: Optional[str] = None
+ path: Optional[str] = None
+ is_external: Optional[bool] = None
+ schema_id: Optional[int] = None
+ schema: Optional[Schema] = None
+
+ def __init__(self,
+ id: str,
+ name: str,
+ path: str,
+ is_external: bool,
+ schema_id: int,
+ schema: Schema,
+ owner: Optional[str] = None,
+ created_at: Optional[int] = None,
+ created_by: Optional[str] = None,
+ updated_at: Optional[int] = None,
+ updated_by: Optional[str] = None):
+ super().__init__(owner, created_at, created_by, updated_at, updated_by)
+ self.id = id
+ self.name = name
+ self.path = path
+ self.is_external = is_external
+ self.schema_id = schema_id
+ self.schema = schema
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'GetTableResponse':
+ return cls(
+ id=data.get(cls.FIELD_ID),
+ name=data.get(cls.FIELD_NAME),
+ path=data.get(cls.FIELD_PATH),
+ is_external=data.get(cls.FIELD_IS_EXTERNAL),
+ schema_id=data.get(cls.FIELD_SCHEMA_ID),
+ schema=Schema.from_dict(data.get(cls.FIELD_SCHEMA)),
+ owner=data.get(cls.FIELD_OWNER),
+ created_at=data.get(cls.FIELD_CREATED_AT),
+ created_by=data.get(cls.FIELD_CREATED_BY),
+ updated_at=data.get(cls.FIELD_UPDATED_AT),
+ updated_by=data.get(cls.FIELD_UPDATED_BY)
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ result = {
+ self.FIELD_ID: self.id,
+ self.FIELD_NAME: self.name,
+ self.FIELD_PATH: self.path,
+ self.FIELD_IS_EXTERNAL: self.is_external,
+ self.FIELD_SCHEMA_ID: self.schema_id,
+ self.FIELD_SCHEMA: Schema.to_dict(self.schema)
+ }
+ if self.owner is not None:
+ result[self.FIELD_OWNER] = self.owner
+ if self.created_at is not None:
+ result[self.FIELD_CREATED_AT] = self.created_at
+ if self.created_by is not None:
+ result[self.FIELD_CREATED_BY] = self.created_by
+ if self.updated_at is not None:
+ result[self.FIELD_UPDATED_AT] = self.updated_at
+ if self.updated_by is not None:
+ result[self.FIELD_UPDATED_BY] = self.updated_by
+
+ return result
+
+
+@dataclass
+class GetDatabaseResponse(AuditRESTResponse):
+ FIELD_ID = "id"
+ FIELD_NAME = "name"
+ FIELD_LOCATION = "location"
+ FIELD_OPTIONS = "options"
+
+ id: Optional[str] = None
+ name: Optional[str] = None
+ location: Optional[str] = None
+ options: Optional[Dict[str, str]] = field(default_factory=dict)
+
+ def __init__(self,
+ id: Optional[str] = None,
+ name: Optional[str] = None,
+ location: Optional[str] = None,
+ options: Optional[Dict[str, str]] = None,
+ owner: Optional[str] = None,
+ created_at: Optional[int] = None,
+ created_by: Optional[str] = None,
+ updated_at: Optional[int] = None,
+ updated_by: Optional[str] = None):
+ super().__init__(owner, created_at, created_by, updated_at, updated_by)
+ self.id = id
+ self.name = name
+ self.location = location
+ self.options = options or {}
+
+ def get_id(self) -> Optional[str]:
+ return self.id
+
+ def get_name(self) -> Optional[str]:
+ return self.name
+
+ def get_location(self) -> Optional[str]:
+ return self.location
+
+ def get_options(self) -> Dict[str, str]:
+ return self.options or {}
+
+ def to_dict(self) -> Dict[str, Any]:
+ result = {
+ self.FIELD_ID: self.id,
+ self.FIELD_NAME: self.name,
+ self.FIELD_LOCATION: self.location,
+ self.FIELD_OPTIONS: self.options
+ }
+
+ if self.owner is not None:
+ result[self.FIELD_OWNER] = self.owner
+ if self.created_at is not None:
+ result[self.FIELD_CREATED_AT] = self.created_at
+ if self.created_by is not None:
+ result[self.FIELD_CREATED_BY] = self.created_by
+ if self.updated_at is not None:
+ result[self.FIELD_UPDATED_AT] = self.updated_at
+ if self.updated_by is not None:
+ result[self.FIELD_UPDATED_BY] = self.updated_by
+
+ return result
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
+ return cls(
+ id=data.get(cls.FIELD_ID),
+ name=data.get(cls.FIELD_NAME),
+ location=data.get(cls.FIELD_LOCATION),
+ options=data.get(cls.FIELD_OPTIONS, {}),
+ owner=data.get(cls.FIELD_OWNER),
+ created_at=data.get(cls.FIELD_CREATED_AT),
+ created_by=data.get(cls.FIELD_CREATED_BY),
+ updated_at=data.get(cls.FIELD_UPDATED_AT),
+ updated_by=data.get(cls.FIELD_UPDATED_BY)
+ )
+
+
+@dataclass
+class ConfigResponse(RESTResponse):
+ FIELD_DEFAULTS = "defaults"
+
+ defaults: Dict[str, str]
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, str]) -> 'ConfigResponse':
+ return cls(defaults=data.get(cls.FIELD_DEFAULTS))
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {self.FIELD_DEFAULTS: self.defaults}
+
+ def merge(self, options: Dict[str, str]) -> Dict[str, str]:
+ merged = options.copy()
+ merged.update(self.defaults)
+ return merged
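
For orientation, a small round-trip over the response dataclasses defined above (a sketch, not part of the diff; it assumes the same import path as the new tests):

    # Sketch only: from_dict/to_dict are the hooks the JSON helper relies on.
    from api.api_response import GetDatabaseResponse

    payload = {"id": "1", "name": "default", "location": "/tmp/default",
               "options": {"k": "v"}, "owner": "alice", "createdAt": 1}
    db = GetDatabaseResponse.from_dict(payload)
    assert db.get_name() == "default"
    assert db.to_dict()["createdAt"] == 1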
diff --git a/python/api/api_resquest.py b/pypaimon/api/api_resquest.py
similarity index 89%
rename from python/api/api_resquest.py
rename to pypaimon/api/api_resquest.py
index d7fd882447..2d0d2583a8 100644
--- a/python/api/api_resquest.py
+++ b/pypaimon/api/api_resquest.py
@@ -18,19 +18,13 @@ limitations under the License.
from abc import ABC
from dataclasses import dataclass
-from typing import Dict, List, Any, Optional
+from typing import Dict, List
class RESTRequest(ABC):
pass
-@dataclass
-class Identifier:
- database_name: str
- object_name: str
-
-
@dataclass
class CreateDatabaseRequest(RESTRequest):
name: str
diff --git a/python/api/auth.py b/pypaimon/api/auth.py
similarity index 100%
rename from python/api/auth.py
rename to pypaimon/api/auth.py
diff --git a/python/api/client.py b/pypaimon/api/client.py
similarity index 91%
rename from python/api/client.py
rename to pypaimon/api/client.py
index 409b4d2c28..fc41e2672d 100644
--- a/python/api/client.py
+++ b/pypaimon/api/client.py
@@ -26,9 +26,9 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
-from auth import RESTAuthParameter
-from api import RESTApi
-from api_response import ErrorResponse
+from api.auth import RESTAuthParameter
+from api.api_response import ErrorResponse
+from api.rest_json import JSON
T = TypeVar('T', bound='RESTResponse')
@@ -78,13 +78,22 @@ class ExponentialRetryInterceptor:
self.logger = logging.getLogger(self.__class__.__name__)
def create_retry_strategy(self) -> Retry:
- return Retry(
- total=self.max_retries,
- status_forcelist=[429, 500, 502, 503, 504],
- method_whitelist=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"],
- backoff_factor=1,
- raise_on_status=False
- )
+ retry_kwargs = {
+ 'total': self.max_retries,
+ 'read': self.max_retries,
+ 'connect': self.max_retries,
+ 'backoff_factor': 1,
+ 'status_forcelist': [429, 502, 503, 504],
+ 'raise_on_status': False,
+ 'raise_on_redirect': False,
+ }
+ retry_methods = ["GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS"]
+ retry_instance = Retry()
+ if hasattr(retry_instance, 'allowed_methods'):
+ retry_kwargs['allowed_methods'] = retry_methods
+ else:
+ retry_kwargs['method_whitelist'] = retry_methods
+ return Retry(**retry_kwargs)
class LoggingInterceptor:
@@ -156,7 +165,7 @@ def _normalize_uri(uri: str) -> str:
def _parse_error_response(response_body: Optional[str], status_code: int) -> ErrorResponse:
if response_body:
try:
- return ErrorResponse.from_json(response_body)
+ return JSON.from_json(response_body, ErrorResponse)
except Exception:
return ErrorResponse(
resource_type=None,
@@ -200,7 +209,7 @@ class HttpClient(RESTClient):
self.session = requests.Session()
- retry_interceptor = ExponentialRetryInterceptor(max_retries=5)
+ retry_interceptor = ExponentialRetryInterceptor(max_retries=1)
retry_strategy = retry_interceptor.create_retry_strategy()
adapter = HTTPAdapter(max_retries=retry_strategy)
@@ -240,7 +249,7 @@ class HttpClient(RESTClient):
def post_with_response_type(self, path: str, body: RESTRequest, response_type: Optional[Type[T]],
rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
try:
- body_str = RESTApi.to_json(body)
+ body_str = JSON.to_json(body)
auth_headers = _get_headers(path, "POST", body_str,
rest_auth_function)
url = self._get_request_url(path, None)
@@ -259,7 +268,7 @@ class HttpClient(RESTClient):
def delete_with_body(self, path: str, body: RESTRequest,
rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
try:
- body_str = RESTApi.to_json(body)
+ body_str = JSON.to_json(body)
auth_headers = _get_headers(path, "DELETE", body_str,
rest_auth_function)
url = self._get_request_url(path, None)
@@ -312,7 +321,7 @@ class HttpClient(RESTClient):
self.error_handler.accept(error, request_id)
if response_type is not None and response_body_str is not None:
- return response_type.from_json(response_body_str)
+ return JSON.from_json(response_body_str, response_type)
elif response_type is None:
return None
else:
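
The retry change above works around urllib3 renaming method_whitelist to allowed_methods; a standalone sketch of the same compatibility probe (illustrative, not part of the diff):

    # Sketch only: build a Retry that works on both old and new urllib3.
    from urllib3 import Retry

    retry_kwargs = {"total": 1, "backoff_factor": 1,
                    "status_forcelist": [429, 502, 503, 504],
                    "raise_on_status": False}
    methods = ["GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS"]
    if hasattr(Retry(), "allowed_methods"):      # urllib3 >= 1.26
        retry_kwargs["allowed_methods"] = methods
    else:                                        # older urllib3
        retry_kwargs["method_whitelist"] = methods
    retry_strategy = Retry(**retry_kwargs)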
diff --git a/pypaimon/api/rest_json.py b/pypaimon/api/rest_json.py
new file mode 100644
index 0000000000..57bdefdd06
--- /dev/null
+++ b/pypaimon/api/rest_json.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from dataclasses import asdict
+from typing import Any, Type
+from api.typedef import T
+
+
+class JSON:
+ """Universal JSON serializer"""
+
+ @staticmethod
+ def from_json(json_str: str, target_class: Type[T]) -> T:
+ data = json.loads(json_str)
+ if hasattr(target_class, 'from_dict'):
+ return target_class.from_dict(data)
+ return data
+
+ @staticmethod
+ def to_json(obj: Any) -> str:
+ """Serialize any object to JSON"""
+ return json.dumps(obj, default=JSON._default_serializer)
+
+ @staticmethod
+ def _default_serializer(obj):
+ """Default serialization handler"""
+
+ # Handle objects with to_dict method
+ if hasattr(obj, 'to_dict') and callable(obj.to_dict):
+ return obj.to_dict()
+
+ # Handle dataclass objects
+ if hasattr(obj, '__dataclass_fields__'):
+ return asdict(obj)
+
+ raise TypeError(f"Object of type {type(obj).__name__} is not JSON")
\ No newline at end of file
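
A short usage sketch of the JSON helper above, mirroring how the mock server and HttpClient call it (illustrative, not part of the diff):

    # Sketch only: serialize via to_dict, deserialize via the target class's from_dict.
    from api.rest_json import JSON
    from api.api_response import ConfigResponse

    body = JSON.to_json(ConfigResponse(defaults={"prefix": "mock-test"}))
    config = JSON.from_json(body, ConfigResponse)
    assert config.defaults["prefix"] == "mock-test"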
diff --git a/pypaimon/api/typedef.py b/pypaimon/api/typedef.py
new file mode 100644
index 0000000000..157fa9c1d6
--- /dev/null
+++ b/pypaimon/api/typedef.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from dataclasses import dataclass
+from typing import Optional, TypeVar
+
+T = TypeVar('T')
+
+@dataclass
+class Identifier:
+ """Table/View/Function identifier"""
+ database_name: str
+ object_name: str
+ branch_name: Optional[str] = None
+
+ @classmethod
+ def create(cls, database_name: str, object_name: str) -> 'Identifier':
+ return cls(database_name, object_name)
+
+ @classmethod
+ def from_string(cls, full_name: str) -> 'Identifier':
+ parts = full_name.split('.')
+ if len(parts) == 2:
+ return cls(parts[0], parts[1])
+ elif len(parts) == 3:
+ return cls(parts[0], parts[1], parts[2])
+ else:
+ raise ValueError(f"Invalid identifier format: {full_name}")
+
+ def get_full_name(self) -> str:
+ if self.branch_name:
+ return f"{self.database_name}.{self.object_name}.{self.branch_name}"
+ return f"{self.database_name}.{self.object_name}"
+
+ def get_database_name(self) -> str:
+ return self.database_name
+
+ def get_table_name(self) -> str:
+ return self.object_name
+
+ def get_object_name(self) -> str:
+ return self.object_name
+
+ def get_branch_name(self) -> Optional[str]:
+ return self.branch_name
+
+ def is_system_table(self) -> bool:
+ return self.object_name.startswith('$')
\ No newline at end of file
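
The Identifier type above replaces the one previously defined in api_resquest.py; a brief parsing sketch (illustrative, not part of the diff):

    # Sketch only: Identifier parsing as exercised by the new tests.
    from api.typedef import Identifier

    ident = Identifier.from_string("default.user")
    assert ident.get_full_name() == "default.user"
    assert not ident.is_system_table()      # names starting with "$" are system tables
    branch = Identifier.from_string("db.tbl.branch1")
    assert branch.get_branch_name() == "branch1"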
diff --git a/pypaimon/tests/__init__.py b/pypaimon/tests/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/pypaimon/tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/pypaimon/tests/api_test.py b/pypaimon/tests/api_test.py
new file mode 100644
index 0000000000..577b9ca94c
--- /dev/null
+++ b/pypaimon/tests/api_test.py
@@ -0,0 +1,805 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+import uuid
+from typing import Dict, List, Optional, Any, Union, Tuple
+from dataclasses import dataclass
+import threading
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from urllib.parse import urlparse
+import unittest
+
+import api
+from api.api_response import (ConfigResponse, ListDatabasesResponse, GetDatabaseResponse, TableMetadata, Schema,
+ GetTableResponse, ListTablesResponse, TableSchema, RESTResponse, PagedList, DataField,
+ PaimonDataType)
+from api import RESTApi
+from api.rest_json import JSON
+from api.typedef import Identifier
+
+
+@dataclass
+class ErrorResponse(RESTResponse):
+ """Error response"""
+ RESOURCE_TYPE_DATABASE = "database"
+ RESOURCE_TYPE_TABLE = "table"
+ RESOURCE_TYPE_VIEW = "view"
+ RESOURCE_TYPE_FUNCTION = "function"
+ RESOURCE_TYPE_COLUMN = "column"
+ RESOURCE_TYPE_SNAPSHOT = "snapshot"
+ RESOURCE_TYPE_TAG = "tag"
+ RESOURCE_TYPE_BRANCH = "branch"
+ RESOURCE_TYPE_DEFINITION = "definition"
+ RESOURCE_TYPE_DIALECT = "dialect"
+
+ resource_type: Optional[str]
+ resource_name: Optional[str]
+ message: str
+ code: int
+
+
+class ResourcePaths:
+ """Resource path constants"""
+
+ TABLES = "tables"
+ VIEWS = "views"
+ FUNCTIONS = "functions"
+ SNAPSHOTS = "snapshots"
+ ROLLBACK = "rollback"
+
+ def __init__(self, prefix: str = ""):
+ self.prefix = prefix.rstrip('/')
+
+ def config(self) -> str:
+ return f"/v1/config"
+
+ def databases(self) -> str:
+ return f"/v1/{self.prefix}/databases"
+
+ def tables(self) -> str:
+ return f"{self.prefix}/tables"
+
+
+# Exception classes
+class CatalogException(Exception):
+ """Base catalog exception"""
+ pass
+
+
+class DatabaseNotExistException(CatalogException):
+ """Database not exist exception"""
+
+ def __init__(self, database: str):
+ self.database = database
+ super().__init__(f"Database {database} does not exist")
+
+
+class DatabaseAlreadyExistException(CatalogException):
+ """Database already exist exception"""
+
+ def __init__(self, database: str):
+ self.database = database
+ super().__init__(f"Database {database} already exists")
+
+
+class DatabaseNoPermissionException(CatalogException):
+ """Database no permission exception"""
+
+ def __init__(self, database: str):
+ self.database = database
+ super().__init__(f"No permission to access database {database}")
+
+
+class TableNotExistException(CatalogException):
+ """Table not exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Table {identifier.get_full_name()} does not exist")
+
+
+class TableAlreadyExistException(CatalogException):
+ """Table already exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Table {identifier.get_full_name()} already exists")
+
+
+class TableNoPermissionException(CatalogException):
+ """Table no permission exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"No permission to access table
{identifier.get_full_name()}")
+
+
+class ViewNotExistException(CatalogException):
+ """View not exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"View {identifier.get_full_name()} does not exist")
+
+
+class ViewAlreadyExistException(CatalogException):
+ """View already exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"View {identifier.get_full_name()} already exists")
+
+
+class FunctionNotExistException(CatalogException):
+ """Function not exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Function {identifier.get_full_name()} does not
exist")
+
+
+class FunctionAlreadyExistException(CatalogException):
+ """Function already exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Function {identifier.get_full_name()} already
exists")
+
+
+class ColumnNotExistException(CatalogException):
+ """Column not exist exception"""
+
+ def __init__(self, column: str):
+ self.column = column
+ super().__init__(f"Column {column} does not exist")
+
+
+class ColumnAlreadyExistException(CatalogException):
+ """Column already exist exception"""
+
+ def __init__(self, column: str):
+ self.column = column
+ super().__init__(f"Column {column} already exists")
+
+
+class DefinitionNotExistException(CatalogException):
+ """Definition not exist exception"""
+
+ def __init__(self, identifier: Identifier, name: str):
+ self.identifier = identifier
+ self.name = name
+ super().__init__(f"Definition {name} does not exist in
{identifier.get_full_name()}")
+
+
+class DefinitionAlreadyExistException(CatalogException):
+ """Definition already exist exception"""
+
+ def __init__(self, identifier: Identifier, name: str):
+ self.identifier = identifier
+ self.name = name
+ super().__init__(f"Definition {name} already exists in
{identifier.get_full_name()}")
+
+
+class DialectNotExistException(CatalogException):
+ """Dialect not exist exception"""
+
+ def __init__(self, identifier: Identifier, dialect: str):
+ self.identifier = identifier
+ self.dialect = dialect
+ super().__init__(f"Dialect {dialect} does not exist in
{identifier.get_full_name()}")
+
+
+class DialectAlreadyExistException(CatalogException):
+ """Dialect already exist exception"""
+
+ def __init__(self, identifier: Identifier, dialect: str):
+ self.identifier = identifier
+ self.dialect = dialect
+ super().__init__(f"Dialect {dialect} already exists in
{identifier.get_full_name()}")
+
+
+# Constants
+DEFAULT_MAX_RESULTS = 100
+AUTHORIZATION_HEADER_KEY = "Authorization"
+
+# REST API parameter constants
+DATABASE_NAME_PATTERN = "databaseNamePattern"
+TABLE_NAME_PATTERN = "tableNamePattern"
+VIEW_NAME_PATTERN = "viewNamePattern"
+FUNCTION_NAME_PATTERN = "functionNamePattern"
+PARTITION_NAME_PATTERN = "partitionNamePattern"
+MAX_RESULTS = "maxResults"
+PAGE_TOKEN = "pageToken"
+
+# Core options
+PATH = "path"
+TYPE = "type"
+WAREHOUSE = "warehouse"
+SNAPSHOT_CLEAN_EMPTY_DIRECTORIES = "snapshot.clean-empty-directories"
+
+# Table types
+FORMAT_TABLE = "FORMAT_TABLE"
+OBJECT_TABLE = "OBJECT_TABLE"
+
+
+class RESTCatalogServer:
+ """Mock REST server for testing"""
+
+ def __init__(self, data_path: str, auth_provider, config: ConfigResponse, warehouse: str):
+ self.logger = logging.getLogger(__name__)
+ self.warehouse = warehouse
+ self.config_response = config
+
+ # Initialize resource paths
+ prefix = config.defaults.get("prefix")
+ self.resource_paths = ResourcePaths(prefix)
+ self.database_uri = self.resource_paths.databases()
+
+ # Initialize storage
+ self.database_store: Dict[str, GetDatabaseResponse] = {}
+ self.table_metadata_store: Dict[str, TableMetadata] = {}
+ # Snapshot/partition stores are referenced by the DELETE handler below.
+ self.table_latest_snapshot_store: Dict[str, Any] = {}
+ self.table_partitions_store: Dict[str, Any] = {}
+ self.no_permission_databases: List[str] = []
+ self.no_permission_tables: List[str] = []
+
+ # Initialize mock catalog (simplified)
+ self.data_path = data_path
+ self.auth_provider = auth_provider
+
+ # HTTP server setup
+ self.server = None
+ self.server_thread = None
+ self.port = 0
+
+ def start(self) -> None:
+ """Start the mock server"""
+ handler = self._create_request_handler()
+ self.server = HTTPServer(('localhost', 0), handler)
+ self.port = self.server.server_port
+
+ self.server_thread = threading.Thread(target=self.server.serve_forever)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+
+ self.logger.info(f"Mock REST server started on port {self.port}")
+
+ def get_url(self) -> str:
+ """Get server URL"""
+ return f"http://localhost:{self.port}"
+
+ def shutdown(self) -> None:
+ """Shutdown the server"""
+ if self.server:
+ self.server.shutdown()
+ self.server.server_close()
+ if self.server_thread:
+ self.server_thread.join()
+
+ def _create_request_handler(self):
+ """Create HTTP request handler"""
+ server_instance = self
+
+ class RequestHandler(BaseHTTPRequestHandler):
+ def do_GET(self):
+ self._handle_request('GET')
+
+ def do_POST(self):
+ self._handle_request('POST')
+
+ def do_DELETE(self):
+ self._handle_request('DELETE')
+
+ def _handle_request(self, method: str):
+ try:
+ # Parse request
+ parsed_url = urlparse(self.path)
+ resource_path = parsed_url.path
+ parameters = self._parse_query_params(parsed_url.query)
+
+ # Get request body
+ content_length = int(self.headers.get('Content-Length', 0))
+ data = self.rfile.read(content_length).decode('utf-8') if content_length > 0 else ""
+
+ # Get headers
+ headers = dict(self.headers)
+
+ # Handle authentication
+ auth_token = headers.get(AUTHORIZATION_HEADER_KEY.lower())
+ if not self._authenticate(auth_token, resource_path, parameters, method, data):
+ self._send_response(401, "Unauthorized")
+ return
+
+ # Route request
+ response, status_code = server_instance._route_request(
+ method, resource_path, parameters, data, headers
+ )
+
+ self._send_response(status_code, response)
+
+ except Exception as e:
+ server_instance.logger.error(f"Request handling error: {e}")
+ self._send_response(500, str(e))
+
+ def _parse_query_params(self, query: str) -> Dict[str, str]:
+ """Parse query parameters"""
+ if not query:
+ return {}
+
+ params = {}
+ for pair in query.split('&'):
+ if '=' in pair:
+ key, value = pair.split('=', 1)
+ params[key.strip()] = api.RESTUtil.decode_string(value.strip())
+ return params
+
+ def _authenticate(self, token: str, path: str, params: Dict[str, str],
+ method: str, data: str) -> bool:
+ """Authenticate request"""
+ # Simplified authentication - always return True for mock
+ return True
+
+ def _send_response(self, status_code: int, body: str):
+ """Send HTTP response"""
+ self.send_response(status_code)
+ self.send_header('Content-Type', 'application/json')
+ self.end_headers()
+ self.wfile.write(body.encode('utf-8'))
+
+ def log_message(self, format, *args):
+ """Override to use our logger"""
+ server_instance.logger.debug(format % args)
+
+ return RequestHandler
+
+ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, str],
+ data: str, headers: Dict[str, str]) -> Tuple[str, int]:
+ """Route HTTP request to appropriate handler"""
+ try:
+ # Config endpoint
+ if resource_path.startswith(self.resource_paths.config()):
+ warehouse_param = parameters.get(WAREHOUSE)
+ if warehouse_param == self.warehouse:
+ return self._mock_response(self.config_response, 200)
+
+ # Databases endpoint
+ if resource_path == self.database_uri or resource_path.startswith(self.database_uri + "?"):
+ return self._databases_api_handler(method, data, parameters)
+
+ # Global tables endpoint
+ if resource_path.startswith(self.resource_paths.tables()):
+ return self._tables_handle(parameters)
+
+ # Database-specific endpoints
+ if resource_path.startswith(self.database_uri + "/"):
+ return self._handle_database_resource(method, resource_path, parameters, data)
+
+ return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
+
+ except DatabaseNotExistException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_DATABASE, e.database, str(e), 404
+ )
+ return self._mock_response(response, 404)
+ except TableNotExistException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_TABLE, e.identifier.get_table_name(), str(e), 404
+ )
+ return self._mock_response(response, 404)
+ except DatabaseNoPermissionException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_DATABASE, e.database, str(e), 403
+ )
+ return self._mock_response(response, 403)
+ except TableNoPermissionException as e:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_TABLE, e.identifier.get_table_name(), str(e), 403
+ )
+ return self._mock_response(response, 403)
+ except Exception as e:
+ self.logger.error(f"Unexpected error: {e}")
+ response = ErrorResponse(None, None, str(e), 500)
+ return self._mock_response(response, 500)
+
+ def _handle_database_resource(self, method: str, resource_path: str,
+ parameters: Dict[str, str], data: str) -> Tuple[str, int]:
+ """Handle database-specific resource requests"""
+ # Extract database name and resource path
+ path_parts = resource_path[len(self.database_uri) + 1:].split('/')
+ database_name = api.RESTUtil.decode_string(path_parts[0])
+
+ # Check database permissions
+ if database_name in self.no_permission_databases:
+ raise DatabaseNoPermissionException(database_name)
+
+ if database_name not in self.database_store:
+ raise DatabaseNotExistException(database_name)
+
+ # Handle different resource types
+ if len(path_parts) == 1:
+ # Database operations
+ return self._database_handle(method, data, database_name)
+
+ elif len(path_parts) == 2:
+ # Collection operations (tables, views, functions)
+ resource_type = path_parts[1]
+
+ if resource_type.startswith(ResourcePaths.TABLES):
+ return self._tables_handle(method, data, database_name, parameters)
+
+ elif len(path_parts) >= 3:
+ # Individual resource operations
+ resource_type = path_parts[1]
+ resource_name = api.RESTUtil.decode_string(path_parts[2])
+ identifier = Identifier.create(database_name, resource_name)
+
+ if resource_type == ResourcePaths.TABLES:
+ return self._handle_table_resource(method, path_parts, identifier, data, parameters)
+
+ return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
+
+ def _handle_table_resource(self, method: str, path_parts: List[str],
+ identifier: Identifier, data: str,
+ parameters: Dict[str, str]) -> Tuple[str, int]:
+ """Handle table-specific resource requests"""
+ # Check table permissions
+ if identifier.get_full_name() in self.no_permission_tables:
+ raise TableNoPermissionException(identifier)
+
+ if len(path_parts) == 3:
+ # Basic table operations
+ return self._table_handle(method, data, identifier)
+
+ elif len(path_parts) >= 4:
+ operation = path_parts[3]
+ return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
+
+ def _databases_api_handler(self, method: str, data: str,
+ parameters: Dict[str, str]) -> Tuple[str, int]:
+ """Handle databases API requests"""
+ if method == "GET":
+ database_name_pattern = parameters.get(DATABASE_NAME_PATTERN)
+ databases = [
+ db_name for db_name in self.database_store.keys()
+ if not database_name_pattern or self._match_name_pattern(db_name, database_name_pattern)
+ ]
+ return self._generate_final_list_databases_response(parameters, databases)
+
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ def _database_handle(self, method: str, data: str, database_name: str) -> Tuple[str, int]:
+ """Handle individual database operations"""
+ if database_name not in self.database_store:
+ raise DatabaseNotExistException(database_name)
+
+ database = self.database_store[database_name]
+
+ if method == "GET":
+ response = database
+ return self._mock_response(response, 200)
+
+ elif method == "DELETE":
+ del self.database_store[database_name]
+ return self._mock_response("", 200)
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ def _tables_handle(self, method: str = None, data: str = None, database_name: str = None,
+ parameters: Dict[str, str] = None) -> Tuple[str, int]:
+ """Handle tables operations"""
+ if parameters is None:
+ parameters = {}
+
+ if database_name:
+ # Database-specific tables
+ if method == "GET":
+ tables = self._list_tables(database_name, parameters)
+ return self._generate_final_list_tables_response(parameters, tables)
+ return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405)
+
+ def _table_handle(self, method: str, data: str, identifier: Identifier) -> Tuple[str, int]:
+ """Handle individual table operations"""
+ if method == "GET":
+ if identifier.is_system_table():
+ # Handle system table
+ schema = Schema(fields=[], options={PATH: f"/tmp/{identifier.get_full_name()}"})
+ table_metadata = self._create_table_metadata(identifier, 1, schema, None, False)
+ else:
+ if identifier.get_full_name() not in self.table_metadata_store:
+ raise TableNotExistException(identifier)
+ table_metadata = self.table_metadata_store[identifier.get_full_name()]
+
+ schema = table_metadata.schema.to_schema()
+ path = schema.options.pop(PATH, None)
+
+ response = self.mock_table(identifier, table_metadata, path, schema)
+ return self._mock_response(response, 200)
+ #
+ # elif method == "POST":
+ # # Alter table
+ # request_body = JSON.from_json(data, AlterTableRequest)
+ # self._alter_table_impl(identifier, request_body.get_changes())
+ # return self._mock_response("", 200)
+
+ elif method == "DELETE":
+ # Drop table
+ if identifier.get_full_name() in self.table_metadata_store:
+ del self.table_metadata_store[identifier.get_full_name()]
+ if identifier.get_full_name() in self.table_latest_snapshot_store:
+ del self.table_latest_snapshot_store[identifier.get_full_name()]
+ if identifier.get_full_name() in self.table_partitions_store:
+ del self.table_partitions_store[identifier.get_full_name()]
+
+ return self._mock_response("", 200)
+
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ # Utility methods
+ def _mock_response(self, response: Union[RESTResponse, str], http_code: int) -> Tuple[str, int]:
+ """Create mock response"""
+ if isinstance(response, str):
+ return response, http_code
+
+ try:
+ return JSON.to_json(response), http_code
+ except Exception as e:
+ self.logger.error(f"Failed to serialize response: {e}")
+ return str(e), 500
+
+ def _get_max_results(self, parameters: Dict[str, str]) -> int:
+ """Get max results from parameters"""
+ max_results_str = parameters.get(MAX_RESULTS)
+ if max_results_str:
+ try:
+ max_results = int(max_results_str)
+ return min(max_results, DEFAULT_MAX_RESULTS) if max_results > 0 else DEFAULT_MAX_RESULTS
+ except ValueError:
+ raise ValueError(f"Invalid maxResults value:
{max_results_str}")
+ return DEFAULT_MAX_RESULTS
+
+ def _build_paged_entities(self, entities: List[Any], max_results: int,
+ page_token: Optional[str], desc: bool = False) -> PagedList:
+ """Build paged entities"""
+ # Sort entities
+ sorted_entities = sorted(entities, key=self._get_paged_key, reverse=desc)
+
+ # Apply pagination
+ paged_entities = []
+ for entity in sorted_entities:
+ if len(paged_entities) < max_results:
+ if not page_token or self._get_paged_key(entity) > page_token:
+ paged_entities.append(entity)
+ else:
+ break
+
+ # Determine next page token
+ next_page_token = None
+ if len(paged_entities) == max_results and len(sorted_entities) > max_results:
+ next_page_token = self._get_paged_key(paged_entities[-1])
+
+ return PagedList(elements=paged_entities, next_page_token=next_page_token)
+
+ def _get_paged_key(self, entity: Any) -> str:
+ """Get paging key for entity"""
+ if isinstance(entity, str):
+ return entity
+ elif hasattr(entity, 'get_name'):
+ return entity.get_name()
+ elif hasattr(entity, 'get_full_name'):
+ return entity.get_full_name()
+ elif hasattr(entity, 'name'):
+ return entity.name
+ else:
+ return str(entity)
+
+ def _match_name_pattern(self, name: str, pattern: str) -> bool:
+ """Match name against SQL pattern"""
+ if not pattern:
+ raise ValueError("Pattern cannot be empty")
+ regex_pattern = self._sql_pattern_to_regex(pattern)
+ return re.match(regex_pattern, name) is not None
+
+ def _sql_pattern_to_regex(self, pattern: str) -> str:
+ """Convert SQL pattern to regex"""
+ regex = []
+ escaped = False
+
+ for char in pattern:
+ if escaped:
+ regex.append(re.escape(char))
+ escaped = False
+ elif char == '\\':
+ escaped = True
+ elif char == '%':
+ regex.append('.*')
+ elif char == '_':
+ regex.append('.')
+ else:
+ regex.append(re.escape(char))
+
+ return '^' + ''.join(regex) + '$'
+
+ def _create_table_metadata(self, identifier: Identifier, schema_id: int,
+ schema: Schema, uuid_str: str, is_external: bool) -> TableMetadata:
+ """Create table metadata"""
+ options = schema.options.copy()
+ path = f"/tmp/{identifier.get_full_name()}"
+ options[PATH] = path
+
+ table_schema = TableSchema(
+ id=schema_id,
+ fields=schema.fields,
+ highest_field_id=len(schema.fields) - 1,
+ partition_keys=schema.partition_keys,
+ primary_keys=schema.primary_keys,
+ options=options,
+ comment=schema.comment
+ )
+
+ return TableMetadata(
+ schema=table_schema,
+ is_external=is_external,
+ uuid=uuid_str or str(uuid.uuid4())
+ )
+
+ # List methods
+ def _list_tables(self, database_name: str, parameters: Dict[str, str]) -> List[str]:
+ """List tables in database"""
+ table_name_pattern = parameters.get(TABLE_NAME_PATTERN)
+ tables = []
+
+ for full_name, metadata in self.table_metadata_store.items():
+ identifier = Identifier.from_string(full_name)
+ if (identifier.get_database_name() == database_name and
+ (not table_name_pattern or self._match_name_pattern(identifier.get_table_name(), table_name_pattern))):
+ tables.append(identifier.get_table_name())
+
+ return tables
+
+ # Response generation methods
+ def _generate_final_list_databases_response(self, parameters: Dict[str, str],
+ databases: List[str]) -> Tuple[str, int]:
+ """Generate final list databases response"""
+ if databases:
+ max_results = self._get_max_results(parameters)
+ page_token = parameters.get(PAGE_TOKEN)
+ paged_dbs = self._build_paged_entities(databases, max_results, page_token)
+ response = ListDatabasesResponse(
+ databases=paged_dbs.elements,
+ next_page_token=paged_dbs.next_page_token
+ )
+ else:
+ response = ListDatabasesResponse(databases=[], next_page_token=None)
+
+ return self._mock_response(response, 200)
+
+ def _generate_final_list_tables_response(self, parameters: Dict[str, str],
+ tables: List[str]) -> Tuple[str, int]:
+ """Generate final list tables response"""
+ if tables:
+ max_results = self._get_max_results(parameters)
+ page_token = parameters.get(PAGE_TOKEN)
+ paged_tables = self._build_paged_entities(tables, max_results, page_token)
+ response = ListTablesResponse(
+ tables=paged_tables.elements,
+ next_page_token=paged_tables.next_page_token
+ )
+ else:
+ response = ListTablesResponse(tables=[], next_page_token=None)
+
+ return self._mock_response(response, 200)
+
+ def add_no_permission_database(self, database: str) -> None:
+ """Add no permission database"""
+ self.no_permission_databases.append(database)
+
+ def add_no_permission_table(self, identifier: Identifier) -> None:
+ """Add no permission table"""
+ self.no_permission_tables.append(identifier.get_full_name())
+
+ def mock_database(self, name: str, options: Dict[str, str]) -> GetDatabaseResponse:
+ return GetDatabaseResponse(
+ id=str(uuid.uuid4()),
+ name=name,
+ location=f"{self.data_path}/{name}",
+ options=options,
+ owner="owner",
+ created_at=1,
+ created_by="created",
+ updated_at=1,
+ updated_by="updated"
+ )
+
+ def mock_table(self, identifier: Identifier, table_metadata: TableMetadata, path: str,
+ schema: Schema) -> GetTableResponse:
+ return GetTableResponse(
+ id=str(table_metadata.uuid),
+ name=identifier.get_object_name(),
+ path=path,
+ is_external=table_metadata.is_external,
+ schema_id=table_metadata.schema.id,
+ schema=schema,
+ owner="owner",
+ created_at=1,
+ created_by="created",
+ updated_at=1,
+ updated_by="updated"
+ )
+
+
+class ApiTestCase(unittest.TestCase):
+
+ def test(self):
+ """Example usage of RESTCatalogServer"""
+ # Setup logging
+ logging.basicConfig(level=logging.INFO)
+
+ # Create config
+ config = ConfigResponse(defaults={"prefix": "mock-test"})
+
+ # Create mock auth provider
+ class MockAuthProvider:
+ def merge_auth_header(self, headers, auth_param):
+ return {AUTHORIZATION_HEADER_KEY: "Bearer test-token"}
+
+ # Create server
+ server = RESTCatalogServer(
+ data_path="/tmp/test_warehouse",
+ auth_provider=MockAuthProvider(),
+ config=config,
+ warehouse="test_warehouse"
+ )
+ try:
+ # Start server
+ server.start()
+ print(f"Server started at: {server.get_url()}")
+ test_databases = {
+ "default": server.mock_database("default", {"env": "test"}),
+ "test_db1": server.mock_database("test_db1", {"env": "test"}),
+ "test_db2": server.mock_database("test_db2", {"env": "test"}),
+ "prod_db": server.mock_database("prod_db", {"env": "prod"})
+ }
+ test_tables = {
+ "default.user": TableMetadata(uuid=str(uuid.uuid4()),
is_external=True,
+ schema=TableSchema(1,
+
[DataField("name", 0, "name", PaimonDataType('int'))],
+ 1, [], [],
{}, "")),
+ }
+ server.table_metadata_store.update(test_tables)
+ server.database_store.update(test_databases)
+ options = {
+ 'uri': f"http://localhost:{server.port}",
+ 'warehouse': 'test_warehouse',
+ 'dlf.region': 'cn-hangzhou',
+ "token.provider": "xxxx",
+ 'dlf.access-key-id': 'xxxx',
+ 'dlf.access-key-secret': 'xxxx'
+ }
+ api = RESTApi(options)
+ self.assertSetEqual(set(api.list_databases()), {*test_databases})
+ self.assertEqual(api.get_database('default'), test_databases.get('default'))
+ table = api.get_table(Identifier.from_string('default.user'))
+ self.assertEqual(table.id, str(test_tables['default.user'].uuid))
+
+ finally:
+ # Shutdown server
+ server.shutdown()
+ print("Server stopped")
+
+
+if __name__ == "__main__":
+ unittest.main()
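
To run just this test in isolation, a minimal sketch (assuming the pypaimon directory is on PYTHONPATH so that the api and tests packages resolve):

    # Sketch only: load and run ApiTestCase programmatically.
    import unittest
    from tests.api_test import ApiTestCase

    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ApiTestCase)
    unittest.TextTestRunner(verbosity=2).run(suite)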
diff --git a/python/api/api_response.py b/python/api/api_response.py
deleted file mode 100644
index 332741547e..0000000000
--- a/python/api/api_response.py
+++ /dev/null
@@ -1,301 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-from abc import ABC, abstractmethod
-from typing import Dict, Optional, Any, Generic, TypeVar, List
-from dataclasses import dataclass, field
-import json
-from datetime import datetime
-from api_resquest import Identifier
-
-T = TypeVar('T')
-
-
-@dataclass
-class PagedList(Generic[T]):
- elements: List[T]
- next_page_token: Optional[str] = None
-
-
-class RESTResponse(ABC):
- pass
-
-
-@dataclass
-class ErrorResponse(RESTResponse):
- resource_type: Optional[str] = None
- resource_name: Optional[str] = None
- message: Optional[str] = None
- code: Optional[int] = None
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
- return cls(
- resource_type=data.get('resourceType'),
- resource_name=data.get('resourceName'),
- message=data.get('message'),
- code=data.get('code'),
- )
-
- @classmethod
- def from_json(cls, json_str: str) -> 'ErrorResponse':
- data = json.loads(json_str)
- return cls.from_dict(data)
-
-
-@dataclass
-class AuditRESTResponse(RESTResponse):
- FIELD_OWNER = "owner"
- FIELD_CREATED_AT = "createdAt"
- FIELD_CREATED_BY = "createdBy"
- FIELD_UPDATED_AT = "updatedAt"
- FIELD_UPDATED_BY = "updatedBy"
-
- owner: Optional[str] = None
- created_at: Optional[int] = None
- created_by: Optional[str] = None
- updated_at: Optional[int] = None
- updated_by: Optional[str] = None
-
- def get_owner(self) -> Optional[str]:
- return self.owner
-
- def get_created_at(self) -> Optional[int]:
- return self.created_at
-
- def get_created_by(self) -> Optional[str]:
- return self.created_by
-
- def get_updated_at(self) -> Optional[int]:
- return self.updated_at
-
- def get_updated_by(self) -> Optional[str]:
- return self.updated_by
-
- def get_created_datetime(self) -> Optional[datetime]:
- if self.created_at:
- return datetime.fromtimestamp(self.created_at / 1000)
- return None
-
- def get_updated_datetime(self) -> Optional[datetime]:
- if self.updated_at:
- return datetime.fromtimestamp(self.updated_at / 1000)
- return None
-
-
-class PagedResponse(RESTResponse, Generic[T]):
-
- @abstractmethod
- def data(self) -> List[T]:
- pass
-
- @abstractmethod
- def get_next_page_token(self) -> str:
- pass
-
-
-@dataclass
-class ListDatabasesResponse(PagedResponse[str]):
- databases: List[str]
- next_page_token: str
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
- return cls(
- databases=data.get('databases'),
- next_page_token=data.get('nextPageToken')
- )
-
- @classmethod
- def from_json(cls, json_str: str) -> 'ListDatabasesResponse':
- data = json.loads(json_str)
- return cls.from_dict(data)
-
- def data(self) -> List[str]:
- return self.databases
-
- def get_next_page_token(self) -> str:
- return self.next_page_token
-
-
-@dataclass
-class ListTablesResponse(PagedResponse[str]):
- tables: Optional[List[str]]
- next_page_token: Optional[str]
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
- return cls(
- tables=data.get('tables'),
- next_page_token=data.get('nextPageToken')
- )
-
- @classmethod
- def from_json(cls, json_str: str) -> 'ListTablesResponse':
- data = json.loads(json_str)
- return cls.from_dict(data)
-
- def data(self) -> Optional[List[str]]:
- return self.tables
-
- def get_next_page_token(self) -> Optional[str]:
- return self.next_page_token
-
-
-@dataclass
-class GetTableResponse(RESTResponse):
- identifier: Identifier
- schema: Any
- properties: Dict[str, str]
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]:
- return data
-
- @classmethod
- def from_json(cls, json_str: str) -> Dict[str, Any]:
- data = json.loads(json_str)
- return cls.from_dict(data)
-
-
-@dataclass
-class GetDatabaseResponse(AuditRESTResponse):
- FIELD_ID = "id"
- FIELD_NAME = "name"
- FIELD_LOCATION = "location"
- FIELD_OPTIONS = "options"
-
- id: Optional[str] = None
- name: Optional[str] = None
- location: Optional[str] = None
- options: Optional[Dict[str, str]] = field(default_factory=dict)
-
- def __init__(self,
- id: Optional[str] = None,
- name: Optional[str] = None,
- location: Optional[str] = None,
- options: Optional[Dict[str, str]] = None,
- owner: Optional[str] = None,
- created_at: Optional[int] = None,
- created_by: Optional[str] = None,
- updated_at: Optional[int] = None,
- updated_by: Optional[str] = None):
- super().__init__(owner, created_at, created_by, updated_at, updated_by)
- self.id = id
- self.name = name
- self.location = location
- self.options = options or {}
-
- def get_id(self) -> Optional[str]:
- return self.id
-
- def get_name(self) -> Optional[str]:
- return self.name
-
- def get_location(self) -> Optional[str]:
- return self.location
-
- def get_options(self) -> Dict[str, str]:
- return self.options or {}
-
- def to_dict(self) -> Dict[str, Any]:
- result = {
- self.FIELD_ID: self.id,
- self.FIELD_NAME: self.name,
- self.FIELD_LOCATION: self.location,
- self.FIELD_OPTIONS: self.options
- }
-
- if self.owner is not None:
- result[self.FIELD_OWNER] = self.owner
- if self.created_at is not None:
- result[self.FIELD_CREATED_AT] = self.created_at
- if self.created_by is not None:
- result[self.FIELD_CREATED_BY] = self.created_by
- if self.updated_at is not None:
- result[self.FIELD_UPDATED_AT] = self.updated_at
- if self.updated_by is not None:
- result[self.FIELD_UPDATED_BY] = self.updated_by
-
- return result
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
- return cls(
- id=data.get(cls.FIELD_ID),
- name=data.get(cls.FIELD_NAME),
- location=data.get(cls.FIELD_LOCATION),
- options=data.get(cls.FIELD_OPTIONS, {}),
- owner=data.get(cls.FIELD_OWNER),
- created_at=data.get(cls.FIELD_CREATED_AT),
- created_by=data.get(cls.FIELD_CREATED_BY),
- updated_at=data.get(cls.FIELD_UPDATED_AT),
- updated_by=data.get(cls.FIELD_UPDATED_BY)
- )
-
- def to_json(self) -> str:
- return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
-
- @classmethod
- def from_json(cls, json_str: str) -> 'GetDatabaseResponse':
- data = json.loads(json_str)
- return cls.from_dict(data)
-
- def __str__(self) -> str:
- return f"GetDatabaseResponse(id={self.id}, name={self.name},
location={self.location})"
-
- def __repr__(self) -> str:
- return (f"GetDatabaseResponse(id={self.id!r}, name={self.name!r}, "
- f"location={self.location!r}, options={self.options!r}, "
- f"owner={self.owner!r}, created_at={self.created_at}, "
- f"created_by={self.created_by!r},
updated_at={self.updated_at}, "
- f"updated_by={self.updated_by!r})")
-
-
-@dataclass
-class ConfigResponse(RESTResponse):
- defaults: Dict[str, Any]
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ConfigResponse':
- return cls(
- defaults=data.get('defaults')
- )
-
- @classmethod
- def from_json(cls, json_str: str) -> 'ConfigResponse':
- data = json.loads(json_str)
- return cls.from_dict(data)
-
- def merge(self, options: Dict[str, Any]) -> Dict[str, Any]:
- merged = options.copy()
- merged.update(self.defaults)
- return merged
-
-
-class JSONSerializableGetDatabaseResponse(GetDatabaseResponse):
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- def __json__(self) -> Dict[str, Any]:
- return self.to_dict()
-
- @classmethod
- def __from_json__(cls, data: Dict[str, Any]) -> 'JSONSerializableGetDatabaseResponse':
- return cls.from_dict(data)