This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 45b823907d [pvfs] Support define endpoint in path (#6008)
45b823907d is described below

commit 45b823907dd93cc21bf2a3c1adcae4bb26731438
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Fri Aug 1 15:25:56 2025 +0800

    [pvfs] Support define endpoint in path (#6008)
---
 paimon-python/pypaimon/pvfs/__init__.py   | 87 +++++++++++++++++++------------
 paimon-python/pypaimon/tests/pvfs_test.py |  4 ++
 2 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py
index ec1cb0f05c..09273d65f4 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -16,7 +16,6 @@
 #  under the License.
 
 import importlib
-import re
 import time
 import datetime
 from abc import ABC
@@ -45,22 +44,37 @@ class StorageType(Enum):
 
 class PVFSIdentifier(ABC):
     catalog: str
+    endpoint: str
+
+    def get_cache_key(self) -> str:
+        return f"{self.catalog}.{self.__remove_endpoint_schema(self.endpoint)}"
+
+    @staticmethod
+    def __remove_endpoint_schema(url):
+        if url.startswith('https://'):
+            return url[8:]
+        elif url.startswith('http://'):
+            return url[7:]
+        return url
 
 
 @dataclass
 class PVFSCatalogIdentifier(PVFSIdentifier):
     catalog: str
+    endpoint: str
 
 
 @dataclass
 class PVFSDatabaseIdentifier(PVFSIdentifier):
-    database: str
     catalog: str
+    endpoint: str
+    database: str
 
 
 @dataclass
 class PVFSTableIdentifier(PVFSIdentifier):
     catalog: str
+    endpoint: str
     database: str
     table: str
     sub_path: str = None
@@ -114,7 +128,6 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
     options: Dict[str, Any]
 
     protocol = PROTOCOL_NAME
-    _identifier_pattern = 
re.compile("^pvfs://([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$")
 
     def __init__(self, options: Dict = None, **kwargs):
         options.update({CatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
@@ -150,7 +163,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
 
     def ls(self, path, detail=True, **kwargs):
         pvfs_identifier = self._extract_pvfs_identifier(path)
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
             databases = rest_api.list_databases()
             if detail:
@@ -209,7 +222,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 self._convert_database_virtual_path(pvfs_identifier.catalog, 
pvfs_identifier.database)
             )
         elif isinstance(pvfs_identifier, PVFSTableIdentifier):
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             table_path = self._get_table_store(rest_api, pvfs_identifier).path
             storage_type = self._get_storage_type(table_path)
             storage_location = table_path
@@ -224,14 +237,14 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
             return True
         elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             try:
                 rest_api.get_database(pvfs_identifier.database)
                 return True
             except NoSuchResourceException:
                 return False
         elif isinstance(pvfs_identifier, PVFSTableIdentifier):
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             try:
                 table_path = self._get_table_store(rest_api, 
pvfs_identifier).path
                 if table_path is not None and pvfs_identifier.sub_path is None:
@@ -252,7 +265,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 and target.sub_path is not None
                 and source.sub_path is not None
                 and source == target):
-            rest_api = self.__rest_api(source.catalog)
+            rest_api = self.__rest_api(source)
             table_path = self._get_table_store(rest_api, source).path
             storage_type = self._get_storage_type(table_path)
             storage_location = table_path
@@ -274,7 +287,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         if (isinstance(source, PVFSTableIdentifier) and
                 isinstance(target, PVFSTableIdentifier) and
                 target.catalog == source.catalog):
-            rest_api = self.__rest_api(source.catalog)
+            rest_api = self.__rest_api(source)
             if target.sub_path is None and source.sub_path is None:
                 source_identifier = Identifier.create(source.database, 
source.table)
                 target_identifier = Identifier.create(target.database, 
target.table)
@@ -306,7 +319,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
 
     def rm(self, path, recursive=False, maxdepth=None):
         pvfs_identifier = self._extract_pvfs_identifier(path)
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         if isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
             database_name = pvfs_identifier.database
             if not recursive and len(rest_api.list_tables(database_name)) > 0:
@@ -342,7 +355,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
     def rm_file(self, path):
         pvfs_identifier = self._extract_pvfs_identifier(path)
         if isinstance(pvfs_identifier, PVFSTableIdentifier):
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             table_path = self._get_table_store(rest_api, pvfs_identifier).path
             if pvfs_identifier.sub_path is not None:
                 storage_type = self._get_storage_type(table_path)
@@ -360,7 +373,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         files = self.ls(path)
         if len(files) == 0:
             pvfs_identifier = self._extract_pvfs_identifier(path)
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             if isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
                 database_name = pvfs_identifier.database
                 rest_api.drop_database(database_name)
@@ -406,7 +419,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 f"open is not supported for path: {path}"
             )
         elif isinstance(pvfs_identifier, PVFSTableIdentifier):
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             table_path = self._get_table_store(rest_api, pvfs_identifier).path
             if pvfs_identifier.sub_path is None:
                 raise Exception(
@@ -428,7 +441,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
 
     def mkdir(self, path, create_parents=True, **kwargs):
         pvfs_identifier = self._extract_pvfs_identifier(path)
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
             raise Exception(
                 f"mkdir is not supported for path: {path}"
@@ -471,7 +484,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
 
     def makedirs(self, path, exist_ok=True):
         pvfs_identifier = self._extract_pvfs_identifier(path)
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
             raise Exception(
                 f"makedirs is not supported for path: {path}"
@@ -511,7 +524,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
 
     def created(self, path):
         pvfs_identifier = self._extract_pvfs_identifier(path)
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
             raise Exception(
                 f"created is not supported for path: {path}"
@@ -533,7 +546,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
 
     def modified(self, path):
         pvfs_identifier = self._extract_pvfs_identifier(path)
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
             raise Exception(
                 f"modified is not supported for path: {path}"
@@ -570,7 +583,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                     f"cat file is not supported for path: {path}"
                 )
             else:
-                rest_api = self.__rest_api(pvfs_identifier.catalog)
+                rest_api = self.__rest_api(pvfs_identifier)
                 table = self._get_table_store(rest_api, pvfs_identifier)
                 storage_type = self._get_storage_type(table.path)
                 storage_location = table.path
@@ -594,7 +607,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 f"get file is not supported for path: {rpath}"
             )
         elif isinstance(pvfs_identifier, PVFSTableIdentifier):
-            rest_api = self.__rest_api(pvfs_identifier.catalog)
+            rest_api = self.__rest_api(pvfs_identifier)
             if pvfs_identifier.sub_path is None:
                 raise Exception(
                     f"get file is not supported for path: {rpath}"
@@ -617,7 +630,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         )
 
     def _create_object_table(self, pvfs_identifier: PVFSTableIdentifier):
-        rest_api = self.__rest_api(pvfs_identifier.catalog)
+        rest_api = self.__rest_api(pvfs_identifier)
         schema = Schema(options={'type': 'object-table'})
         table_identifier = pvfs_identifier.get_identifier()
         rest_api.create_table(table_identifier, schema)
@@ -714,8 +727,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             return path[len(f"{StorageType.OSS.value}://"):]
         return path
 
-    @staticmethod
-    def _extract_pvfs_identifier(path: str) -> Optional['PVFSIdentifier']:
+    def _extract_pvfs_identifier(self, path: str) -> 
Optional['PVFSIdentifier']:
         if not isinstance(path, str):
             raise Exception("path is not a string")
         path_without_protocol = path
@@ -726,19 +738,26 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             return None
 
         components = [component for component in 
path_without_protocol.rstrip('/').split('/') if component]
-
+        catalog: str = None
+        endpoint: str = self.options.get(CatalogOptions.URI)
+        if len(components) > 0:
+            if '.' in components[0]:
+                (catalog, endpoint) = components[0].split('.', 1)
+            else:
+                catalog = components[0]
         if len(components) == 0:
             return None
         elif len(components) == 1:
-            return PVFSCatalogIdentifier(components[0])
+            return (PVFSCatalogIdentifier(endpoint=endpoint, catalog=catalog))
         elif len(components) == 2:
-            return PVFSDatabaseIdentifier(catalog=components[0], 
database=components[1])
+            return PVFSDatabaseIdentifier(endpoint=endpoint, catalog=catalog, 
database=components[1])
         elif len(components) == 3:
-            return PVFSTableIdentifier(catalog=components[0], 
database=components[1], table=components[2])
+            return PVFSTableIdentifier(endpoint=endpoint, catalog=catalog, 
database=components[1], table=components[2])
         elif len(components) > 3:
             sub_path = '/'.join(components[3:])
             return PVFSTableIdentifier(
-                catalog=components[0], database=components[1],
+                endpoint=endpoint,
+                catalog=catalog, database=components[1],
                 table=components[2], sub_path=sub_path
             )
         return None
@@ -775,11 +794,15 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         modified = self.__converse_ts_to_datatime(table.updated_at)
         return TableStore(path=table.path, created=created, modified=modified)
 
-    def __rest_api(self, catalog: str):
+    def __rest_api(self, pvfs_identifier: PVFSIdentifier):
         read_lock = self._rest_api_cache_lock.gen_rlock()
+        catalog = pvfs_identifier.catalog
+        if pvfs_identifier.endpoint is None or catalog is None:
+            raise ValueError("Endpoint or catalog is not set.")
+        key = pvfs_identifier.get_cache_key()
         try:
             read_lock.acquire()
-            rest_api = self._rest_api_cache.get(catalog)
+            rest_api = self._rest_api_cache.get(key)
             if rest_api is not None:
                 return rest_api
         finally:
@@ -788,10 +811,10 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         write_lock = self._rest_api_cache_lock.gen_wlock()
         try:
             write_lock.acquire()
-            rest_api = self._rest_api_cache.get(catalog)
+            rest_api = self._rest_api_cache.get(key)
             if rest_api is None:
                 options = self.options.copy()
-                options.update({CatalogOptions.WAREHOUSE: catalog})
+                options.update({CatalogOptions.WAREHOUSE: catalog, 
CatalogOptions.URI: pvfs_identifier.endpoint})
                 rest_api = RESTApi(options)
                 self._rest_api_cache[catalog] = rest_api
             return rest_api
@@ -819,7 +842,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             if storage_type == StorageType.LOCAL:
                 fs = LocalFileSystem()
             elif storage_type == StorageType.OSS:
-                rest_api = self.__rest_api(pvfs_table_identifier.catalog)
+                rest_api = self.__rest_api(pvfs_table_identifier)
                 load_token_response: GetTableTokenResponse = 
rest_api.load_table_token(
                     Identifier.create(pvfs_table_identifier.database, 
pvfs_table_identifier.table))
                 fs = self._get_oss_filesystem(load_token_response.token)
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py
index b81914a347..fe25358759 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -155,6 +155,10 @@ class PVFSTestCase(unittest.TestCase):
         self.assertSetEqual(set(table_dirs), expect_table_dirs)
         database_virtual_path = f"pvfs://{self.catalog}/{self.database}"
         self.assertEqual(database_virtual_path, 
self.pvfs.info(database_virtual_path).get('name'))
+
+        database_virtual_path_with_endpoint = 
f"pvfs://{self.catalog}.localhost:{self.server.port}/{self.database}"
+        self.assertEqual(database_virtual_path, 
self.pvfs.info(database_virtual_path_with_endpoint).get('name'))
+
         self.assertEqual(True, self.pvfs.exists(database_virtual_path))
         table_virtual_path = 
f"pvfs://{self.catalog}/{self.database}/{self.table}"
         self.assertEqual(table_virtual_path, 
self.pvfs.info(table_virtual_path).get('name'))

Reply via email to