This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 568c008cdb [python] api: Support python RESTApi (#5873)
568c008cdb is described below
commit 568c008cdb2ad2b971710f3b1a7078607836ba44
Author: jerry <[email protected]>
AuthorDate: Fri Jul 11 15:20:23 2025 +0800
[python] api: Support python RESTApi (#5873)
---
.gitignore | 1 +
python/api/__init__.py | 264 ++++++++++++++++++++++++++++++++++++
python/api/api_response.py | 301 +++++++++++++++++++++++++++++++++++++++++
python/api/api_resquest.py | 43 ++++++
python/api/auth.py | 284 +++++++++++++++++++++++++++++++++++++++
python/api/client.py | 324 +++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 1217 insertions(+)
diff --git a/.gitignore b/.gitignore
index 7b1aaba72b..8d59f6faa3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ target
.java-version
dependency-reduced-pom.xml
metastore_db/
+python/.idea/
### VS Code ###
.vscode/
diff --git a/python/api/__init__.py b/python/api/__init__.py
new file mode 100644
index 0000000000..29c50f85f2
--- /dev/null
+++ b/python/api/__init__.py
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Dict, List, Optional, Callable
+from auth import RESTAuthFunction
+from api_response import PagedList, GetTableResponse, ListDatabasesResponse,
ListTablesResponse, GetDatabaseResponse, \
+ ConfigResponse, PagedResponse, T
+from api_resquest import Identifier, CreateDatabaseRequest
+
+
class RESTCatalogOptions:
    """Option keys understood by the Python REST catalog client."""

    # Endpoint / warehouse selection.
    URI = "uri"
    WAREHOUSE = "warehouse"
    PREFIX = "prefix"

    # Token provider and DLF credential keys.
    TOKEN_PROVIDER = "token.provider"
    DLF_REGION = "dlf.region"
    DLF_ACCESS_KEY_ID = "dlf.access-key-id"
    DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
+
+
class RESTException(Exception):
    """Base class for the REST API errors defined in this module."""
+
+
class AlreadyExistsException(RESTException):
    """REST error for a resource that already exists."""
+
+
class ForbiddenException(RESTException):
    """REST error for a request the caller is not permitted to make."""
+
+
class NoSuchResourceException(RESTException):
    """REST error for a resource that does not exist."""
+
+
class RESTUtil:
    """Small stateless helpers used by the REST API client."""

    @staticmethod
    def encode_string(value: str) -> str:
        """Percent-encode ``value`` using ``urllib.parse.quote`` defaults.

        With the default ``safe='/'``, slashes are left unescaped.
        """
        from urllib.parse import quote
        return quote(value)

    @staticmethod
    def extract_prefix_map(options: Dict[str, str], prefix: str) -> Dict[str, str]:
        """Return the entries of ``options`` whose key starts with ``prefix``.

        The prefix is cut from each returned key, and every value is
        converted with ``str``.
        """
        cut = len(prefix)
        return {
            key[cut:]: str(val)
            for key, val in options.items()
            if key.startswith(prefix)
        }
+
+
class ResourcePaths:
    """Builds the URL paths of the REST catalog endpoints.

    Every path except :meth:`config` is rooted at ``base_path``
    (for example ``/v1`` or ``/v1/<prefix>``).
    """

    V1 = "v1"
    DATABASES = "databases"
    TABLES = "tables"
    TABLE_DETAILS = "table-details"

    def __init__(self, base_path: str = ""):
        # Drop trailing slashes so appending "/<segment>" never doubles them.
        self.base_path = base_path.rstrip('/')

    @classmethod
    def for_catalog_properties(cls, options: Dict[str, str]) -> 'ResourcePaths':
        """Create paths rooted at ``/v1/<prefix>``, or ``/v1`` when no prefix option is set."""
        # typing.Dict (not the builtin dict[...]) keeps the class importable on Python < 3.9.
        prefix = options.get(RESTCatalogOptions.PREFIX, "")
        return cls(f"/{cls.V1}/{prefix}" if prefix else f"/{cls.V1}")

    def config(self) -> str:
        """Path of the config endpoint; always ``/v1/config``, independent of ``base_path``."""
        return f"/{self.V1}/config"

    def databases(self) -> str:
        """Path of the database collection."""
        return f"{self.base_path}/{self.DATABASES}"

    def database(self, name: str) -> str:
        """Path of a single database."""
        return f"{self.base_path}/{self.DATABASES}/{name}"

    def tables(self, database_name: Optional[str] = None) -> str:
        """Path of the tables of ``database_name``, or the top-level tables path if none is given."""
        if database_name:
            return f"{self.base_path}/{self.DATABASES}/{database_name}/{self.TABLES}"
        return f"{self.base_path}/{self.TABLES}"

    def table(self, database_name: str, table_name: str) -> str:
        """Path of a single table."""
        return f"{self.base_path}/{self.DATABASES}/{database_name}/{self.TABLES}/{table_name}"

    def table_details(self, database_name: str) -> str:
        """Path of the table-details listing of a database."""
        return f"{self.base_path}/{self.DATABASES}/{database_name}/{self.TABLE_DETAILS}"
+
+
class RESTApi:
    """Client for the Paimon REST catalog HTTP API.

    Requests are sent through ``client.HttpClient`` and signed with a
    ``DLFAuthProvider``. Endpoint paths come from ``ResourcePaths``.
    """

    HEADER_PREFIX = "header."
    MAX_RESULTS = "maxResults"
    PAGE_TOKEN = "pageToken"
    DATABASE_NAME_PATTERN = "databaseNamePattern"
    TABLE_NAME_PATTERN = "tableNamePattern"

    def __init__(self, options: Dict[str, str], config_required: bool = True):
        """Create the API client.

        Args:
            options: catalog options keyed by ``RESTCatalogOptions`` constants.
                Keys starting with ``header.`` become extra request headers.
            config_required: when True, fetch ``/v1/config`` first and merge
                the returned configuration into ``options``.
        """
        self.logger = logging.getLogger(self.__class__.__name__)

        # Imported lazily: the ``client`` module imports RESTApi from here,
        # so a top-level import would be circular.
        from client import HttpClient
        from auth import DLFAuthProvider, DLFToken

        self.client = HttpClient(options.get(RESTCatalogOptions.URI))
        auth_provider = DLFAuthProvider(
            DLFToken(options), options.get(RESTCatalogOptions.DLF_REGION)
        )
        base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX)

        if config_required:
            warehouse = options.get(RESTCatalogOptions.WAREHOUSE)
            query_params = {}
            if warehouse:
                # NOTE(review): the value is percent-encoded here; if HttpClient
                # also encodes query params it ends up double-encoded — confirm.
                query_params[RESTCatalogOptions.WAREHOUSE] = RESTUtil.encode_string(warehouse)

            config_response = self.client.get_with_params(
                ResourcePaths().config(),
                query_params,
                ConfigResponse,
                RESTAuthFunction({}, auth_provider)
            )
            options = config_response.merge(options)
            # The merged options may contribute further "header." entries.
            base_headers.update(RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX))

        self.rest_auth_function = RESTAuthFunction(base_headers, auth_provider)
        self.options = options
        self.resource_paths = ResourcePaths.for_catalog_properties(options)

    @staticmethod
    def __build_paged_query_params(max_results: Optional[int],
                                   page_token: Optional[str],
                                   name_patterns: Dict[str, Optional[str]]) -> Dict[str, str]:
        """Build the query parameters of a paged list request.

        ``max_results`` is included only when positive, ``page_token`` only
        when non-blank, and each name pattern only when both its key and
        value are non-blank.
        """
        query_params = {}
        if max_results is not None and max_results > 0:
            query_params[RESTApi.MAX_RESULTS] = str(max_results)

        if page_token is not None and page_token.strip():
            query_params[RESTApi.PAGE_TOKEN] = page_token

        # Iterate (key, value) pairs; iterating the dict itself yields keys only.
        for key, value in name_patterns.items():
            if key and value and key.strip() and value.strip():
                query_params[key] = value

        return query_params

    def __list_data_from_page_api(self, page_api: Callable[[Dict[str, str]], PagedResponse[T]]) -> List[T]:
        """Call ``page_api`` repeatedly, following page tokens, and concatenate all items."""
        results = []
        query_params = {}
        page_token = None

        while True:
            if page_token:
                query_params[RESTApi.PAGE_TOKEN] = page_token
            else:
                query_params.pop(RESTApi.PAGE_TOKEN, None)

            response = page_api(query_params)

            # ``data`` is a method; call it and tolerate a missing (None) page.
            data = response.data()
            if data:
                results.extend(data)

            page_token = response.get_next_page_token()

            # Stop when there is no next page or the page came back empty,
            # so a server that keeps returning tokens cannot loop forever.
            if not page_token or not data:
                break

        return results

    def get_options(self) -> Dict[str, str]:
        """Return the effective options, including merged server config when it was fetched."""
        return self.options

    def list_databases(self) -> List[str]:
        """Return all database names, following pagination to the end."""
        return self.__list_data_from_page_api(
            lambda query_params: self.client.get_with_params(
                self.resource_paths.databases(),
                query_params,
                ListDatabasesResponse,
                self.rest_auth_function
            )
        )

    def list_databases_paged(self,
                             max_results: Optional[int] = None,
                             page_token: Optional[str] = None,
                             database_name_pattern: Optional[str] = None) -> PagedList[str]:
        """Return a single page of database names.

        Args:
            max_results: page size; sent only when positive.
            page_token: token from a previous page; sent only when non-blank.
            database_name_pattern: name filter; sent only when non-blank.
        """
        response = self.client.get_with_params(
            self.resource_paths.databases(),
            self.__build_paged_query_params(
                max_results,
                page_token,
                {self.DATABASE_NAME_PATTERN: database_name_pattern}
            ),
            ListDatabasesResponse,
            self.rest_auth_function
        )

        databases = response.data() or []
        return PagedList(databases, response.get_next_page_token())

    def create_database(self, name: str, properties: Dict[str, str]) -> None:
        """POST a ``CreateDatabaseRequest`` to the database collection."""
        request = CreateDatabaseRequest(name, properties)
        self.client.post(self.resource_paths.databases(), request, self.rest_auth_function)

    def get_database(self, name: str) -> GetDatabaseResponse:
        """Fetch the metadata of database ``name``."""
        return self.client.get(
            self.resource_paths.database(name),
            GetDatabaseResponse,
            self.rest_auth_function
        )

    def drop_database(self, name: str) -> None:
        """DELETE database ``name``."""
        self.client.delete(self.resource_paths.database(name), self.rest_auth_function)

    def list_tables(self, database_name: str) -> List[str]:
        """Return all table names in ``database_name``, following pagination to the end."""
        return self.__list_data_from_page_api(
            lambda query_params: self.client.get_with_params(
                self.resource_paths.tables(database_name),
                query_params,
                ListTablesResponse,
                self.rest_auth_function
            )
        )

    def list_tables_paged(self,
                          database_name: str,
                          max_results: Optional[int] = None,
                          page_token: Optional[str] = None,
                          table_name_pattern: Optional[str] = None) -> PagedList[str]:
        """Return a single page of table names in ``database_name``.

        Args:
            database_name: database whose tables are listed.
            max_results: page size; sent only when positive.
            page_token: token from a previous page; sent only when non-blank.
            table_name_pattern: name filter; sent only when non-blank.
        """
        response = self.client.get_with_params(
            self.resource_paths.tables(database_name),
            self.__build_paged_query_params(
                max_results,
                page_token,
                {self.TABLE_NAME_PATTERN: table_name_pattern}
            ),
            ListTablesResponse,
            self.rest_auth_function
        )

        tables = response.data() or []
        return PagedList(tables, response.get_next_page_token())

    def get_table(self, identifier: Identifier) -> GetTableResponse:
        """Fetch the table identified by ``identifier``."""
        return self.client.get(
            self.resource_paths.table(identifier.database_name, identifier.object_name),
            GetTableResponse,
            self.rest_auth_function
        )
