This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f0317ee9d4 [core] python: add test for rest api (#5882)
f0317ee9d4 is described below

commit f0317ee9d405c843afea0bf99fb6a39019b3f3ac
Author: jerry <[email protected]>
AuthorDate: Tue Jul 15 11:13:42 2025 +0800

    [core] python: add test for rest api (#5882)
---
 .gitignore                               |   2 +-
 {python => pypaimon}/README.md           |   0
 pypaimon/__init__.py                     |  16 +
 {python => pypaimon}/api/__init__.py     |  26 +-
 pypaimon/api/api_response.py             | 493 +++++++++++++++++++
 {python => pypaimon}/api/api_resquest.py |   8 +-
 {python => pypaimon}/api/auth.py         |   0
 {python => pypaimon}/api/client.py       |  39 +-
 pypaimon/api/rest_json.py                |  51 ++
 pypaimon/api/typedef.py                  |  61 +++
 pypaimon/tests/__init__.py               |  16 +
 pypaimon/tests/api_test.py               | 805 +++++++++++++++++++++++++++++++
 python/api/api_response.py               | 301 ------------
 13 files changed, 1483 insertions(+), 335 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8d59f6faa3..2a8e3fb02b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,7 +19,7 @@ target
 .java-version
 dependency-reduced-pom.xml
 metastore_db/
-python/.idea/
+pypaimon/.idea/
 
 ### VS Code ###
 .vscode/
diff --git a/python/README.md b/pypaimon/README.md
similarity index 100%
rename from python/README.md
rename to pypaimon/README.md
diff --git a/pypaimon/__init__.py b/pypaimon/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/pypaimon/__init__.py
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
diff --git a/python/api/__init__.py b/pypaimon/api/__init__.py
similarity index 94%
rename from python/api/__init__.py
rename to pypaimon/api/__init__.py
index 29c50f85f2..573fd889b9 100644
--- a/python/api/__init__.py
+++ b/pypaimon/api/__init__.py
@@ -17,14 +17,19 @@
 
 import logging
 from typing import Dict, List, Optional, Callable
-from auth import RESTAuthFunction
-from api_response import PagedList, GetTableResponse, ListDatabasesResponse, ListTablesResponse, GetDatabaseResponse, \
-    ConfigResponse, PagedResponse, T
-from api_resquest import Identifier, CreateDatabaseRequest
+from urllib.parse import unquote
+import api
+from api.auth import RESTAuthFunction
+from api.api_response import PagedList, GetTableResponse, ListDatabasesResponse, ListTablesResponse, \
+    GetDatabaseResponse, ConfigResponse, PagedResponse
+from api.api_resquest import CreateDatabaseRequest
+from api.typedef import Identifier
+from api.client import HttpClient
+from api.auth import DLFAuthProvider, DLFToken
+from api.typedef import T
 
 
 class RESTCatalogOptions:
-
     URI = "uri"
     WAREHOUSE = "warehouse"
     TOKEN_PROVIDER = "token.provider"
@@ -56,6 +61,11 @@ class RESTUtil:
         import urllib.parse
         return urllib.parse.quote(value)
 
+    @staticmethod
+    def decode_string(encoded: str) -> str:
+        """Decode URL-encoded string"""
+        return unquote(encoded)
+
     @staticmethod
     def extract_prefix_map(options: Dict[str, str], prefix: str) -> Dict[str, str]:
         result = {}
@@ -68,7 +78,6 @@ class RESTUtil:
 
 
 class ResourcePaths:
-
     V1 = "v1"
     DATABASES = "databases"
     TABLES = "tables"
@@ -104,7 +113,6 @@ class ResourcePaths:
 
 
 class RESTApi:
-
     HEADER_PREFIX = "header."
     MAX_RESULTS = "maxResults"
     PAGE_TOKEN = "pageToken"
@@ -113,10 +121,6 @@ class RESTApi:
 
     def __init__(self, options: Dict[str, str], config_required: bool = True):
         self.logger = logging.getLogger(self.__class__.__name__)
-
-        from client import HttpClient
-        from auth import DLFAuthProvider, DLFToken
-
         self.client = HttpClient(options.get(RESTCatalogOptions.URI))
         auth_provider = DLFAuthProvider(
             DLFToken(options), options.get(RESTCatalogOptions.DLF_REGION)
diff --git a/pypaimon/api/api_response.py b/pypaimon/api/api_response.py
new file mode 100644
index 0000000000..4f2361c6b8
--- /dev/null
+++ b/pypaimon/api/api_response.py
@@ -0,0 +1,493 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Any, Generic, List
+from dataclasses import dataclass, field
+from api.typedef import T
+
+
+@dataclass
+class PagedList(Generic[T]):
+    elements: List[T]
+    next_page_token: Optional[str] = None
+
+
+class RESTResponse(ABC):
+    pass
+
+
+@dataclass
+class ErrorResponse(RESTResponse):
+    FIELD_RESOURCE_TYPE = "resourceType"
+    FIELD_RESOURCE_NAME = "resourceName"
+    FIELD_MESSAGE = "message"
+    FIELD_CODE = "code"
+
+    resource_type: Optional[str] = None
+    resource_name: Optional[str] = None
+    message: Optional[str] = None
+    code: Optional[int] = None
+
+    def __init__(self,
+                 resource_type: Optional[str] = None,
+                 resource_name: Optional[str] = None,
+                 message: Optional[str] = None,
+                 code: Optional[int] = None):
+        self.resource_type = resource_type
+        self.resource_name = resource_name
+        self.message = message
+        self.code = code
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
+        return cls(
+            resource_type=data.get(cls.FIELD_RESOURCE_TYPE),
+            resource_name=data.get(cls.FIELD_RESOURCE_NAME),
+            message=data.get(cls.FIELD_MESSAGE),
+            code=data.get(cls.FIELD_CODE),
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            self.FIELD_RESOURCE_TYPE: self.resource_type,
+            self.FIELD_RESOURCE_NAME: self.resource_name,
+            self.FIELD_MESSAGE: self.message,
+            self.FIELD_CODE: self.code
+        }
+
+
+@dataclass
+class AuditRESTResponse(RESTResponse):
+    FIELD_OWNER = "owner"
+    FIELD_CREATED_AT = "createdAt"
+    FIELD_CREATED_BY = "createdBy"
+    FIELD_UPDATED_AT = "updatedAt"
+    FIELD_UPDATED_BY = "updatedBy"
+
+    owner: Optional[str] = None
+    created_at: Optional[int] = None
+    created_by: Optional[str] = None
+    updated_at: Optional[int] = None
+    updated_by: Optional[str] = None
+
+    def get_owner(self) -> Optional[str]:
+        return self.owner
+
+    def get_created_at(self) -> Optional[int]:
+        return self.created_at
+
+    def get_created_by(self) -> Optional[str]:
+        return self.created_by
+
+    def get_updated_at(self) -> Optional[int]:
+        return self.updated_at
+
+    def get_updated_by(self) -> Optional[str]:
+        return self.updated_by
+
+
+class PagedResponse(RESTResponse, Generic[T]):
+    FIELD_NEXT_PAGE_TOKEN = "nextPageToken"
+
+    @abstractmethod
+    def data(self) -> List[T]:
+        pass
+
+    @abstractmethod
+    def get_next_page_token(self) -> str:
+        pass
+
+
+@dataclass
+class ListDatabasesResponse(PagedResponse[str]):
+    FIELD_DATABASES = "databases"
+
+    databases: List[str]
+    next_page_token: str
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
+        return cls(
+            databases=data.get(cls.FIELD_DATABASES),
+            next_page_token=data.get(cls.FIELD_NEXT_PAGE_TOKEN)
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            self.FIELD_DATABASES: self.databases,
+            self.FIELD_NEXT_PAGE_TOKEN: self.next_page_token
+        }
+
+    def data(self) -> List[str]:
+        return self.databases
+
+    def get_next_page_token(self) -> str:
+        return self.next_page_token
+
+
+@dataclass
+class ListTablesResponse(PagedResponse[str]):
+    FIELD_TABLES = "tables"
+
+    tables: Optional[List[str]]
+    next_page_token: Optional[str]
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
+        return cls(
+            tables=data.get(cls.FIELD_TABLES),
+            next_page_token=data.get(cls.FIELD_NEXT_PAGE_TOKEN)
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            self.FIELD_TABLES: self.tables,
+            self.FIELD_NEXT_PAGE_TOKEN: self.next_page_token
+        }
+
+    def data(self) -> Optional[List[str]]:
+        return self.tables
+
+    def get_next_page_token(self) -> Optional[str]:
+        return self.next_page_token
+
+
+@dataclass
+class PaimonDataType:
+    FIELD_TYPE = "type"
+    FIELD_ELEMENT = "element"
+    FIELD_FIELDS = "fields"
+    FIELD_KEY = "key"
+    FIELD_VALUE = "value"
+
+    type: str
+    element: Optional['PaimonDataType'] = None
+    fields: List['DataField'] = field(default_factory=list)
+    key: Optional['PaimonDataType'] = None
+    value: Optional['PaimonDataType'] = None
+
+    @classmethod
+    def from_dict(cls, data: Any) -> 'PaimonDataType':
+        if isinstance(data, dict):
+            element = data.get(cls.FIELD_ELEMENT, None)
+            fields = data.get(cls.FIELD_FIELDS, None)
+            key = data.get(cls.FIELD_KEY, None)
+            value = data.get(cls.FIELD_VALUE, None)
+            if element is not None:
+                element = PaimonDataType.from_dict(data.get(cls.FIELD_ELEMENT))
+            if fields is not None:
+                fields = list(map(lambda f: DataField.from_dict(f), fields))
+            if key is not None:
+                key = PaimonDataType.from_dict(key)
+            if value is not None:
+                value = PaimonDataType.from_dict(value)
+            return cls(
+                type=data.get(cls.FIELD_TYPE),
+                element=element,
+                fields=fields,
+                key=key,
+                value=value,
+            )
+        else:
+            return cls(type=data)
+
+    def to_dict(self) -> Any:
+        if self.element is None and self.fields is None and self.key is None:
+            return self.type
+        if self.element is not None:
+            return {self.FIELD_TYPE: self.type, self.FIELD_ELEMENT: self.element}
+        elif self.fields is not None:
+            return {self.FIELD_TYPE: self.type, self.FIELD_FIELDS: self.fields}
+        elif self.value is not None:
+            return {self.FIELD_TYPE: self.type, self.FIELD_KEY: self.key, self.FIELD_VALUE: self.value}
+        elif self.key is not None and self.value is None:
+            return {self.FIELD_TYPE: self.type, self.FIELD_KEY: self.key}
+
+
+@dataclass
+class DataField:
+    FIELD_ID = "id"
+    FIELD_NAME = "name"
+    FIELD_TYPE = "type"
+    FIELD_DESCRIPTION = "description"
+
+    description: str
+    id: int
+    name: str
+    type: PaimonDataType
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'DataField':
+        return cls(
+            id=data.get(cls.FIELD_ID),
+            name=data.get(cls.FIELD_NAME),
+            type=PaimonDataType.from_dict(data.get(cls.FIELD_TYPE)),
+            description=data.get(cls.FIELD_DESCRIPTION),
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            self.FIELD_ID: self.id,
+            self.FIELD_NAME: self.name,
+            self.FIELD_TYPE: PaimonDataType.to_dict(self.type),
+            self.FIELD_DESCRIPTION: self.description
+        }
+
+
+@dataclass
+class Schema:
+    FIELD_FIELDS = "fields"
+    FIELD_PARTITION_KEYS = "partitionKeys"
+    FIELD_PRIMARY_KEYS = "primaryKeys"
+    FIELD_OPTIONS = "options"
+    FIELD_COMMENT = "comment"
+
+    fields: List[DataField] = field(default_factory=list)
+    partition_keys: List[str] = field(default_factory=list)
+    primary_keys: List[str] = field(default_factory=list)
+    options: Dict[str, str] = field(default_factory=dict)
+    comment: Optional[str] = None
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'Schema':
+        return cls(
+            fields=list(map(lambda f: DataField.from_dict(f), data.get(cls.FIELD_FIELDS))),
+            partition_keys=data.get(cls.FIELD_PARTITION_KEYS),
+            primary_keys=data.get(cls.FIELD_PRIMARY_KEYS),
+            options=data.get(cls.FIELD_OPTIONS),
+            comment=data.get(cls.FIELD_COMMENT)
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            self.FIELD_FIELDS: list(map(lambda f: DataField.to_dict(f), self.fields)),
+            self.FIELD_PARTITION_KEYS: self.partition_keys,
+            self.FIELD_PRIMARY_KEYS: self.primary_keys,
+            self.FIELD_OPTIONS: self.options,
+            self.FIELD_COMMENT: self.comment
+        }
+
+
+@dataclass
+class TableSchema:
+    """Table schema with ID"""
+    id: int
+    fields: List[DataField]
+    highest_field_id: int
+    partition_keys: List[str]
+    primary_keys: List[str]
+    options: Dict[str, str]
+    comment: Optional[str]
+
+    def to_schema(self) -> Schema:
+        return Schema(
+            fields=self.fields,
+            partition_keys=self.partition_keys,
+            primary_keys=self.primary_keys,
+            options=self.options,
+            comment=self.comment
+        )
+
+
+@dataclass
+class TableMetadata:
+    """Table metadata"""
+    schema: TableSchema
+    is_external: bool
+    uuid: str
+
+
+@dataclass
+class RESTToken:
+    """REST authentication token"""
+    token: Dict[str, str]
+    expire_at_millis: int
+
+
+@dataclass
+class GetTableResponse(AuditRESTResponse):
+    """Response for getting table"""
+
+    # Field constants for JSON serialization
+    FIELD_ID = "id"
+    FIELD_NAME = "name"
+    FIELD_PATH = "path"
+    FIELD_IS_EXTERNAL = "isExternal"
+    FIELD_SCHEMA_ID = "schemaId"
+    FIELD_SCHEMA = "schema"
+
+    id: Optional[str] = None
+    name: Optional[str] = None
+    path: Optional[str] = None
+    is_external: Optional[bool] = None
+    schema_id: Optional[int] = None
+    schema: Optional[Schema] = None
+
+    def __init__(self,
+                 id: str,
+                 name: str,
+                 path: str,
+                 is_external: bool,
+                 schema_id: int,
+                 schema: Schema,
+                 owner: Optional[str] = None,
+                 created_at: Optional[int] = None,
+                 created_by: Optional[str] = None,
+                 updated_at: Optional[int] = None,
+                 updated_by: Optional[str] = None):
+        super().__init__(owner, created_at, created_by, updated_at, updated_by)
+        self.id = id
+        self.name = name
+        self.path = path
+        self.is_external = is_external
+        self.schema_id = schema_id
+        self.schema = schema
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'GetTableResponse':
+        return cls(
+            id=data.get(cls.FIELD_ID),
+            name=data.get(cls.FIELD_NAME),
+            path=data.get(cls.FIELD_PATH),
+            is_external=data.get(cls.FIELD_IS_EXTERNAL),
+            schema_id=data.get(cls.FIELD_SCHEMA_ID),
+            schema=Schema.from_dict(data.get(cls.FIELD_SCHEMA)),
+            owner=data.get(cls.FIELD_OWNER),
+            created_at=data.get(cls.FIELD_CREATED_AT),
+            created_by=data.get(cls.FIELD_CREATED_BY),
+            updated_at=data.get(cls.FIELD_UPDATED_AT),
+            updated_by=data.get(cls.FIELD_UPDATED_BY)
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        result = {
+            self.FIELD_ID: self.id,
+            self.FIELD_NAME: self.name,
+            self.FIELD_PATH: self.path,
+            self.FIELD_IS_EXTERNAL: self.is_external,
+            self.FIELD_SCHEMA_ID: self.schema_id,
+            self.FIELD_SCHEMA: Schema.to_dict(self.schema)
+        }
+        if self.owner is not None:
+            result[self.FIELD_OWNER] = self.owner
+        if self.created_at is not None:
+            result[self.FIELD_CREATED_AT] = self.created_at
+        if self.created_by is not None:
+            result[self.FIELD_CREATED_BY] = self.created_by
+        if self.updated_at is not None:
+            result[self.FIELD_UPDATED_AT] = self.updated_at
+        if self.updated_by is not None:
+            result[self.FIELD_UPDATED_BY] = self.updated_by
+
+        return result
+
+
+@dataclass
+class GetDatabaseResponse(AuditRESTResponse):
+    FIELD_ID = "id"
+    FIELD_NAME = "name"
+    FIELD_LOCATION = "location"
+    FIELD_OPTIONS = "options"
+
+    id: Optional[str] = None
+    name: Optional[str] = None
+    location: Optional[str] = None
+    options: Optional[Dict[str, str]] = field(default_factory=dict)
+
+    def __init__(self,
+                 id: Optional[str] = None,
+                 name: Optional[str] = None,
+                 location: Optional[str] = None,
+                 options: Optional[Dict[str, str]] = None,
+                 owner: Optional[str] = None,
+                 created_at: Optional[int] = None,
+                 created_by: Optional[str] = None,
+                 updated_at: Optional[int] = None,
+                 updated_by: Optional[str] = None):
+        super().__init__(owner, created_at, created_by, updated_at, updated_by)
+        self.id = id
+        self.name = name
+        self.location = location
+        self.options = options or {}
+
+    def get_id(self) -> Optional[str]:
+        return self.id
+
+    def get_name(self) -> Optional[str]:
+        return self.name
+
+    def get_location(self) -> Optional[str]:
+        return self.location
+
+    def get_options(self) -> Dict[str, str]:
+        return self.options or {}
+
+    def to_dict(self) -> Dict[str, Any]:
+        result = {
+            self.FIELD_ID: self.id,
+            self.FIELD_NAME: self.name,
+            self.FIELD_LOCATION: self.location,
+            self.FIELD_OPTIONS: self.options
+        }
+
+        if self.owner is not None:
+            result[self.FIELD_OWNER] = self.owner
+        if self.created_at is not None:
+            result[self.FIELD_CREATED_AT] = self.created_at
+        if self.created_by is not None:
+            result[self.FIELD_CREATED_BY] = self.created_by
+        if self.updated_at is not None:
+            result[self.FIELD_UPDATED_AT] = self.updated_at
+        if self.updated_by is not None:
+            result[self.FIELD_UPDATED_BY] = self.updated_by
+
+        return result
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
+        return cls(
+            id=data.get(cls.FIELD_ID),
+            name=data.get(cls.FIELD_NAME),
+            location=data.get(cls.FIELD_LOCATION),
+            options=data.get(cls.FIELD_OPTIONS, {}),
+            owner=data.get(cls.FIELD_OWNER),
+            created_at=data.get(cls.FIELD_CREATED_AT),
+            created_by=data.get(cls.FIELD_CREATED_BY),
+            updated_at=data.get(cls.FIELD_UPDATED_AT),
+            updated_by=data.get(cls.FIELD_UPDATED_BY)
+        )
+
+
+@dataclass
+class ConfigResponse(RESTResponse):
+    FIELD_DEFAULTS = "defaults"
+
+    defaults: Dict[str, str]
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, str]) -> 'ConfigResponse':
+        return cls(defaults=data.get(cls.FIELD_DEFAULTS))
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {self.FIELD_DEFAULTS: self.defaults}
+
+    def merge(self, options: Dict[str, str]) -> Dict[str, str]:
+        merged = options.copy()
+        merged.update(self.defaults)
+        return merged
diff --git a/python/api/api_resquest.py b/pypaimon/api/api_resquest.py
similarity index 89%
rename from python/api/api_resquest.py
rename to pypaimon/api/api_resquest.py
index d7fd882447..2d0d2583a8 100644
--- a/python/api/api_resquest.py
+++ b/pypaimon/api/api_resquest.py
@@ -18,19 +18,13 @@ limitations under the License.
 
 from abc import ABC
 from dataclasses import dataclass
-from typing import Dict, List, Any, Optional
+from typing import Dict, List
 
 
 class RESTRequest(ABC):
     pass
 
 
-@dataclass
-class Identifier:
-    database_name: str
-    object_name: str
-
-
 @dataclass
 class CreateDatabaseRequest(RESTRequest):
     name: str
diff --git a/python/api/auth.py b/pypaimon/api/auth.py
similarity index 100%
rename from python/api/auth.py
rename to pypaimon/api/auth.py
diff --git a/python/api/client.py b/pypaimon/api/client.py
similarity index 91%
rename from python/api/client.py
rename to pypaimon/api/client.py
index 409b4d2c28..fc41e2672d 100644
--- a/python/api/client.py
+++ b/pypaimon/api/client.py
@@ -26,9 +26,9 @@ import requests
 from requests.adapters import HTTPAdapter
 from urllib3 import Retry
 
-from auth import RESTAuthParameter
-from api import RESTApi
-from api_response import ErrorResponse
+from api.auth import RESTAuthParameter
+from api.api_response import ErrorResponse
+from api.rest_json import JSON
 
 T = TypeVar('T', bound='RESTResponse')
 
@@ -78,13 +78,22 @@ class ExponentialRetryInterceptor:
         self.logger = logging.getLogger(self.__class__.__name__)
 
     def create_retry_strategy(self) -> Retry:
-        return Retry(
-            total=self.max_retries,
-            status_forcelist=[429, 500, 502, 503, 504],
-            method_whitelist=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", 
"TRACE", "POST"],
-            backoff_factor=1,
-            raise_on_status=False
-        )
+        retry_kwargs = {
+            'total': self.max_retries,
+            'read': self.max_retries,
+            'connect': self.max_retries,
+            'backoff_factor': 1,
+            'status_forcelist': [429, 502, 503, 504],
+            'raise_on_status': False,
+            'raise_on_redirect': False,
+        }
+        retry_methods = ["GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS"]
+        retry_instance = Retry()
+        if hasattr(retry_instance, 'allowed_methods'):
+            retry_kwargs['allowed_methods'] = retry_methods
+        else:
+            retry_kwargs['method_whitelist'] = retry_methods
+        return Retry(**retry_kwargs)
 
 
 class LoggingInterceptor:
@@ -156,7 +165,7 @@ def _normalize_uri(uri: str) -> str:
 def _parse_error_response(response_body: Optional[str], status_code: int) -> ErrorResponse:
     if response_body:
         try:
-            return ErrorResponse.from_json(response_body)
+            return JSON.from_json(response_body, ErrorResponse)
         except Exception:
             return ErrorResponse(
                 resource_type=None,
@@ -200,7 +209,7 @@ class HttpClient(RESTClient):
 
         self.session = requests.Session()
 
-        retry_interceptor = ExponentialRetryInterceptor(max_retries=5)
+        retry_interceptor = ExponentialRetryInterceptor(max_retries=1)
         retry_strategy = retry_interceptor.create_retry_strategy()
         adapter = HTTPAdapter(max_retries=retry_strategy)
 
@@ -240,7 +249,7 @@ class HttpClient(RESTClient):
     def post_with_response_type(self, path: str, body: RESTRequest, response_type: Optional[Type[T]],
                                 rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
         try:
-            body_str = RESTApi.to_json(body)
+            body_str = JSON.to_json(body)
             auth_headers = _get_headers(path, "POST", body_str, 
rest_auth_function)
             url = self._get_request_url(path, None)
 
@@ -259,7 +268,7 @@ class HttpClient(RESTClient):
     def delete_with_body(self, path: str, body: RESTRequest,
                          rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
         try:
-            body_str = RESTApi.to_json(body)
+            body_str = JSON.to_json(body)
             auth_headers = _get_headers(path, "DELETE", body_str, 
rest_auth_function)
             url = self._get_request_url(path, None)
 
@@ -312,7 +321,7 @@ class HttpClient(RESTClient):
                 self.error_handler.accept(error, request_id)
 
             if response_type is not None and response_body_str is not None:
-                return response_type.from_json(response_body_str)
+                return JSON.from_json(response_body_str, response_type)
             elif response_type is None:
                 return None
             else:
diff --git a/pypaimon/api/rest_json.py b/pypaimon/api/rest_json.py
new file mode 100644
index 0000000000..57bdefdd06
--- /dev/null
+++ b/pypaimon/api/rest_json.py
@@ -0,0 +1,51 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import json
+from dataclasses import asdict
+from typing import Any, Type
+from api.typedef import T
+
+
+class JSON:
+    """Universal JSON serializer"""
+
+    @staticmethod
+    def from_json(json_str: str, target_class: Type[T]) -> T:
+        data = json.loads(json_str)
+        if hasattr(target_class, 'from_dict'):
+            return target_class.from_dict(data)
+        return data
+
+    @staticmethod
+    def to_json(obj: Any) -> str:
+        """Serialize any object to JSON"""
+        return json.dumps(obj, default=JSON._default_serializer)
+
+    @staticmethod
+    def _default_serializer(obj):
+        """Default serialization handler"""
+
+        # Handle objects with to_dict method
+        if hasattr(obj, 'to_dict') and callable(obj.to_dict):
+            return obj.to_dict()
+
+        # Handle dataclass objects
+        if hasattr(obj, '__dataclass_fields__'):
+            return asdict(obj)
+
+        raise TypeError(f"Object of type {type(obj).__name__} is not JSON")
\ No newline at end of file
diff --git a/pypaimon/api/typedef.py b/pypaimon/api/typedef.py
new file mode 100644
index 0000000000..157fa9c1d6
--- /dev/null
+++ b/pypaimon/api/typedef.py
@@ -0,0 +1,61 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from dataclasses import dataclass
+from typing import Optional, TypeVar
+
+T = TypeVar('T')
+
+@dataclass
+class Identifier:
+    """Table/View/Function identifier"""
+    database_name: str
+    object_name: str
+    branch_name: Optional[str] = None
+
+    @classmethod
+    def create(cls, database_name: str, object_name: str) -> 'Identifier':
+        return cls(database_name, object_name)
+
+    @classmethod
+    def from_string(cls, full_name: str) -> 'Identifier':
+        parts = full_name.split('.')
+        if len(parts) == 2:
+            return cls(parts[0], parts[1])
+        elif len(parts) == 3:
+            return cls(parts[0], parts[1], parts[2])
+        else:
+            raise ValueError(f"Invalid identifier format: {full_name}")
+
+    def get_full_name(self) -> str:
+        if self.branch_name:
+            return f"{self.database_name}.{self.object_name}.{self.branch_name}"
+        return f"{self.database_name}.{self.object_name}"
+
+    def get_database_name(self) -> str:
+        return self.database_name
+
+    def get_table_name(self) -> str:
+        return self.object_name
+
+    def get_object_name(self) -> str:
+        return self.object_name
+
+    def get_branch_name(self) -> Optional[str]:
+        return self.branch_name
+
+    def is_system_table(self) -> bool:
+        return self.object_name.startswith('$')
\ No newline at end of file
diff --git a/pypaimon/tests/__init__.py b/pypaimon/tests/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/pypaimon/tests/__init__.py
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
diff --git a/pypaimon/tests/api_test.py b/pypaimon/tests/api_test.py
new file mode 100644
index 0000000000..577b9ca94c
--- /dev/null
+++ b/pypaimon/tests/api_test.py
@@ -0,0 +1,805 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+import uuid
+from typing import Dict, List, Optional, Any, Union, Tuple
+from dataclasses import dataclass
+import threading
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from urllib.parse import urlparse
+import unittest
+
+import api
+from api.api_response import (ConfigResponse, ListDatabasesResponse, GetDatabaseResponse, TableMetadata, Schema,
+                              GetTableResponse, ListTablesResponse, TableSchema, RESTResponse, PagedList, DataField,
+                              PaimonDataType)
+from api import RESTApi
+from api.rest_json import JSON
+from api.typedef import Identifier
+
+
+@dataclass
+class ErrorResponse(RESTResponse):
+    """Error response"""
+    RESOURCE_TYPE_DATABASE = "database"
+    RESOURCE_TYPE_TABLE = "table"
+    RESOURCE_TYPE_VIEW = "view"
+    RESOURCE_TYPE_FUNCTION = "function"
+    RESOURCE_TYPE_COLUMN = "column"
+    RESOURCE_TYPE_SNAPSHOT = "snapshot"
+    RESOURCE_TYPE_TAG = "tag"
+    RESOURCE_TYPE_BRANCH = "branch"
+    RESOURCE_TYPE_DEFINITION = "definition"
+    RESOURCE_TYPE_DIALECT = "dialect"
+
+    resource_type: Optional[str]
+    resource_name: Optional[str]
+    message: str
+    code: int
+
+
+class ResourcePaths:
+    """Resource path constants"""
+
+    TABLES = "tables"
+    VIEWS = "views"
+    FUNCTIONS = "functions"
+    SNAPSHOTS = "snapshots"
+    ROLLBACK = "rollback"
+
+    def __init__(self, prefix: str = ""):
+        self.prefix = prefix.rstrip('/')
+
+    def config(self) -> str:
+        return f"/v1/config"
+
+    def databases(self) -> str:
+        return f"/v1/{self.prefix}/databases"
+
+    def tables(self) -> str:
+        return f"{self.prefix}/tables"
+
+
+# Exception classes
+class CatalogException(Exception):
+    """Base catalog exception"""
+    pass
+
+
+class DatabaseNotExistException(CatalogException):
+    """Database not exist exception"""
+
+    def __init__(self, database: str):
+        self.database = database
+        super().__init__(f"Database {database} does not exist")
+
+
+class DatabaseAlreadyExistException(CatalogException):
+    """Database already exist exception"""
+
+    def __init__(self, database: str):
+        self.database = database
+        super().__init__(f"Database {database} already exists")
+
+
+class DatabaseNoPermissionException(CatalogException):
+    """Database no permission exception"""
+
+    def __init__(self, database: str):
+        self.database = database
+        super().__init__(f"No permission to access database {database}")
+
+
+class TableNotExistException(CatalogException):
+    """Table not exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Table {identifier.get_full_name()} does not exist")
+
+
+class TableAlreadyExistException(CatalogException):
+    """Table already exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Table {identifier.get_full_name()} already exists")
+
+
+class TableNoPermissionException(CatalogException):
+    """Table no permission exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"No permission to access table 
{identifier.get_full_name()}")
+
+
+class ViewNotExistException(CatalogException):
+    """View not exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"View {identifier.get_full_name()} does not exist")
+
+
+class ViewAlreadyExistException(CatalogException):
+    """View already exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"View {identifier.get_full_name()} already exists")
+
+
+class FunctionNotExistException(CatalogException):
+    """Function not exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Function {identifier.get_full_name()} does not 
exist")
+
+
+class FunctionAlreadyExistException(CatalogException):
+    """Function already exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Function {identifier.get_full_name()} already 
exists")
+
+
+class ColumnNotExistException(CatalogException):
+    """Column not exist exception"""
+
+    def __init__(self, column: str):
+        self.column = column
+        super().__init__(f"Column {column} does not exist")
+
+
+class ColumnAlreadyExistException(CatalogException):
+    """Column already exist exception"""
+
+    def __init__(self, column: str):
+        self.column = column
+        super().__init__(f"Column {column} already exists")
+
+
+class DefinitionNotExistException(CatalogException):
+    """Definition not exist exception"""
+
+    def __init__(self, identifier: Identifier, name: str):
+        self.identifier = identifier
+        self.name = name
+        super().__init__(f"Definition {name} does not exist in 
{identifier.get_full_name()}")
+
+
+class DefinitionAlreadyExistException(CatalogException):
+    """Definition already exist exception"""
+
+    def __init__(self, identifier: Identifier, name: str):
+        self.identifier = identifier
+        self.name = name
+        super().__init__(f"Definition {name} already exists in 
{identifier.get_full_name()}")
+
+
+class DialectNotExistException(CatalogException):
+    """Dialect not exist exception"""
+
+    def __init__(self, identifier: Identifier, dialect: str):
+        self.identifier = identifier
+        self.dialect = dialect
+        super().__init__(f"Dialect {dialect} does not exist in 
{identifier.get_full_name()}")
+
+
+class DialectAlreadyExistException(CatalogException):
+    """Dialect already exist exception"""
+
+    def __init__(self, identifier: Identifier, dialect: str):
+        self.identifier = identifier
+        self.dialect = dialect
+        super().__init__(f"Dialect {dialect} already exists in 
{identifier.get_full_name()}")
+
+
+# Constants
+DEFAULT_MAX_RESULTS = 100
+AUTHORIZATION_HEADER_KEY = "Authorization"
+
+# REST API parameter constants
+DATABASE_NAME_PATTERN = "databaseNamePattern"
+TABLE_NAME_PATTERN = "tableNamePattern"
+VIEW_NAME_PATTERN = "viewNamePattern"
+FUNCTION_NAME_PATTERN = "functionNamePattern"
+PARTITION_NAME_PATTERN = "partitionNamePattern"
+MAX_RESULTS = "maxResults"
+PAGE_TOKEN = "pageToken"
+
+# Core options
+PATH = "path"
+TYPE = "type"
+WAREHOUSE = "warehouse"
+SNAPSHOT_CLEAN_EMPTY_DIRECTORIES = "snapshot.clean-empty-directories"
+
+# Table types
+FORMAT_TABLE = "FORMAT_TABLE"
+OBJECT_TABLE = "OBJECT_TABLE"
+
+
+class RESTCatalogServer:
+    """Mock REST server for testing"""
+
+    def __init__(self, data_path: str, auth_provider, config: ConfigResponse, warehouse: str):
+        self.logger = logging.getLogger(__name__)
+        self.warehouse = warehouse
+        self.config_response = config
+
+        # Initialize resource paths
+        prefix = config.defaults.get("prefix")
+        self.resource_paths = ResourcePaths(prefix)
+        self.database_uri = self.resource_paths.databases()
+
+        # Initialize storage
+        self.database_store: Dict[str, GetDatabaseResponse] = {}
+        self.table_metadata_store: Dict[str, TableMetadata] = {}
+        self.no_permission_databases: List[str] = []
+        self.no_permission_tables: List[str] = []
+
+        # Initialize mock catalog (simplified)
+        self.data_path = data_path
+        self.auth_provider = auth_provider
+
+        # HTTP server setup
+        self.server = None
+        self.server_thread = None
+        self.port = 0
+
+    def start(self) -> None:
+        """Start the mock server"""
+        handler = self._create_request_handler()
+        self.server = HTTPServer(('localhost', 0), handler)
+        self.port = self.server.server_port
+
+        self.server_thread = threading.Thread(target=self.server.serve_forever)
+        self.server_thread.daemon = True
+        self.server_thread.start()
+
+        self.logger.info(f"Mock REST server started on port {self.port}")
+
+    def get_url(self) -> str:
+        """Get server URL"""
+        return f"http://localhost:{self.port}";
+
+    def shutdown(self) -> None:
+        """Shutdown the server"""
+        if self.server:
+            self.server.shutdown()
+            self.server.server_close()
+        if self.server_thread:
+            self.server_thread.join()
+
+    def _create_request_handler(self):
+        """Create HTTP request handler"""
+        server_instance = self
+
+        class RequestHandler(BaseHTTPRequestHandler):
+            def do_GET(self):
+                self._handle_request('GET')
+
+            def do_POST(self):
+                self._handle_request('POST')
+
+            def do_DELETE(self):
+                self._handle_request('DELETE')
+
+            def _handle_request(self, method: str):
+                try:
+                    # Parse request
+                    parsed_url = urlparse(self.path)
+                    resource_path = parsed_url.path
+                    parameters = self._parse_query_params(parsed_url.query)
+
+                    # Get request body
+                    content_length = int(self.headers.get('Content-Length', 0))
+                    data = self.rfile.read(content_length).decode('utf-8') if content_length > 0 else ""
+
+                    # Get headers
+                    headers = dict(self.headers)
+
+                    # Handle authentication
+                    auth_token = headers.get(AUTHORIZATION_HEADER_KEY.lower())
+                    if not self._authenticate(auth_token, resource_path, parameters, method, data):
+                        self._send_response(401, "Unauthorized")
+                        return
+
+                    # Route request
+                    response, status_code = server_instance._route_request(
+                        method, resource_path, parameters, data, headers
+                    )
+
+                    self._send_response(status_code, response)
+
+                except Exception as e:
+                    server_instance.logger.error(f"Request handling error: 
{e}")
+                    self._send_response(500, str(e))
+
+            def _parse_query_params(self, query: str) -> Dict[str, str]:
+                """Parse query parameters"""
+                if not query:
+                    return {}
+
+                params = {}
+                for pair in query.split('&'):
+                    if '=' in pair:
+                        key, value = pair.split('=', 1)
+                        params[key.strip()] = api.RESTUtil.decode_string(value.strip())
+                return params
+
+            def _authenticate(self, token: str, path: str, params: Dict[str, str],
+                              method: str, data: str) -> bool:
+                """Authenticate request"""
+                # Simplified authentication - always return True for mock
+                return True
+
+            def _send_response(self, status_code: int, body: str):
+                """Send HTTP response"""
+                self.send_response(status_code)
+                self.send_header('Content-Type', 'application/json')
+                self.end_headers()
+                self.wfile.write(body.encode('utf-8'))
+
+            def log_message(self, format, *args):
+                """Override to use our logger"""
+                server_instance.logger.debug(format % args)
+
+        return RequestHandler
+
+    def _route_request(self, method: str, resource_path: str, parameters: Dict[str, str],
+                       data: str, headers: Dict[str, str]) -> Tuple[str, int]:
+        """Route HTTP request to appropriate handler"""
+        try:
+            # Config endpoint
+            if resource_path.startswith(self.resource_paths.config()):
+                warehouse_param = parameters.get(WAREHOUSE)
+                if warehouse_param == self.warehouse:
+                    return self._mock_response(self.config_response, 200)
+
+            # Databases endpoint
+            if resource_path == self.database_uri or resource_path.startswith(self.database_uri + "?"):
+                return self._databases_api_handler(method, data, parameters)
+
+            # Global tables endpoint
+            if resource_path.startswith(self.resource_paths.tables()):
+                return self._tables_handle(parameters=parameters)
+
+            # Database-specific endpoints
+            if resource_path.startswith(self.database_uri + "/"):
+                return self._handle_database_resource(method, resource_path, parameters, data)
+
+            return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
+
+        except DatabaseNotExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_DATABASE, e.database, str(e), 404
+            )
+            return self._mock_response(response, 404)
+        except TableNotExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_TABLE, e.identifier.get_table_name(), str(e), 404
+            )
+            return self._mock_response(response, 404)
+        except DatabaseNoPermissionException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_DATABASE, e.database, str(e), 403
+            )
+            return self._mock_response(response, 403)
+        except TableNoPermissionException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_TABLE, e.identifier.get_table_name(), str(e), 403
+            )
+            return self._mock_response(response, 403)
+        except Exception as e:
+            self.logger.error(f"Unexpected error: {e}")
+            response = ErrorResponse(None, None, str(e), 500)
+            return self._mock_response(response, 500)
+
+    def _handle_database_resource(self, method: str, resource_path: str,
+                                  parameters: Dict[str, str], data: str) -> Tuple[str, int]:
+        """Handle database-specific resource requests"""
+        # Extract database name and resource path
+        path_parts = resource_path[len(self.database_uri) + 1:].split('/')
+        database_name = api.RESTUtil.decode_string(path_parts[0])
+
+        # Check database permissions
+        if database_name in self.no_permission_databases:
+            raise DatabaseNoPermissionException(database_name)
+
+        if database_name not in self.database_store:
+            raise DatabaseNotExistException(database_name)
+
+        # Handle different resource types
+        if len(path_parts) == 1:
+            # Database operations
+            return self._database_handle(method, data, database_name)
+
+        elif len(path_parts) == 2:
+            # Collection operations (tables, views, functions)
+            resource_type = path_parts[1]
+
+            if resource_type.startswith(ResourcePaths.TABLES):
+                return self._tables_handle(method, data, database_name, parameters)
+
+        elif len(path_parts) >= 3:
+            # Individual resource operations
+            resource_type = path_parts[1]
+            resource_name = api.RESTUtil.decode_string(path_parts[2])
+            identifier = Identifier.create(database_name, resource_name)
+
+            if resource_type == ResourcePaths.TABLES:
+                return self._handle_table_resource(method, path_parts, identifier, data, parameters)
+
+        return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
+
+    def _handle_table_resource(self, method: str, path_parts: List[str],
+                               identifier: Identifier, data: str,
+                               parameters: Dict[str, str]) -> Tuple[str, int]:
+        """Handle table-specific resource requests"""
+        # Check table permissions
+        if identifier.get_full_name() in self.no_permission_tables:
+            raise TableNoPermissionException(identifier)
+
+        if len(path_parts) == 3:
+            # Basic table operations
+            return self._table_handle(method, data, identifier)
+
+        elif len(path_parts) >= 4:
+            operation = path_parts[3]
+        return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
+
+    def _databases_api_handler(self, method: str, data: str,
+                               parameters: Dict[str, str]) -> Tuple[str, int]:
+        """Handle databases API requests"""
+        if method == "GET":
+            database_name_pattern = parameters.get(DATABASE_NAME_PATTERN)
+            databases = [
+                db_name for db_name in self.database_store.keys()
+                if not database_name_pattern or self._match_name_pattern(db_name, database_name_pattern)
+            ]
+            return self._generate_final_list_databases_response(parameters, databases)
+
+        return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+    def _database_handle(self, method: str, data: str, database_name: str) -> Tuple[str, int]:
+        """Handle individual database operations"""
+        if database_name not in self.database_store:
+            raise DatabaseNotExistException(database_name)
+
+        database = self.database_store[database_name]
+
+        if method == "GET":
+            response = database
+            return self._mock_response(response, 200)
+
+        elif method == "DELETE":
+            del self.database_store[database_name]
+            return self._mock_response("", 200)
+        return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+    def _tables_handle(self, method: str = None, data: str = None, database_name: str = None,
+                       parameters: Dict[str, str] = None) -> Tuple[str, int]:
+        """Handle tables operations"""
+        if parameters is None:
+            parameters = {}
+
+        if database_name:
+            # Database-specific tables
+            if method == "GET":
+                tables = self._list_tables(database_name, parameters)
+                return self._generate_final_list_tables_response(parameters, tables)
+        return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+    def _table_handle(self, method: str, data: str, identifier: Identifier) -> Tuple[str, int]:
+        """Handle individual table operations"""
+        if method == "GET":
+            if identifier.is_system_table():
+                # Handle system table
+                schema = Schema(fields=[], options={PATH: f"/tmp/{identifier.get_full_name()}"})
+                table_metadata = self._create_table_metadata(identifier, 1, schema, None, False)
+            else:
+                if identifier.get_full_name() not in self.table_metadata_store:
+                    raise TableNotExistException(identifier)
+                table_metadata = self.table_metadata_store[identifier.get_full_name()]
+
+            schema = table_metadata.schema.to_schema()
+            path = schema.options.pop(PATH, None)
+
+            response = self.mock_table(identifier, table_metadata, path, schema)
+            return self._mock_response(response, 200)
+        #
+        # elif method == "POST":
+        #     # Alter table
+        #     request_body = JSON.from_json(data, AlterTableRequest)
+        #     self._alter_table_impl(identifier, request_body.get_changes())
+        #     return self._mock_response("", 200)
+
+        elif method == "DELETE":
+            # Drop table
+            if identifier.get_full_name() in self.table_metadata_store:
+                del self.table_metadata_store[identifier.get_full_name()]
+            if identifier.get_full_name() in self.table_latest_snapshot_store:
+                del self.table_latest_snapshot_store[identifier.get_full_name()]
+            if identifier.get_full_name() in self.table_partitions_store:
+                del self.table_partitions_store[identifier.get_full_name()]
+
+            return self._mock_response("", 200)
+
+        return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+    # Utility methods
+    def _mock_response(self, response: Union[RESTResponse, str], http_code: int) -> Tuple[str, int]:
+        """Create mock response"""
+        if isinstance(response, str):
+            return response, http_code
+
+        try:
+            return JSON.to_json(response), http_code
+        except Exception as e:
+            self.logger.error(f"Failed to serialize response: {e}")
+            return str(e), 500
+
+    def _get_max_results(self, parameters: Dict[str, str]) -> int:
+        """Get max results from parameters"""
+        max_results_str = parameters.get(MAX_RESULTS)
+        if max_results_str:
+            try:
+                max_results = int(max_results_str)
+                return min(max_results, DEFAULT_MAX_RESULTS) if max_results > 0 else DEFAULT_MAX_RESULTS
+            except ValueError:
+                raise ValueError(f"Invalid maxResults value: 
{max_results_str}")
+        return DEFAULT_MAX_RESULTS
+
+    def _build_paged_entities(self, entities: List[Any], max_results: int,
+                              page_token: Optional[str], desc: bool = False) -> PagedList:
+        """Build paged entities"""
+        # Sort entities
+        sorted_entities = sorted(entities, key=self._get_paged_key, reverse=desc)
+
+        # Apply pagination
+        paged_entities = []
+        for entity in sorted_entities:
+            if len(paged_entities) < max_results:
+                if not page_token or self._get_paged_key(entity) > page_token:
+                    paged_entities.append(entity)
+            else:
+                break
+
+        # Determine next page token
+        next_page_token = None
+        if len(paged_entities) == max_results and len(sorted_entities) > max_results:
+            next_page_token = self._get_paged_key(paged_entities[-1])
+
+        return PagedList(elements=paged_entities, next_page_token=next_page_token)
+
+    def _get_paged_key(self, entity: Any) -> str:
+        """Get paging key for entity"""
+        if isinstance(entity, str):
+            return entity
+        elif hasattr(entity, 'get_name'):
+            return entity.get_name()
+        elif hasattr(entity, 'get_full_name'):
+            return entity.get_full_name()
+        elif hasattr(entity, 'name'):
+            return entity.name
+        else:
+            return str(entity)
+
+    def _match_name_pattern(self, name: str, pattern: str) -> bool:
+        """Match name against SQL pattern"""
+        if not pattern:
+            raise ValueError("Pattern cannot be empty")
+        regex_pattern = self._sql_pattern_to_regex(pattern)
+        return re.match(regex_pattern, name) is not None
+
+    def _sql_pattern_to_regex(self, pattern: str) -> str:
+        """Convert SQL pattern to regex"""
+        regex = []
+        escaped = False
+
+        for char in pattern:
+            if escaped:
+                regex.append(re.escape(char))
+                escaped = False
+            elif char == '\\':
+                escaped = True
+            elif char == '%':
+                regex.append('.*')
+            elif char == '_':
+                regex.append('.')
+            else:
+                regex.append(re.escape(char))
+
+        return '^' + ''.join(regex) + '$'
+
+    def _create_table_metadata(self, identifier: Identifier, schema_id: int,
+                               schema: Schema, uuid_str: str, is_external: bool) -> TableMetadata:
+        """Create table metadata"""
+        options = schema.options.copy()
+        path = f"/tmp/{identifier.get_full_name()}"
+        options[PATH] = path
+
+        table_schema = TableSchema(
+            id=schema_id,
+            fields=schema.fields,
+            highest_field_id=len(schema.fields) - 1,
+            partition_keys=schema.partition_keys,
+            primary_keys=schema.primary_keys,
+            options=options,
+            comment=schema.comment
+        )
+
+        return TableMetadata(
+            schema=table_schema,
+            is_external=is_external,
+            uuid=uuid_str or str(uuid.uuid4())
+        )
+
+    # List methods
+    def _list_tables(self, database_name: str, parameters: Dict[str, str]) -> List[str]:
+        """List tables in database"""
+        table_name_pattern = parameters.get(TABLE_NAME_PATTERN)
+        tables = []
+
+        for full_name, metadata in self.table_metadata_store.items():
+            identifier = Identifier.from_string(full_name)
+            if (identifier.get_database_name() == database_name and
+                    (not table_name_pattern or self._match_name_pattern(identifier.get_table_name(),
+                                                                        table_name_pattern))):
+                tables.append(identifier.get_table_name())
+
+        return tables
+
+    # Response generation methods
+    def _generate_final_list_databases_response(self, parameters: Dict[str, str],
+                                                databases: List[str]) -> Tuple[str, int]:
+        """Generate final list databases response"""
+        if databases:
+            max_results = self._get_max_results(parameters)
+            page_token = parameters.get(PAGE_TOKEN)
+            paged_dbs = self._build_paged_entities(databases, max_results, page_token)
+            response = ListDatabasesResponse(
+                databases=paged_dbs.elements,
+                next_page_token=paged_dbs.next_page_token
+            )
+        else:
+            response = ListDatabasesResponse(databases=[], next_page_token=None)
+
+        return self._mock_response(response, 200)
+
+    def _generate_final_list_tables_response(self, parameters: Dict[str, str],
+                                             tables: List[str]) -> Tuple[str, int]:
+        """Generate final list tables response"""
+        if tables:
+            max_results = self._get_max_results(parameters)
+            page_token = parameters.get(PAGE_TOKEN)
+            paged_tables = self._build_paged_entities(tables, max_results, page_token)
+            response = ListTablesResponse(
+                tables=paged_tables.elements,
+                next_page_token=paged_tables.next_page_token
+            )
+        else:
+            response = ListTablesResponse(tables=[], next_page_token=None)
+
+        return self._mock_response(response, 200)
+
+    def add_no_permission_database(self, database: str) -> None:
+        """Add no permission database"""
+        self.no_permission_databases.append(database)
+
+    def add_no_permission_table(self, identifier: Identifier) -> None:
+        """Add no permission table"""
+        self.no_permission_tables.append(identifier.get_full_name())
+
+    def mock_database(self, name: str, options: dict[str, str]) -> GetDatabaseResponse:
+        return GetDatabaseResponse(
+            id=str(uuid.uuid4()),
+            name=name,
+            location=f"{self.data_path}/{name}",
+            options=options,
+            owner="owner",
+            created_at=1,
+            created_by="created",
+            updated_at=1,
+            updated_by="updated"
+        )
+
+    def mock_table(self, identifier: Identifier, table_metadata: TableMetadata, path: str,
+                   schema: Schema) -> GetTableResponse:
+        return GetTableResponse(
+            id=str(table_metadata.uuid),
+            name=identifier.get_object_name(),
+            path=path,
+            is_external=table_metadata.is_external,
+            schema_id=table_metadata.schema.id,
+            schema=schema,
+            owner="owner",
+            created_at=1,
+            created_by="created",
+            updated_at=1,
+            updated_by="updated"
+        )
+
+
+class ApiTestCase(unittest.TestCase):
+
+    def test(self):
+        """Example usage of RESTCatalogServer"""
+        # Setup logging
+        logging.basicConfig(level=logging.INFO)
+
+        # Create config
+        config = ConfigResponse(defaults={"prefix": "mock-test"})
+
+        # Create mock auth provider
+        class MockAuthProvider:
+            def merge_auth_header(self, headers, auth_param):
+                return {AUTHORIZATION_HEADER_KEY: "Bearer test-token"}
+
+        # Create server
+        server = RESTCatalogServer(
+            data_path="/tmp/test_warehouse",
+            auth_provider=MockAuthProvider(),
+            config=config,
+            warehouse="test_warehouse"
+        )
+        try:
+            # Start server
+            server.start()
+            print(f"Server started at: {server.get_url()}")
+            test_databases = {
+                "default": server.mock_database("default", {"env": "test"}),
+                "test_db1": server.mock_database("test_db1", {"env": "test"}),
+                "test_db2": server.mock_database("test_db2", {"env": "test"}),
+                "prod_db": server.mock_database("prod_db", {"env": "prod"})
+            }
+            test_tables = {
+                "default.user": TableMetadata(uuid=str(uuid.uuid4()), is_external=True,
+                                              schema=TableSchema(1,
+                                                                 [DataField("name", 0, "name", PaimonDataType('int'))],
+                                                                 1, [], [], {}, "")),
+            }
+            server.table_metadata_store.update(test_tables)
+            server.database_store.update(test_databases)
+            options = {
+                'uri': f"http://localhost:{server.port}",
+                'warehouse': 'test_warehouse',
+                'dlf.region': 'cn-hangzhou',
+                "token.provider": "xxxx",
+                'dlf.access-key-id': 'xxxx',
+                'dlf.access-key-secret': 'xxxx'
+            }
+            api = RESTApi(options)
+            self.assertSetEqual(set(api.list_databases()), {*test_databases})
+        self.assertEqual(api.get_database('default'), test_databases.get('default'))
+            table = api.get_table(Identifier.from_string('default.user'))
+            self.assertEqual(table.id, str(test_tables['default.user'].uuid))
+
+        finally:
+            # Shutdown server
+            server.shutdown()
+            print("Server stopped")
+
+
+if __name__ == "__main__":
+    unittest.main()
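
A note on the paging logic in the test server above: _build_paged_entities follows a keyset-style scheme, where entries are sorted by a stable key, entries up to the page token are skipped, at most max_results entries are returned, and the key of the last returned entry becomes the next page token. The sketch below illustrates that idea in isolation; paginate is a hypothetical helper, not part of this patch, and its end-of-results check is intentionally simplified.

    from typing import List, Optional, Tuple

    def paginate(keys: List[str], max_results: int,
                 page_token: Optional[str] = None) -> Tuple[List[str], Optional[str]]:
        # Sort so the page token defines a stable cut-off between pages.
        ordered = sorted(keys)
        # Keep only keys strictly after the token, up to max_results entries.
        page = [k for k in ordered if page_token is None or k > page_token][:max_results]
        # Hand back a token only when a full page was returned and entries remain.
        more_remaining = bool(page) and page[-1] != ordered[-1]
        next_token = page[-1] if len(page) == max_results and more_remaining else None
        return page, next_token

    # Paging through three databases, two at a time:
    first, token = paginate(["default", "test_db1", "test_db2"], 2)
    # first == ["default", "test_db1"], token == "test_db1"
    second, token = paginate(["default", "test_db1", "test_db2"], 2, token)
    # second == ["test_db2"], token is None
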
diff --git a/python/api/api_response.py b/python/api/api_response.py
deleted file mode 100644
index 332741547e..0000000000
--- a/python/api/api_response.py
+++ /dev/null
@@ -1,301 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-from abc import ABC, abstractmethod
-from typing import Dict, Optional, Any, Generic, TypeVar, List
-from dataclasses import dataclass, field
-import json
-from datetime import datetime
-from api_resquest import Identifier
-
-T = TypeVar('T')
-
-
-@dataclass
-class PagedList(Generic[T]):
-    elements: List[T]
-    next_page_token: Optional[str] = None
-
-
-class RESTResponse(ABC):
-    pass
-
-
-@dataclass
-class ErrorResponse(RESTResponse):
-    resource_type: Optional[str] = None
-    resource_name: Optional[str] = None
-    message: Optional[str] = None
-    code: Optional[int] = None
-
-    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
-        return cls(
-            resource_type=data.get('resourceType'),
-            resource_name=data.get('resourceName'),
-            message=data.get('message'),
-            code=data.get('code'),
-        )
-
-    @classmethod
-    def from_json(cls, json_str: str) -> 'ErrorResponse':
-        data = json.loads(json_str)
-        return cls.from_dict(data)
-
-
-@dataclass
-class AuditRESTResponse(RESTResponse):
-    FIELD_OWNER = "owner"
-    FIELD_CREATED_AT = "createdAt"
-    FIELD_CREATED_BY = "createdBy"
-    FIELD_UPDATED_AT = "updatedAt"
-    FIELD_UPDATED_BY = "updatedBy"
-
-    owner: Optional[str] = None
-    created_at: Optional[int] = None
-    created_by: Optional[str] = None
-    updated_at: Optional[int] = None
-    updated_by: Optional[str] = None
-
-    def get_owner(self) -> Optional[str]:
-        return self.owner
-
-    def get_created_at(self) -> Optional[int]:
-        return self.created_at
-
-    def get_created_by(self) -> Optional[str]:
-        return self.created_by
-
-    def get_updated_at(self) -> Optional[int]:
-        return self.updated_at
-
-    def get_updated_by(self) -> Optional[str]:
-        return self.updated_by
-
-    def get_created_datetime(self) -> Optional[datetime]:
-        if self.created_at:
-            return datetime.fromtimestamp(self.created_at / 1000)
-        return None
-
-    def get_updated_datetime(self) -> Optional[datetime]:
-        if self.updated_at:
-            return datetime.fromtimestamp(self.updated_at / 1000)
-        return None
-
-
-class PagedResponse(RESTResponse, Generic[T]):
-
-    @abstractmethod
-    def data(self) -> List[T]:
-        pass
-
-    @abstractmethod
-    def get_next_page_token(self) -> str:
-        pass
-
-
-@dataclass
-class ListDatabasesResponse(PagedResponse[str]):
-    databases: List[str]
-    next_page_token: str
-
-    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
-        return cls(
-            databases=data.get('databases'),
-            next_page_token=data.get('nextPageToken')
-        )
-
-    @classmethod
-    def from_json(cls, json_str: str) -> 'ListDatabasesResponse':
-        data = json.loads(json_str)
-        return cls.from_dict(data)
-
-    def data(self) -> List[str]:
-        return self.databases
-
-    def get_next_page_token(self) -> str:
-        return self.next_page_token
-
-
-@dataclass
-class ListTablesResponse(PagedResponse[str]):
-    tables: Optional[List[str]]
-    next_page_token: Optional[str]
-
-    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
-        return cls(
-            tables=data.get('tables'),
-            next_page_token=data.get('nextPageToken')
-        )
-
-    @classmethod
-    def from_json(cls, json_str: str) -> 'ListTablesResponse':
-        data = json.loads(json_str)
-        return cls.from_dict(data)
-
-    def data(self) -> Optional[List[str]]:
-        return self.tables
-
-    def get_next_page_token(self) -> Optional[str]:
-        return self.next_page_token
-
-
-@dataclass
-class GetTableResponse(RESTResponse):
-    identifier: Identifier
-    schema: Any
-    properties: Dict[str, str]
-
-    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]:
-        return data
-
-    @classmethod
-    def from_json(cls, json_str: str) -> Dict[str, Any]:
-        data = json.loads(json_str)
-        return cls.from_dict(data)
-
-
-@dataclass
-class GetDatabaseResponse(AuditRESTResponse):
-    FIELD_ID = "id"
-    FIELD_NAME = "name"
-    FIELD_LOCATION = "location"
-    FIELD_OPTIONS = "options"
-
-    id: Optional[str] = None
-    name: Optional[str] = None
-    location: Optional[str] = None
-    options: Optional[Dict[str, str]] = field(default_factory=dict)
-
-    def __init__(self,
-                 id: Optional[str] = None,
-                 name: Optional[str] = None,
-                 location: Optional[str] = None,
-                 options: Optional[Dict[str, str]] = None,
-                 owner: Optional[str] = None,
-                 created_at: Optional[int] = None,
-                 created_by: Optional[str] = None,
-                 updated_at: Optional[int] = None,
-                 updated_by: Optional[str] = None):
-        super().__init__(owner, created_at, created_by, updated_at, updated_by)
-        self.id = id
-        self.name = name
-        self.location = location
-        self.options = options or {}
-
-    def get_id(self) -> Optional[str]:
-        return self.id
-
-    def get_name(self) -> Optional[str]:
-        return self.name
-
-    def get_location(self) -> Optional[str]:
-        return self.location
-
-    def get_options(self) -> Dict[str, str]:
-        return self.options or {}
-
-    def to_dict(self) -> Dict[str, Any]:
-        result = {
-            self.FIELD_ID: self.id,
-            self.FIELD_NAME: self.name,
-            self.FIELD_LOCATION: self.location,
-            self.FIELD_OPTIONS: self.options
-        }
-
-        if self.owner is not None:
-            result[self.FIELD_OWNER] = self.owner
-        if self.created_at is not None:
-            result[self.FIELD_CREATED_AT] = self.created_at
-        if self.created_by is not None:
-            result[self.FIELD_CREATED_BY] = self.created_by
-        if self.updated_at is not None:
-            result[self.FIELD_UPDATED_AT] = self.updated_at
-        if self.updated_by is not None:
-            result[self.FIELD_UPDATED_BY] = self.updated_by
-
-        return result
-
-    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
-        return cls(
-            id=data.get(cls.FIELD_ID),
-            name=data.get(cls.FIELD_NAME),
-            location=data.get(cls.FIELD_LOCATION),
-            options=data.get(cls.FIELD_OPTIONS, {}),
-            owner=data.get(cls.FIELD_OWNER),
-            created_at=data.get(cls.FIELD_CREATED_AT),
-            created_by=data.get(cls.FIELD_CREATED_BY),
-            updated_at=data.get(cls.FIELD_UPDATED_AT),
-            updated_by=data.get(cls.FIELD_UPDATED_BY)
-        )
-
-    def to_json(self) -> str:
-        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
-
-    @classmethod
-    def from_json(cls, json_str: str) -> 'GetDatabaseResponse':
-        data = json.loads(json_str)
-        return cls.from_dict(data)
-
-    def __str__(self) -> str:
-        return f"GetDatabaseResponse(id={self.id}, name={self.name}, location={self.location})"
-
-    def __repr__(self) -> str:
-        return (f"GetDatabaseResponse(id={self.id!r}, name={self.name!r}, "
-                f"location={self.location!r}, options={self.options!r}, "
-                f"owner={self.owner!r}, created_at={self.created_at}, "
-                f"created_by={self.created_by!r}, updated_at={self.updated_at}, "
-                f"updated_by={self.updated_by!r})")
-
-
-@dataclass
-class ConfigResponse(RESTResponse):
-    defaults: Dict[str, Any]
-
-    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'ConfigResponse':
-        return cls(
-            defaults=data.get('defaults')
-        )
-
-    @classmethod
-    def from_json(cls, json_str: str) -> 'ConfigResponse':
-        data = json.loads(json_str)
-        return cls.from_dict(data)
-
-    def merge(self, options: Dict[str, Any]) -> Dict[str, Any]:
-        merged = options.copy()
-        merged.update(self.defaults)
-        return merged
-
-
-class JSONSerializableGetDatabaseResponse(GetDatabaseResponse):
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-    def __json__(self) -> Dict[str, Any]:
-        return self.to_dict()
-
-    @classmethod
-    def __from_json__(cls, data: Dict[str, Any]) -> 'JSONSerializableGetDatabaseResponse':
-        return cls.from_dict(data)
