This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 45b823907d [pvfs] Support define endpoint in path (#6008) 45b823907d is described below commit 45b823907dd93cc21bf2a3c1adcae4bb26731438 Author: jerry <lining....@alibaba-inc.com> AuthorDate: Fri Aug 1 15:25:56 2025 +0800 [pvfs] Support define endpoint in path (#6008) --- paimon-python/pypaimon/pvfs/__init__.py | 87 +++++++++++++++++++------------ paimon-python/pypaimon/tests/pvfs_test.py | 4 ++ 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py index ec1cb0f05c..09273d65f4 100644 --- a/paimon-python/pypaimon/pvfs/__init__.py +++ b/paimon-python/pypaimon/pvfs/__init__.py @@ -16,7 +16,6 @@ # under the License. import importlib -import re import time import datetime from abc import ABC @@ -45,22 +44,37 @@ class StorageType(Enum): class PVFSIdentifier(ABC): catalog: str + endpoint: str + + def get_cache_key(self) -> str: + return f"{self.catalog}.{self.__remove_endpoint_schema(self.endpoint)}" + + @staticmethod + def __remove_endpoint_schema(url): + if url.startswith('https://'): + return url[8:] + elif url.startswith('http://'): + return url[7:] + return url @dataclass class PVFSCatalogIdentifier(PVFSIdentifier): catalog: str + endpoint: str @dataclass class PVFSDatabaseIdentifier(PVFSIdentifier): - database: str catalog: str + endpoint: str + database: str @dataclass class PVFSTableIdentifier(PVFSIdentifier): catalog: str + endpoint: str database: str table: str sub_path: str = None @@ -114,7 +128,6 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): options: Dict[str, Any] protocol = PROTOCOL_NAME - _identifier_pattern = re.compile("^pvfs://([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$") def __init__(self, options: Dict = None, **kwargs): options.update({CatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'}) @@ -150,7 +163,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def ls(self, 
path, detail=True, **kwargs): pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSCatalogIdentifier): databases = rest_api.list_databases() if detail: @@ -209,7 +222,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): self._convert_database_virtual_path(pvfs_identifier.catalog, pvfs_identifier.database) ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) table_path = self._get_table_store(rest_api, pvfs_identifier).path storage_type = self._get_storage_type(table_path) storage_location = table_path @@ -224,14 +237,14 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): if isinstance(pvfs_identifier, PVFSCatalogIdentifier): return True elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) try: rest_api.get_database(pvfs_identifier.database) return True except NoSuchResourceException: return False elif isinstance(pvfs_identifier, PVFSTableIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) try: table_path = self._get_table_store(rest_api, pvfs_identifier).path if table_path is not None and pvfs_identifier.sub_path is None: @@ -252,7 +265,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): and target.sub_path is not None and source.sub_path is not None and source == target): - rest_api = self.__rest_api(source.catalog) + rest_api = self.__rest_api(source) table_path = self._get_table_store(rest_api, source).path storage_type = self._get_storage_type(table_path) storage_location = table_path @@ -274,7 +287,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): if (isinstance(source, PVFSTableIdentifier) and isinstance(target, 
PVFSTableIdentifier) and target.catalog == source.catalog): - rest_api = self.__rest_api(source.catalog) + rest_api = self.__rest_api(source) if target.sub_path is None and source.sub_path is None: source_identifier = Identifier.create(source.database, source.table) target_identifier = Identifier.create(target.database, target.table) @@ -306,7 +319,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def rm(self, path, recursive=False, maxdepth=None): pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSDatabaseIdentifier): database_name = pvfs_identifier.database if not recursive and len(rest_api.list_tables(database_name)) > 0: @@ -342,7 +355,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def rm_file(self, path): pvfs_identifier = self._extract_pvfs_identifier(path) if isinstance(pvfs_identifier, PVFSTableIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) table_path = self._get_table_store(rest_api, pvfs_identifier).path if pvfs_identifier.sub_path is not None: storage_type = self._get_storage_type(table_path) @@ -360,7 +373,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): files = self.ls(path) if len(files) == 0: pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSDatabaseIdentifier): database_name = pvfs_identifier.database rest_api.drop_database(database_name) @@ -406,7 +419,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): f"open is not supported for path: {path}" ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) table_path = self._get_table_store(rest_api, 
pvfs_identifier).path if pvfs_identifier.sub_path is None: raise Exception( @@ -428,7 +441,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def mkdir(self, path, create_parents=True, **kwargs): pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSCatalogIdentifier): raise Exception( f"mkdir is not supported for path: {path}" @@ -471,7 +484,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def makedirs(self, path, exist_ok=True): pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSCatalogIdentifier): raise Exception( f"makedirs is not supported for path: {path}" @@ -511,7 +524,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def created(self, path): pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSCatalogIdentifier): raise Exception( f"created is not supported for path: {path}" @@ -533,7 +546,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def modified(self, path): pvfs_identifier = self._extract_pvfs_identifier(path) - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if isinstance(pvfs_identifier, PVFSCatalogIdentifier): raise Exception( f"modified is not supported for path: {path}" @@ -570,7 +583,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): f"cat file is not supported for path: {path}" ) else: - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) table = self._get_table_store(rest_api, pvfs_identifier) storage_type = self._get_storage_type(table.path) storage_location = table.path @@ -594,7 
+607,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): f"get file is not supported for path: {rpath}" ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) if pvfs_identifier.sub_path is None: raise Exception( f"get file is not supported for path: {rpath}" @@ -617,7 +630,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): ) def _create_object_table(self, pvfs_identifier: PVFSTableIdentifier): - rest_api = self.__rest_api(pvfs_identifier.catalog) + rest_api = self.__rest_api(pvfs_identifier) schema = Schema(options={'type': 'object-table'}) table_identifier = pvfs_identifier.get_identifier() rest_api.create_table(table_identifier, schema) @@ -714,8 +727,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): return path[len(f"{StorageType.OSS.value}://"):] return path - @staticmethod - def _extract_pvfs_identifier(path: str) -> Optional['PVFSIdentifier']: + def _extract_pvfs_identifier(self, path: str) -> Optional['PVFSIdentifier']: if not isinstance(path, str): raise Exception("path is not a string") path_without_protocol = path @@ -726,19 +738,26 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): return None components = [component for component in path_without_protocol.rstrip('/').split('/') if component] - + catalog: str = None + endpoint: str = self.options.get(CatalogOptions.URI) + if len(components) > 0: + if '.' 
in components[0]: + (catalog, endpoint) = components[0].split('.', 1) + else: + catalog = components[0] if len(components) == 0: return None elif len(components) == 1: - return PVFSCatalogIdentifier(components[0]) + return (PVFSCatalogIdentifier(endpoint=endpoint, catalog=catalog)) elif len(components) == 2: - return PVFSDatabaseIdentifier(catalog=components[0], database=components[1]) + return PVFSDatabaseIdentifier(endpoint=endpoint, catalog=catalog, database=components[1]) elif len(components) == 3: - return PVFSTableIdentifier(catalog=components[0], database=components[1], table=components[2]) + return PVFSTableIdentifier(endpoint=endpoint, catalog=catalog, database=components[1], table=components[2]) elif len(components) > 3: sub_path = '/'.join(components[3:]) return PVFSTableIdentifier( - catalog=components[0], database=components[1], + endpoint=endpoint, + catalog=catalog, database=components[1], table=components[2], sub_path=sub_path ) return None @@ -775,11 +794,15 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): modified = self.__converse_ts_to_datatime(table.updated_at) return TableStore(path=table.path, created=created, modified=modified) - def __rest_api(self, catalog: str): + def __rest_api(self, pvfs_identifier: PVFSIdentifier): read_lock = self._rest_api_cache_lock.gen_rlock() + catalog = pvfs_identifier.catalog + if pvfs_identifier.endpoint is None or catalog is None: + raise ValueError("Endpoint or catalog is not set.") + key = pvfs_identifier.get_cache_key() try: read_lock.acquire() - rest_api = self._rest_api_cache.get(catalog) + rest_api = self._rest_api_cache.get(key) if rest_api is not None: return rest_api finally: @@ -788,10 +811,10 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): write_lock = self._rest_api_cache_lock.gen_wlock() try: write_lock.acquire() - rest_api = self._rest_api_cache.get(catalog) + rest_api = self._rest_api_cache.get(key) if rest_api is None: options = self.options.copy() - 
options.update({CatalogOptions.WAREHOUSE: catalog}) + options.update({CatalogOptions.WAREHOUSE: catalog, CatalogOptions.URI: pvfs_identifier.endpoint}) rest_api = RESTApi(options) self._rest_api_cache[catalog] = rest_api return rest_api @@ -819,7 +842,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): if storage_type == StorageType.LOCAL: fs = LocalFileSystem() elif storage_type == StorageType.OSS: - rest_api = self.__rest_api(pvfs_table_identifier.catalog) + rest_api = self.__rest_api(pvfs_table_identifier) load_token_response: GetTableTokenResponse = rest_api.load_table_token( Identifier.create(pvfs_table_identifier.database, pvfs_table_identifier.table)) fs = self._get_oss_filesystem(load_token_response.token) diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py index b81914a347..fe25358759 100644 --- a/paimon-python/pypaimon/tests/pvfs_test.py +++ b/paimon-python/pypaimon/tests/pvfs_test.py @@ -155,6 +155,10 @@ class PVFSTestCase(unittest.TestCase): self.assertSetEqual(set(table_dirs), expect_table_dirs) database_virtual_path = f"pvfs://{self.catalog}/{self.database}" self.assertEqual(database_virtual_path, self.pvfs.info(database_virtual_path).get('name')) + + database_virtual_path_with_endpoint = f"pvfs://{self.catalog}.localhost:{self.server.port}/{self.database}" + self.assertEqual(database_virtual_path, self.pvfs.info(database_virtual_path_with_endpoint).get('name')) + self.assertEqual(True, self.pvfs.exists(database_virtual_path)) table_virtual_path = f"pvfs://{self.catalog}/{self.database}/{self.table}" self.assertEqual(table_virtual_path, self.pvfs.info(table_virtual_path).get('name'))