diff --git a/python/api/api_response.py b/python/api/api_response.py
new file mode 100644
index 0000000000..332741547e
--- /dev/null
+++ b/python/api/api_response.py
@@ -0,0 +1,301 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Any, Generic, TypeVar, List
+from dataclasses import dataclass, field
+import json
+from datetime import datetime
+from api_resquest import Identifier
+
# Generic item type shared by the paged response / list classes.
T = TypeVar("T")
+
+
@dataclass
class PagedList(Generic[T]):
    """One page of results together with the token for the following page."""

    # Items on this page.
    elements: List[T]
    # Continuation token, if any; defaults to None.
    next_page_token: Optional[str] = None
+
+
class RESTResponse(ABC):
    """Marker base class for REST response payload types."""
+
+
@dataclass
class ErrorResponse(RESTResponse):
    """Error payload returned by the REST server.

    JSON keys are camelCase (``resourceType``, ``resourceName``,
    ``message``, ``code``). Every field is optional.
    """

    resource_type: Optional[str] = None
    resource_name: Optional[str] = None
    message: Optional[str] = None
    code: Optional[int] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
        """Build an instance from a decoded JSON object; missing keys become None."""
        lookup = data.get
        return cls(
            resource_type=lookup('resourceType'),
            resource_name=lookup('resourceName'),
            message=lookup('message'),
            code=lookup('code'),
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'ErrorResponse':
        """Decode ``json_str`` and delegate to :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))
+
+
@dataclass
class AuditRESTResponse(RESTResponse):
    """Base response carrying ownership and creation/update audit fields.

    ``created_at`` / ``updated_at`` are integers that the ``get_*_datetime``
    helpers interpret as epoch milliseconds.
    """

    FIELD_OWNER = "owner"
    FIELD_CREATED_AT = "createdAt"
    FIELD_CREATED_BY = "createdBy"
    FIELD_UPDATED_AT = "updatedAt"
    FIELD_UPDATED_BY = "updatedBy"

    owner: Optional[str] = None
    created_at: Optional[int] = None
    created_by: Optional[str] = None
    updated_at: Optional[int] = None
    updated_by: Optional[str] = None

    def get_owner(self) -> Optional[str]:
        return self.owner

    def get_created_at(self) -> Optional[int]:
        return self.created_at

    def get_created_by(self) -> Optional[str]:
        return self.created_by

    def get_updated_at(self) -> Optional[int]:
        return self.updated_at

    def get_updated_by(self) -> Optional[str]:
        return self.updated_by

    @staticmethod
    def _millis_to_datetime(millis: Optional[int]) -> Optional[datetime]:
        # 0 (the epoch) is a valid timestamp, so compare against None
        # instead of relying on truthiness.
        if millis is None:
            return None
        return datetime.fromtimestamp(millis / 1000)

    def get_created_datetime(self) -> Optional[datetime]:
        """Return ``created_at`` as a naive local ``datetime``, or None when unset."""
        return self._millis_to_datetime(self.created_at)

    def get_updated_datetime(self) -> Optional[datetime]:
        """Return ``updated_at`` as a naive local ``datetime``, or None when unset."""
        return self._millis_to_datetime(self.updated_at)
+
+
class PagedResponse(RESTResponse, Generic[T]):
    """Abstract response holding one page of ``T`` items and a continuation token."""

    @abstractmethod
    def data(self) -> List[T]:
        """Return the items contained in this page."""

    @abstractmethod
    def get_next_page_token(self) -> str:
        """Return the token used to request the following page."""
+
+
@dataclass
class ListDatabasesResponse(PagedResponse[str]):
    """One page of database names.

    Both fields may be ``None``: ``from_dict`` fills them with ``dict.get``
    on the decoded JSON (keys ``databases`` and ``nextPageToken``). This
    matches ``ListTablesResponse``.
    """

    databases: Optional[List[str]]
    next_page_token: Optional[str]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
        """Build from a decoded JSON object; missing keys become None."""
        return cls(
            databases=data.get('databases'),
            next_page_token=data.get('nextPageToken')
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'ListDatabasesResponse':
        """Decode ``json_str`` and delegate to :meth:`from_dict`."""
        data = json.loads(json_str)
        return cls.from_dict(data)

    def data(self) -> Optional[List[str]]:
        """Return the database names on this page (may be None)."""
        return self.databases

    def get_next_page_token(self) -> Optional[str]:
        """Return the continuation token (may be None)."""
        return self.next_page_token
+
+
@dataclass
class ListTablesResponse(PagedResponse[str]):
    """One page of table names.

    Built from JSON keys ``tables`` and ``nextPageToken``; either may be
    missing, in which case the field is None.
    """

    tables: Optional[List[str]]
    next_page_token: Optional[str]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
        """Build from a decoded JSON object."""
        names = data.get('tables')
        token = data.get('nextPageToken')
        return cls(tables=names, next_page_token=token)

    @classmethod
    def from_json(cls, json_str: str) -> 'ListTablesResponse':
        """Decode ``json_str`` and delegate to :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))

    def data(self) -> Optional[List[str]]:
        """Return the table names on this page (may be None)."""
        return self.tables

    def get_next_page_token(self) -> Optional[str]:
        """Return the continuation token (may be None)."""
        return self.next_page_token
+
+
@dataclass
class GetTableResponse(RESTResponse):
    """Response type used for the get-table request.

    NOTE(review): ``from_dict`` / ``from_json`` return the decoded JSON dict
    itself rather than a ``GetTableResponse`` instance, so the declared
    fields below are never populated by them.
    """

    identifier: Identifier
    schema: Any
    properties: Dict[str, str]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``data`` unchanged (no conversion is performed)."""
        return data

    @classmethod
    def from_json(cls, json_str: str) -> Dict[str, Any]:
        """Decode ``json_str`` and pass the result through :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))
+
+
@dataclass
class GetDatabaseResponse(AuditRESTResponse):
    """Database metadata plus the audit fields of ``AuditRESTResponse``.

    JSON keys are given by the ``FIELD_*`` constants here and on the parent.
    """

    FIELD_ID = "id"
    FIELD_NAME = "name"
    FIELD_LOCATION = "location"
    FIELD_OPTIONS = "options"

    id: Optional[str] = None
    name: Optional[str] = None
    location: Optional[str] = None
    options: Optional[Dict[str, str]] = field(default_factory=dict)

    def __init__(self,
                 id: Optional[str] = None,
                 name: Optional[str] = None,
                 location: Optional[str] = None,
                 options: Optional[Dict[str, str]] = None,
                 owner: Optional[str] = None,
                 created_at: Optional[int] = None,
                 created_by: Optional[str] = None,
                 updated_at: Optional[int] = None,
                 updated_by: Optional[str] = None):
        # Audit fields are initialised by the parent dataclass __init__ (positional order).
        super().__init__(owner, created_at, created_by, updated_at, updated_by)
        self.id = id
        self.name = name
        self.location = location
        # A falsy options argument (None or empty) becomes a fresh empty dict.
        self.options = options or {}

    def get_id(self) -> Optional[str]:
        return self.id

    def get_name(self) -> Optional[str]:
        return self.name

    def get_location(self) -> Optional[str]:
        return self.location

    def get_options(self) -> Dict[str, str]:
        return self.options or {}

    def to_dict(self) -> Dict[str, Any]:
        """Serialise to a JSON-ready dict; audit fields are emitted only when not None."""
        payload = {
            self.FIELD_ID: self.id,
            self.FIELD_NAME: self.name,
            self.FIELD_LOCATION: self.location,
            self.FIELD_OPTIONS: self.options,
        }
        audit_pairs = (
            (self.FIELD_OWNER, self.owner),
            (self.FIELD_CREATED_AT, self.created_at),
            (self.FIELD_CREATED_BY, self.created_by),
            (self.FIELD_UPDATED_AT, self.updated_at),
            (self.FIELD_UPDATED_BY, self.updated_by),
        )
        payload.update((key, val) for key, val in audit_pairs if val is not None)
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
        """Build an instance from a decoded JSON object; missing keys become None (options: {})."""
        return cls(
            id=data.get(cls.FIELD_ID),
            name=data.get(cls.FIELD_NAME),
            location=data.get(cls.FIELD_LOCATION),
            options=data.get(cls.FIELD_OPTIONS, {}),
            owner=data.get(cls.FIELD_OWNER),
            created_at=data.get(cls.FIELD_CREATED_AT),
            created_by=data.get(cls.FIELD_CREATED_BY),
            updated_at=data.get(cls.FIELD_UPDATED_AT),
            updated_by=data.get(cls.FIELD_UPDATED_BY)
        )

    def to_json(self) -> str:
        """Return :meth:`to_dict` as pretty-printed JSON (non-ASCII kept as-is)."""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'GetDatabaseResponse':
        """Decode ``json_str`` and delegate to :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))

    def __str__(self) -> str:
        return f"GetDatabaseResponse(id={self.id}, name={self.name}, location={self.location})"

    def __repr__(self) -> str:
        return (f"GetDatabaseResponse(id={self.id!r}, name={self.name!r}, "
                f"location={self.location!r}, options={self.options!r}, "
                f"owner={self.owner!r}, created_at={self.created_at}, "
                f"created_by={self.created_by!r}, updated_at={self.updated_at}, "
                f"updated_by={self.updated_by!r})")
+
+
@dataclass
class ConfigResponse(RESTResponse):
    """Catalog configuration returned by the config endpoint.

    ``defaults`` apply only to keys the client did not set. ``overrides``,
    when the server sends them, take precedence over client options.
    """

    defaults: Optional[Dict[str, Any]]
    overrides: Optional[Dict[str, Any]] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConfigResponse':
        """Build from a decoded JSON object; missing sections become None."""
        return cls(
            defaults=data.get('defaults'),
            overrides=data.get('overrides'),
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'ConfigResponse':
        """Decode ``json_str`` and delegate to :meth:`from_dict`."""
        data = json.loads(json_str)
        return cls.from_dict(data)

    def merge(self, options: Dict[str, Any]) -> Dict[str, Any]:
        """Merge the server configuration with client ``options``.

        Precedence, lowest to highest: server defaults, client options,
        server overrides. Missing (None) sections are ignored, and
        ``options`` itself is not modified.
        """
        merged: Dict[str, Any] = dict(self.defaults or {})
        merged.update(options)
        if self.overrides:
            merged.update(self.overrides)
        return merged
+
+
class JSONSerializableGetDatabaseResponse(GetDatabaseResponse):
    """``GetDatabaseResponse`` that also exposes ``__json__`` / ``__from_json__`` hooks.

    Construction is inherited unchanged from ``GetDatabaseResponse``.
    """

    def __json__(self) -> Dict[str, Any]:
        """Return the JSON-ready dict produced by ``to_dict``."""
        return self.to_dict()

    @classmethod
    def __from_json__(cls, data: Dict[str, Any]) -> 'JSONSerializableGetDatabaseResponse':
        """Build an instance of this class from a decoded JSON dict."""
        return cls.from_dict(data)
diff --git a/python/api/api_resquest.py b/python/api/api_resquest.py
new file mode 100644
index 0000000000..d7fd882447
--- /dev/null
+++ b/python/api/api_resquest.py
@@ -0,0 +1,43 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC
+from dataclasses import dataclass
+from typing import Dict, List, Any, Optional
+
+
class RESTRequest(ABC):
    """Marker base class for REST request payloads."""
+
+
@dataclass
class Identifier:
    """Names an object (for example a table) inside a database."""

    # Database that contains the object.
    database_name: str
    # Name of the object within that database.
    object_name: str
+
+
@dataclass
class CreateDatabaseRequest(RESTRequest):
    """Request payload for creating a database."""

    # Name of the database to create.
    name: str
    # Database properties sent along with the request.
    properties: Dict[str, str]
+
+
@dataclass
class AlterDatabaseRequest(RESTRequest):
    """Request payload for altering a database."""

    # Presumably property keys to remove — confirm against the server API.
    removals: List[str]
    # Presumably property key/value pairs to set — confirm against the server API.
    updates: Dict[str, str]
diff --git a/python/api/auth.py b/python/api/auth.py
new file mode 100644
index 0000000000..9e88651694
--- /dev/null
+++ b/python/api/auth.py
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import hashlib
+import hmac
+import logging
+import threading
+from abc import ABC, abstractmethod
+from collections import OrderedDict
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Optional, Dict
+
+
@dataclass
class RESTAuthParameter:
    """Request details that are fed into request signing."""

    # HTTP method of the request.
    method: str
    # Request path.
    path: str
    # Request body ("" or None when there is none).
    data: str
    # Query parameters.
    parameters: Dict[str, str]
+
+
@dataclass
class DLFToken:
    """Access-key credentials used to sign DLF requests.

    The key id and secret are read from the catalog options
    (``RESTCatalogOptions.DLF_ACCESS_KEY_ID`` / ``DLF_ACCESS_KEY_SECRET``).
    ``security_token`` is not read from options and is always None here.
    """

    access_key_id: str
    access_key_secret: str
    security_token: Optional[str] = None

    def __init__(self, options: Dict[str, str]):
        # Imported lazily: the ``api`` module imports this module at top level.
        from api import RESTCatalogOptions
        self.access_key_id = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID)
        self.access_key_secret = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET)
        self.security_token = None

    def __repr__(self) -> str:
        # The dataclass-generated __repr__ would print the secret verbatim,
        # leaking it into logs and tracebacks; mask secret values instead.
        secret = "***" if self.access_key_secret else None
        token = "***" if self.security_token else None
        return (f"DLFToken(access_key_id={self.access_key_id!r}, "
                f"access_key_secret={secret!r}, security_token={token!r})")
+
+
class AuthProvider(ABC):
    """Interface for strategies that add authentication headers to a request."""

    @abstractmethod
    def merge_auth_header(self, base_header: Dict[str, str], parammeter: RESTAuthParameter) -> Dict[str, str]:
        """Return ``base_header`` merged with the authorization headers for a request.

        NOTE(review): the parameter name ``parammeter`` is misspelled and differs
        from ``rest_auth_parameter`` used by ``DLFAuthProvider``; it is kept
        unchanged for signature compatibility.
        """
+
+
class RESTAuthFunction:
    """Callable that produces request headers via an ``AuthProvider``.

    Holds a fixed set of initial headers and asks the provider to merge the
    authorization headers for each request into them.
    """

    def __init__(self, init_header: Dict[str, str], auth_provider: AuthProvider):
        # Copy so later changes to the caller's mapping are not picked up.
        self.init_header = {} if not init_header else init_header.copy()
        self.auth_provider = auth_provider

    def __call__(self, rest_auth_parameter: RESTAuthParameter) -> Dict[str, str]:
        """Return the initial headers merged with auth headers for ``rest_auth_parameter``."""
        provider = self.auth_provider
        return provider.merge_auth_header(self.init_header, rest_auth_parameter)

    def apply(self, rest_auth_parameter: RESTAuthParameter) -> Dict[str, str]:
        """Same as calling the instance directly."""
        return self.__call__(rest_auth_parameter)
+
+
class DLFAuthProvider(AuthProvider):
    """``AuthProvider`` that signs requests with DLF access-key credentials.

    Adds the ``x-dlf-*`` signing headers (plus ``Content-Type`` and
    ``Content-MD5`` when a body is present) and an ``Authorization`` header
    computed by ``DLFAuthSignature``.
    """

    DLF_AUTHORIZATION_HEADER_KEY = "Authorization"
    DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5"
    DLF_CONTENT_TYPE_KEY = "Content-Type"
    DLF_DATE_HEADER_KEY = "x-dlf-date"
    DLF_SECURITY_TOKEN_HEADER_KEY = "x-dlf-security-token"
    DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version"
    DLF_CONTENT_SHA56_HEADER_KEY = "x-dlf-content-sha256"
    DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD"

    AUTH_DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
    MEDIA_TYPE = "application/json"

    def __init__(self,
                 token: DLFToken,
                 region: str):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.token = token
        self.region = region
        # NOTE(review): not used by any method of this class.
        self._lock = threading.Lock()

    def merge_auth_header(self,
                          base_header: Dict[str, str],
                          rest_auth_parameter: RESTAuthParameter) -> Dict[str, str]:
        """Return a copy of ``base_header`` with the signing and ``Authorization`` headers added.

        Raises:
            RuntimeError: if signing fails; the original error is chained as its cause.
        """
        try:
            # An x-dlf-date already present in base_header pins the signing
            # time; otherwise the current UTC time is used.
            date_time = base_header.get(
                self.DLF_DATE_HEADER_KEY.lower(),
                datetime.now(timezone.utc).strftime(self.AUTH_DATE_TIME_FORMAT)
            )
            # The first 8 characters of the timestamp are the YYYYMMDD date.
            date = date_time[:8]

            sign_headers = self.generate_sign_headers(
                rest_auth_parameter.data,
                date_time,
                self.token.security_token
            )

            authorization = DLFAuthSignature.get_authorization(
                rest_auth_parameter=rest_auth_parameter,
                dlf_token=self.token,
                region=self.region,
                headers=sign_headers,
                date_time=date_time,
                date=date
            )

            headers_with_auth = base_header.copy()
            headers_with_auth.update(sign_headers)
            headers_with_auth[self.DLF_AUTHORIZATION_HEADER_KEY] = authorization

            return headers_with_auth

        except Exception as e:
            raise RuntimeError(f"Failed to merge auth header: {e}") from e

    @classmethod
    def generate_sign_headers(cls,
                              data: Optional[str],
                              date_time: str,
                              security_token: Optional[str]) -> Dict[str, str]:
        """Build the headers that take part in the signature.

        Content-Type and Content-MD5 are added only for a non-empty body; the
        security-token header only when a token is given.
        """
        sign_headers = {}

        sign_headers[cls.DLF_DATE_HEADER_KEY] = date_time
        sign_headers[cls.DLF_CONTENT_SHA56_HEADER_KEY] = cls.DLF_CONTENT_SHA56_VALUE
        sign_headers[cls.DLF_AUTH_VERSION_HEADER_KEY] = DLFAuthSignature.VERSION

        if data is not None and data != "":
            sign_headers[cls.DLF_CONTENT_TYPE_KEY] = cls.MEDIA_TYPE
            sign_headers[cls.DLF_CONTENT_MD5_HEADER_KEY] = DLFAuthSignature.md5(data)

        if security_token is not None:
            sign_headers[cls.DLF_SECURITY_TOKEN_HEADER_KEY] = security_token
        return sign_headers
+
+
class DLFAuthSignature:
    """Computes the ``DLF4-HMAC-SHA256`` Authorization header value.

    Every wrapper that converts a failure into ``RuntimeError`` chains the
    original exception as ``__cause__``.
    """

    VERSION = "v1"
    SIGNATURE_ALGORITHM = "DLF4-HMAC-SHA256"
    PRODUCT = "DlfNext"
    HMAC_SHA256 = "sha256"
    REQUEST_TYPE = "aliyun_v4_request"
    SIGNATURE_KEY = "Signature"
    NEW_LINE = "\n"
    # Lower-cased names of the headers included in the canonical request.
    SIGNED_HEADERS = [
        DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_CONTENT_TYPE_KEY.lower(),
        DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_DATE_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_SECURITY_TOKEN_HEADER_KEY.lower()
    ]

    @classmethod
    def get_authorization(cls,
                          rest_auth_parameter: RESTAuthParameter,
                          dlf_token: DLFToken,
                          region: str,
                          headers: Dict[str, str],
                          date_time: str,
                          date: str) -> str:
        """Return ``"<algorithm> Credential=<id>/<scope>,Signature=<hex>"``.

        The signing key is derived by chaining HMAC-SHA256 over the date,
        region, product and request type, starting from
        ``"aliyun_v4" + secret``.

        Raises:
            RuntimeError: if any step fails (cause chained).
        """
        try:
            canonical_request = cls.get_canonical_request(rest_auth_parameter, headers)

            string_to_sign = cls.NEW_LINE.join([
                cls.SIGNATURE_ALGORITHM,
                date_time,
                f"{date}/{region}/{cls.PRODUCT}/{cls.REQUEST_TYPE}",
                cls._sha256_hex(canonical_request)
            ])

            date_key = cls._hmac_sha256(f"aliyun_v4{dlf_token.access_key_secret}".encode('utf-8'), date)
            date_region_key = cls._hmac_sha256(date_key, region)
            date_region_service_key = cls._hmac_sha256(date_region_key, cls.PRODUCT)
            signing_key = cls._hmac_sha256(date_region_service_key, cls.REQUEST_TYPE)

            result = cls._hmac_sha256(signing_key, string_to_sign)
            signature = cls._hex_encode(result)

            credential = f"{cls.SIGNATURE_ALGORITHM} Credential={dlf_token.access_key_id}/{date}/{region}/{cls.PRODUCT}/{cls.REQUEST_TYPE}"
            signature_part = f"{cls.SIGNATURE_KEY}={signature}"

            return f"{credential},{signature_part}"

        except Exception as e:
            raise RuntimeError(f"Failed to generate authorization: {e}") from e

    @classmethod
    def md5(cls, raw: str) -> str:
        """Return the base64-encoded MD5 digest of ``raw`` (UTF-8)."""
        try:
            md5_hash = hashlib.md5(raw.encode('utf-8')).digest()
            return base64.b64encode(md5_hash).decode('utf-8')
        except Exception as e:
            raise RuntimeError(f"Failed to calculate MD5: {e}") from e

    @classmethod
    def _hmac_sha256(cls, key: bytes, data: str) -> bytes:
        """Return the raw HMAC-SHA256 of ``data`` (UTF-8) under ``key``."""
        try:
            return hmac.new(key, data.encode('utf-8'), hashlib.sha256).digest()
        except Exception as e:
            raise RuntimeError(f"Failed to calculate HMAC-SHA256: {e}") from e

    @classmethod
    def get_canonical_request(cls,
                              rest_auth_parameter: RESTAuthParameter,
                              headers: Dict[str, str]) -> str:
        """Build the newline-joined canonical request.

        Lines, in order: method, path, canonical query string, one
        ``key:value`` line per signed header (sorted by lower-cased key),
        and finally the content-sha256 value.
        """
        canonical_request = cls.NEW_LINE.join([
            rest_auth_parameter.method,
            rest_auth_parameter.path
        ])

        canonical_query_string = cls._build_canonical_query_string(rest_auth_parameter.parameters)
        canonical_request = cls.NEW_LINE.join([canonical_request, canonical_query_string])

        sorted_signed_headers_map = cls._build_sorted_signed_headers_map(headers)
        for key, value in sorted_signed_headers_map.items():
            canonical_request = cls.NEW_LINE.join([
                canonical_request,
                f"{key}:{value}"
            ])

        content_sha256 = headers.get(
            DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY,
            DLFAuthProvider.DLF_CONTENT_SHA56_VALUE
        )

        return cls.NEW_LINE.join([canonical_request, content_sha256])

    @classmethod
    def _build_canonical_query_string(cls, parameters: Optional[Dict[str, str]]) -> str:
        """Join parameters sorted by key as ``k=v`` (or bare ``k`` for empty values) with ``&``."""
        if not parameters:
            return ""

        sorted_params = OrderedDict(sorted(parameters.items()))

        query_parts = []
        for key, value in sorted_params.items():
            key = cls._trim(key)
            if value is not None and value != "":
                value = cls._trim(value)
                query_parts.append(f"{key}={value}")
            else:
                query_parts.append(key)

        return "&".join(query_parts)

    @classmethod
    def _build_sorted_signed_headers_map(cls, headers: Optional[Dict[str, str]]) -> OrderedDict:
        """Return the signed headers as lower-cased keys with trimmed values, sorted by key."""
        sorted_headers = OrderedDict()

        if headers:
            for key, value in headers.items():
                lower_key = key.lower()
                if lower_key in cls.SIGNED_HEADERS:
                    sorted_headers[lower_key] = cls._trim(value)

        return OrderedDict(sorted(sorted_headers.items()))

    @classmethod
    def _sha256_hex(cls, raw: str) -> str:
        """Return the lowercase hex SHA-256 of ``raw`` (UTF-8)."""
        try:
            sha256_hash = hashlib.sha256(raw.encode('utf-8')).digest()
            return cls._hex_encode(sha256_hash)
        except Exception as e:
            raise RuntimeError(f"Failed to calculate SHA256: {e}") from e

    @classmethod
    def _hex_encode(cls, raw: Optional[bytes]) -> Optional[str]:
        """Return ``raw`` as lowercase hex, or None when ``raw`` is None."""
        if raw is None:
            return None

        return raw.hex()

    @classmethod
    def _trim(cls, value: str) -> str:
        """Strip surrounding whitespace; falsy values become ``""``."""
        return value.strip() if value else ""
diff --git a/python/api/client.py b/python/api/client.py
new file mode 100644
index 0000000000..409b4d2c28
--- /dev/null
+++ b/python/api/client.py
@@ -0,0 +1,324 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import logging
+import urllib.parse
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Type, TypeVar, Callable
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3 import Retry
+
+from auth import RESTAuthParameter
+from api import RESTApi
+from api_response import ErrorResponse
+
# Generic response type for the client methods. The bound is given by name and
# kept as a forward reference (presumably the RESTResponse base in api_response).
T = TypeVar("T", bound="RESTResponse")
+
+
class RESTRequest(ABC):
    """Base type for request bodies passed to ``RESTClient`` post/delete calls."""
+
+
class RESTException(Exception):
    """Error raised for failures while talking to the REST server.

    Args:
        message: human-readable description, used as the exception text.
        cause: optional underlying exception. It is kept on ``cause`` and also
            recorded as ``__cause__``, so tracebacks report it as the direct
            cause even when the exception is raised without ``from``.
    """

    def __init__(self, message: str, cause: Optional[Exception] = None):
        super().__init__(message)
        self.cause = cause
        if cause is not None:
            self.__cause__ = cause
+
+
class ErrorHandler(ABC):
    """Strategy for reacting to an error response from the REST server.

    ``HttpClient`` calls ``accept`` when a response is not ``ok``. If ``accept``
    returns normally, the client continues processing that response.
    """

    @abstractmethod
    def accept(self, error: ErrorResponse, request_id: str) -> None:
        """Handle ``error`` returned for the request identified by ``request_id``."""
+
+
class DefaultErrorHandler(ErrorHandler):
    """Error handler that turns every error response into a ``RESTException``.

    ``get_instance`` lazily creates one shared instance and caches it on the class.
    """

    _instance = None

    @classmethod
    def get_instance(cls) -> 'DefaultErrorHandler':
        """Return the cached handler, creating it on first use."""
        instance = cls._instance
        if instance is None:
            instance = cls._instance = cls()
        return instance

    def accept(self, error: ErrorResponse, request_id: str) -> None:
        """Raise a ``RESTException`` describing ``error``; never returns normally.

        The message always carries the request id and ``error.message``. The
        resource name and type are appended only when truthy.
        """
        parts = [f"REST API error (request_id: {request_id}): {error.message}"]
        if error.resource_name:
            parts.append(f" (resource: {error.resource_name})")
        if error.resource_type:
            parts.append(f" (resource_type: {error.resource_type})")

        raise RESTException("".join(parts))
+
+
class ExponentialRetryInterceptor:
    """Builds the urllib3 ``Retry`` policy mounted on ``HttpClient``'s session.

    Responses with status 429, 500, 502, 503 or 504 are retried up to
    ``max_retries`` times, with waits governed by urllib3's ``backoff_factor``.
    ``raise_on_status=False`` makes urllib3 return the last response once
    retries are exhausted instead of raising, so status handling stays with
    the caller.
    """

    def __init__(self, max_retries: int = 5):
        self.max_retries = max_retries
        self.logger = logging.getLogger(self.__class__.__name__)

    def create_retry_strategy(self) -> Retry:
        """Return a ``Retry`` configured with this interceptor's policy.

        Works on urllib3 2.x, where ``method_whitelist`` was removed, and on
        older releases that predate its replacement ``allowed_methods``.
        """
        retry_methods = ["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"]
        common_kwargs = dict(
            total=self.max_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1,
            raise_on_status=False,
        )
        try:
            # urllib3 >= 1.26 name; the legacy keyword raises TypeError on 2.x.
            return Retry(allowed_methods=retry_methods, **common_kwargs)
        except TypeError:
            # urllib3 < 1.26 only accepts the legacy keyword.
            return Retry(method_whitelist=retry_methods, **common_kwargs)
+
+
class LoggingInterceptor:
    """Writes DEBUG log lines for outgoing requests and incoming responses.

    Each line is tagged with the value of the request-id header, or with
    ``DEFAULT_REQUEST_ID`` when that header is absent.
    """

    REQUEST_ID_KEY = "x-request-id"
    DEFAULT_REQUEST_ID = "unknown"

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @classmethod
    def _request_id(cls, headers: Optional[Dict[str, str]]) -> str:
        """Return the request id found in ``headers``, or ``DEFAULT_REQUEST_ID``.

        HTTP header names are case-insensitive. A plain ``dict.get`` on the
        lower-case key would miss e.g. ``X-Request-Id``, so names are compared
        after lower-casing.
        """
        if headers:
            wanted = cls.REQUEST_ID_KEY.lower()
            for name, value in headers.items():
                if name.lower() == wanted:
                    return value
        return cls.DEFAULT_REQUEST_ID

    def log_request(self, method: str, url: str, headers: Dict[str, str]) -> None:
        """Log the method and URL of an outgoing request at DEBUG level."""
        self.logger.debug("Request [%s]: %s %s", self._request_id(headers), method, url)

    def log_response(self, status_code: int, headers: Dict[str, str]) -> None:
        """Log the status code of a received response at DEBUG level."""
        self.logger.debug("Response [%s]: %s", self._request_id(headers), status_code)
+
+
class RESTClient(ABC):
    """Abstract transport for calls against a REST server.

    Every operation takes ``rest_auth_function``, which receives a
    ``RESTAuthParameter`` describing the request and returns the headers to
    send with it. Operations that take ``response_type`` decode the response
    body into that type.
    """

    @abstractmethod
    def get(self, path: str, response_type: Type[T],
            rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """Issue a GET for ``path`` and decode the result as ``response_type``."""

    @abstractmethod
    def get_with_params(self, path: str, query_params: Dict[str, str],
                        response_type: Type[T],
                        rest_auth_function: Callable[[RESTAuthParameter],
                                                     Dict[str, str]]) -> T:
        """Issue a GET for ``path`` with ``query_params`` and decode as ``response_type``."""

    @abstractmethod
    def post(self, path: str, body: RESTRequest,
             rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """Send ``body`` to ``path`` via POST without decoding a response type."""

    @abstractmethod
    def post_with_response_type(self, path: str, body: RESTRequest, response_type: Type[T],
                                rest_auth_function: Callable[[RESTAuthParameter],
                                                             Dict[str, str]]) -> T:
        """Send ``body`` to ``path`` via POST and decode the result as ``response_type``."""

    @abstractmethod
    def delete(self, path: str,
               rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """Issue a DELETE for ``path``."""

    @abstractmethod
    def delete_with_body(self, path: str, body: RESTRequest,
                         rest_auth_function: Callable[[RESTAuthParameter],
                                                      Dict[str, str]]) -> T:
        """Issue a DELETE for ``path`` carrying ``body``."""
+
+
+def _normalize_uri(uri: str) -> str:
+ if not uri or uri.strip() == "":
+ raise ValueError("uri is empty which must be defined.")
+
+ server_uri = uri.strip()
+
+ if server_uri.endswith("/"):
+ server_uri = server_uri[:-1]
+
+ if not server_uri.startswith("http://") and not
server_uri.startswith("https://"):
+ server_uri = f"http://{server_uri}"
+
+ return server_uri
+
+
def _parse_error_response(response_body: Optional[str], status_code: int) -> ErrorResponse:
    """Build an ``ErrorResponse`` from the body of a failed HTTP response.

    A non-empty body is first parsed with ``ErrorResponse.from_json``. If that
    raises, the raw body becomes the message. An empty or missing body yields
    the message ``"response body is null"``. In both fallback cases the
    resource fields are ``None`` and ``code`` is ``status_code``.
    """
    if not response_body:
        fallback_message = "response body is null"
    else:
        try:
            return ErrorResponse.from_json(response_body)
        except Exception:
            fallback_message = response_body

    return ErrorResponse(
        resource_type=None,
        resource_name=None,
        message=fallback_message,
        code=status_code,
    )
+
+
def _get_headers_with_params(path: str, query_params: Dict[str, str],
                             method: str, data: str,
                             header_function: Callable[[RESTAuthParameter],
                                                       Dict[str, str]]) -> Dict[str, str]:
    """Compute the headers for one request.

    Packs the request description into a ``RESTAuthParameter`` and returns
    whatever ``header_function`` produces for it.
    """
    auth_parameter = RESTAuthParameter(
        path=path,
        parameters=query_params,
        method=method,
        data=data,
    )
    headers = header_function(auth_parameter)
    return headers
+
+
def _get_headers(path: str, method: str, query_params: Dict[str, str], data: str,
                 header_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> Dict[str, str]:
    """Forward to ``_get_headers_with_params`` with ``method`` before ``query_params``.

    All five arguments are required and passed through unchanged.
    """
    return _get_headers_with_params(
        path=path,
        query_params=query_params,
        method=method,
        data=data,
        header_function=header_function,
    )
+
+
class HttpClient(RESTClient):
    """``RESTClient`` implementation backed by a ``requests.Session``.

    The session is mounted with ``ExponentialRetryInterceptor``'s retry policy
    and sends ``Accept: application/json``. Every request uses
    ``DEFAULT_TIMEOUT`` as its ``(connect, read)`` timeout. Responses that are
    not ``ok`` (status >= 400) are parsed into an ``ErrorResponse`` and passed
    to the configured ``ErrorHandler``. That is ``DefaultErrorHandler`` unless
    replaced via ``set_error_handler``.
    """

    # (connect, read) timeout in seconds applied to every request.
    DEFAULT_TIMEOUT = (180, 180)

    def __init__(self, uri: str):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.uri = _normalize_uri(uri)
        self.error_handler = DefaultErrorHandler.get_instance()
        self.logging_interceptor = LoggingInterceptor()
        # requests.Session has no default-timeout setting (assigning
        # ``session.timeout`` is silently ignored), so the timeout is stored
        # here and passed explicitly on every request.
        self.timeout = self.DEFAULT_TIMEOUT

        self.session = requests.Session()

        retry_interceptor = ExponentialRetryInterceptor(max_retries=5)
        retry_strategy = retry_interceptor.create_retry_strategy()
        adapter = HTTPAdapter(max_retries=retry_strategy)

        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.session.headers.update({
            'Accept': 'application/json'
        })

    def set_error_handler(self, error_handler: ErrorHandler) -> None:
        """Replace the handler invoked for error responses."""
        self.error_handler = error_handler

    def get(self, path: str, response_type: Type[T],
            rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """GET ``path`` and decode the body with ``response_type.from_json``."""
        auth_headers = _get_headers(path, "GET", {}, "", rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("GET", url, headers=auth_headers,
                                     response_type=response_type)

    def get_with_params(self, path: str, query_params: Dict[str, str],
                        response_type: Type[T],
                        rest_auth_function: Callable[[RESTAuthParameter],
                                                     Dict[str, str]]) -> T:
        """GET ``path`` with ``query_params`` URL-encoded into the query string."""
        auth_headers = _get_headers(path, "GET", query_params, None, rest_auth_function)
        url = self._get_request_url(path, query_params)

        return self._execute_request("GET", url, headers=auth_headers,
                                     response_type=response_type)

    def post(self, path: str, body: RESTRequest,
             rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """POST ``body`` as JSON to ``path``; the response body is not decoded (returns ``None``)."""
        return self.post_with_response_type(path, body, None, rest_auth_function)

    def post_with_response_type(self, path: str, body: RESTRequest,
                                response_type: Optional[Type[T]],
                                rest_auth_function: Callable[[RESTAuthParameter],
                                                             Dict[str, str]]) -> T:
        """POST ``body`` as JSON to ``path`` and decode the reply as ``response_type``.

        Raises:
            RESTException: if the body cannot be serialized or the call fails.
        """
        body_str = self._serialize_body(body)
        auth_headers = _get_headers(path, "POST", {}, body_str, rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("POST", url, data=body_str, headers=auth_headers,
                                     response_type=response_type)

    def delete(self, path: str,
               rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """DELETE ``path``; returns ``None`` on success."""
        auth_headers = _get_headers(path, "DELETE", {}, "", rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("DELETE", url, headers=auth_headers, response_type=None)

    def delete_with_body(self, path: str, body: RESTRequest,
                         rest_auth_function: Callable[[RESTAuthParameter],
                                                      Dict[str, str]]) -> T:
        """DELETE ``path`` with ``body`` sent as JSON; returns ``None`` on success.

        Raises:
            RESTException: if the body cannot be serialized or the call fails.
        """
        body_str = self._serialize_body(body)
        auth_headers = _get_headers(path, "DELETE", {}, body_str, rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("DELETE", url, data=body_str, headers=auth_headers,
                                     response_type=None)

    @staticmethod
    def _serialize_body(body: RESTRequest) -> str:
        """Serialize ``body`` via ``RESTApi.to_json``; wrap failures in ``RESTException``."""
        try:
            return RESTApi.to_json(body)
        except (TypeError, ValueError) as e:
            # Assumes RESTApi.to_json is json.dumps-based: json reports
            # unserializable objects with TypeError and circular references
            # with ValueError (the json module has no JSONEncodeError).
            raise RESTException("build request failed.", e) from e

    def _get_request_url(self, path: str, query_params: Optional[Dict[str, str]]) -> str:
        """Join the base URI, ``path`` and the URL-encoded ``query_params``."""
        if not path or path.strip() == "":
            full_path = self.uri
        else:
            full_path = self.uri + path

        if query_params:
            query_string = urllib.parse.urlencode(query_params)
            full_path = f"{full_path}?{query_string}"

        return full_path

    def get_uri(self) -> str:
        """Return the normalized base URI."""
        return self.uri

    def _execute_request(self, method: str, url: str,
                         data: Optional[str] = None,
                         headers: Optional[Dict[str, str]] = None,
                         response_type: Optional[Type[T]] = None) -> T:
        """Send one request and decode its response.

        Returns ``None`` when ``response_type`` is ``None``; otherwise returns
        ``response_type.from_json(body)``.

        Raises:
            RESTException: for error responses (via the default error handler),
                an empty body when a type is expected, or any transport or
                decoding failure, which is chained as the cause.
        """
        try:
            if headers:
                self.logging_interceptor.log_request(method, url, headers)

            response = self.session.request(
                method=method,
                url=url,
                data=data.encode('utf-8') if data else None,
                headers=headers,
                timeout=self.timeout,
            )

            # Pass requests' case-insensitive header mapping (not a plain dict)
            # so the request id is found regardless of header capitalization.
            self.logging_interceptor.log_response(response.status_code, response.headers)

            response_body_str = response.text or None

            if not response.ok:
                error = _parse_error_response(response_body_str, response.status_code)
                request_id = response.headers.get(
                    LoggingInterceptor.REQUEST_ID_KEY,
                    LoggingInterceptor.DEFAULT_REQUEST_ID
                )
                self.error_handler.accept(error, request_id)

            if response_type is None:
                return None
            if response_body_str is None:
                raise RESTException("response body is null.")
            return response_type.from_json(response_body_str)

        except RESTException:
            raise
        except Exception as e:
            raise RESTException("rest exception", e) from e