This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new e930925d78 [python] pvfs: support table cache (#5965) e930925d78 is described below commit e930925d78a8fb35214156ba77bee7bb3835532d Author: jerry <lining....@alibaba-inc.com> AuthorDate: Tue Jul 29 17:39:34 2025 +0800 [python] pvfs: support table cache (#5965) --- paimon-python/pypaimon/api/config.py | 6 +- paimon-python/pypaimon/api/rest_json.py | 11 +- paimon-python/pypaimon/api/schema.py | 11 -- paimon-python/pypaimon/pvfs/__init__.py | 206 ++++++++++++++++++---------- paimon-python/pypaimon/rest/rest_catalog.py | 3 +- paimon-python/pypaimon/tests/pvfs_test.py | 3 +- 6 files changed, 149 insertions(+), 91 deletions(-) diff --git a/paimon-python/pypaimon/api/config.py b/paimon-python/pypaimon/api/config.py index e9a6561ab2..54f7a8bc49 100644 --- a/paimon-python/pypaimon/api/config.py +++ b/paimon-python/pypaimon/api/config.py @@ -40,5 +40,7 @@ class RESTCatalogOptions: class PVFSOptions: - DEFAULT_CACHE_SIZE = 20 - CACHE_SIZE = "cache_size" + CACHE_ENABLED = "cache-enabled" + TABLE_CACHE_TTL = "cache.expire-after-write" + DEFAULT_TABLE_CACHE_TTL = 1800 + DEFAULT_CACHE_SIZE = 2**31 - 1 diff --git a/paimon-python/pypaimon/api/rest_json.py b/paimon-python/pypaimon/api/rest_json.py index 124dd69254..b944bb625a 100644 --- a/paimon-python/pypaimon/api/rest_json.py +++ b/paimon-python/pypaimon/api/rest_json.py @@ -17,7 +17,7 @@ import json from dataclasses import field, fields, is_dataclass -from typing import Any, Type, Dict, TypeVar +from typing import Any, Type, Dict, TypeVar, get_origin, get_args, Union T = TypeVar("T") @@ -74,8 +74,13 @@ class JSON: for field_info in fields(target_class): json_name = field_info.metadata.get("json_name", field_info.name) field_mapping[json_name] = field_info.name - if is_dataclass(field_info.type): - type_mapping[json_name] = field_info.type + origin_type = get_origin(field_info.type) + args = get_args(field_info.type) + field_type = field_info.type + if origin_type is Union and len(args) == 2: + field_type = args[0] + if is_dataclass(field_type): + type_mapping[json_name] = field_type # Map JSON data to field names kwargs = {} diff --git a/paimon-python/pypaimon/api/schema.py b/paimon-python/pypaimon/api/schema.py index 4bc06806c8..0a826bdc8e 100644 --- a/paimon-python/pypaimon/api/schema.py +++ b/paimon-python/pypaimon/api/schema.py @@ -39,14 +39,3 @@ class Schema: FIELD_PRIMARY_KEYS, default_factory=list) options: Dict[str, str] = json_field(FIELD_OPTIONS, default_factory=dict) comment: Optional[str] = json_field(FIELD_COMMENT, default=None) - - @staticmethod - def from_dict(data: dict): - fields = [DataField.from_dict(field) for field in data["fields"]] - return Schema( - fields=fields, - partition_keys=data["partitionKeys"], - primary_keys=data["primaryKeys"], - options=data["options"], - comment=data.get("comment"), - ) diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py index 25dd7df7ea..0351efa0eb 100644 --- a/paimon-python/pypaimon/pvfs/__init__.py +++ b/paimon-python/pypaimon/pvfs/__init__.py @@ -24,14 +24,14 @@ from dataclasses import dataclass from enum import Enum from typing import Dict, Any, Optional, Tuple -from cachetools import LRUCache +from cachetools import LRUCache, TTLCache from readerwriterlock import rwlock import fsspec from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem -from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, GetTableResponse, Identifier +from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, Identifier, GetTableResponse from pypaimon.api.client import NoSuchResourceException, AlreadyExistsException from pypaimon.api.config import RESTCatalogOptions, OssOptions, PVFSOptions @@ -85,6 +85,9 @@ class PVFSTableIdentifier(PVFSIdentifier): def get_identifier(self): return Identifier.create(self.database, self.table) + def name(self): + return f'{self.catalog}.{self.database}.{self.table}' + @dataclass class PaimonRealStorage: @@ -100,6 +103,13 @@ class PaimonRealStorage: return False +@dataclass +class TableStore: + path: str + created: datetime + modified: datetime + + class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): options: Dict[str, Any] @@ -110,25 +120,24 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): options.update({RESTCatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'}) self.options = options self.warehouse = options.get(RESTCatalogOptions.WAREHOUSE) - cache_size = ( - PVFSOptions.DEFAULT_CACHE_SIZE + cache_expired_time = ( + PVFSOptions.DEFAULT_TABLE_CACHE_TTL if options is None - else options.get(PVFSOptions.CACHE_SIZE, PVFSOptions.DEFAULT_CACHE_SIZE) + else options.get( + PVFSOptions.TABLE_CACHE_TTL, PVFSOptions.DEFAULT_TABLE_CACHE_TTL + ) ) - self._rest_client_cache = LRUCache(cache_size) - self._cache = LRUCache(maxsize=cache_size) - self._cache_lock = rwlock.RWLockFair() + self._cache_enable = options.get(PVFSOptions.CACHE_ENABLED, True) + if self._cache_enable: + self._table_cache = TTLCache(maxsize=PVFSOptions.DEFAULT_CACHE_SIZE, ttl=cache_expired_time) + self._table_cache_lock = rwlock.RWLockFair() + self._rest_api_cache = LRUCache(PVFSOptions.DEFAULT_CACHE_SIZE) + self._fs_cache = LRUCache(maxsize=PVFSOptions.DEFAULT_CACHE_SIZE) + self._table_cache_lock = rwlock.RWLockFair() + self._rest_api_cache_lock = rwlock.RWLockFair() + self._fs_cache_lock = rwlock.RWLockFair() super().__init__(**kwargs) - def __rest_api(self, catalog: str): - rest_api = self._rest_client_cache.get(catalog) - if rest_api is None: - options = self.options.copy() - options.update({RESTCatalogOptions.WAREHOUSE: catalog}) - rest_api = RESTApi(options) - self._rest_client_cache[catalog] = rest_api - return rest_api - @property def fsid(self): return PROTOCOL_NAME @@ -169,9 +178,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): for table in tables ] elif isinstance(pvfs_identifier, PVFSTableIdentifier): - table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table)) - storage_type = self._get_storage_type(table.path) - storage_location = table.path + table_path = self._get_table_store(rest_api, pvfs_identifier).path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) virtual_location = pvfs_identifier.get_virtual_location() fs = self._get_filesystem(pvfs_identifier, storage_type) @@ -201,9 +210,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): rest_api = self.__rest_api(pvfs_identifier.catalog) - table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table)) - storage_type = self._get_storage_type(table.path) - storage_location = table.path + table_path = self._get_table_store(rest_api, pvfs_identifier).path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) virtual_location = pvfs_identifier.get_virtual_location() fs = self._get_filesystem(pvfs_identifier, storage_type) @@ -224,11 +233,11 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): elif isinstance(pvfs_identifier, PVFSTableIdentifier): rest_api = self.__rest_api(pvfs_identifier.catalog) try: - table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table)) - if pvfs_identifier.sub_path is None: + table_path = self._get_table_store(rest_api, pvfs_identifier).path + if table_path is not None and pvfs_identifier.sub_path is None: return True - storage_type = self._get_storage_type(table.path) - storage_location = table.path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.exists(actual_path) @@ -244,10 +253,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): and source.sub_path is not None and source == target): rest_api = self.__rest_api(source.catalog) - table_identifier = source.get_identifier() - table = rest_api.get_table(table_identifier) - storage_type = self._get_storage_type(table.path) - storage_location = table.path + table_path = self._get_table_store(rest_api, source).path + storage_type = self._get_storage_type(table_path) + storage_location = table_path source_actual_path = source.get_actual_path(storage_location) target_actual_path = target.get_actual_path(storage_location) fs = self._get_filesystem(source, storage_type) @@ -273,10 +281,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): rest_api.rename_table(source_identifier, target_identifier) return None elif target.sub_path is not None and source.sub_path is not None and target == source: - table_identifier = source.get_identifier() - table = rest_api.get_table(table_identifier) - storage_type = self._get_storage_type(table.path) - storage_location = table.path + table_path = self._get_table_store(rest_api, source).path + storage_type = self._get_storage_type(table_path) + storage_location = table_path source_actual_path = source.get_actual_path(storage_location) target_actual_path = target.get_actual_path(storage_location) fs = self._get_filesystem(source, storage_type) @@ -305,15 +312,22 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): if not recursive and len(rest_api.list_tables(database_name)) > 0: raise Exception('Recursive is False but database is not empty') rest_api.drop_database(database_name) + if self._table_cache: + for table in rest_api.list_tables(database_name): + table_pvfs_identifier = PVFSTableIdentifier( + pvfs_identifier.catalog, database_name, table + ).name() + self._table_cache.pop(table_pvfs_identifier) return True elif isinstance(pvfs_identifier, PVFSTableIdentifier): table_identifier = pvfs_identifier.get_identifier() - table = rest_api.get_table(table_identifier) + table_path = self._get_table_store(rest_api, pvfs_identifier).path if pvfs_identifier.sub_path is None: rest_api.drop_table(table_identifier) + self._table_cache.pop(table_identifier.name()) return True - storage_type = self._get_storage_type(table.path) - storage_location = table.path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.rm( @@ -328,12 +342,11 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): def rm_file(self, path): pvfs_identifier = self._extract_pvfs_identifier(path) if isinstance(pvfs_identifier, PVFSTableIdentifier): - table_identifier = pvfs_identifier.get_identifier() rest_api = self.__rest_api(pvfs_identifier.catalog) - table = rest_api.get_table(table_identifier) + table_path = self._get_table_store(rest_api, pvfs_identifier).path if pvfs_identifier.sub_path is not None: - storage_type = self._get_storage_type(table.path) - storage_location = table.path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.rm_file( @@ -354,13 +367,13 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): return True elif isinstance(pvfs_identifier, PVFSTableIdentifier): table_identifier = pvfs_identifier.get_identifier() - table = rest_api.get_table(table_identifier) + table_path = self._get_table_store(rest_api, pvfs_identifier).path if pvfs_identifier.sub_path is None: rest_api.drop_table(table_identifier) self._cache.pop(pvfs_identifier) return True - storage_type = self._get_storage_type(table.path) - storage_location = table.path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.rmdir( @@ -394,15 +407,14 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): rest_api = self.__rest_api(pvfs_identifier.catalog) - table_identifier = pvfs_identifier.get_identifier() - table = rest_api.get_table(table_identifier) + table_path = self._get_table_store(rest_api, pvfs_identifier).path if pvfs_identifier.sub_path is None: raise Exception( f"open is not supported for path: {path}" ) else: - storage_type = self._get_storage_type(table.path) - storage_location = table.path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.open( @@ -424,7 +436,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier): rest_api.create_database(pvfs_identifier.database, {}) elif isinstance(pvfs_identifier, PVFSTableIdentifier): - table_identifier = pvfs_identifier.get_identifier() + table_path: str if pvfs_identifier.sub_path is None: if create_parents: try: @@ -433,23 +445,22 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): pass self._create_object_table(pvfs_identifier) else: - table: GetTableResponse if create_parents: try: rest_api.create_database(pvfs_identifier.database, {}) except AlreadyExistsException: pass try: - table = rest_api.get_table(table_identifier) + table_path = self._get_table_store(rest_api, pvfs_identifier).path except NoSuchResourceException: try: self._create_object_table(pvfs_identifier) except AlreadyExistsException: pass finally: - table = rest_api.get_table(table_identifier) - storage_type = self._get_storage_type(table.path) - storage_location = table.path + table_path = self._get_table_store(rest_api, pvfs_identifier).path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.mkdir( @@ -473,7 +484,6 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): pass raise e elif isinstance(pvfs_identifier, PVFSTableIdentifier): - table_identifier = pvfs_identifier.get_identifier() if pvfs_identifier.sub_path is None: try: self._create_object_table(pvfs_identifier) @@ -489,9 +499,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): pass else: raise e - table = rest_api.get_table(table_identifier) - storage_type = self._get_storage_type(table.path) - storage_location = table.path + table_path = self._get_table_store(rest_api, pvfs_identifier).path + storage_type = self._get_storage_type(table_path) + storage_location = table_path actual_path = pvfs_identifier.get_actual_path(storage_location) fs = self._get_filesystem(pvfs_identifier, storage_type) return fs.makedirs( @@ -509,11 +519,10 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier): return self.__converse_ts_to_datatime(rest_api.get_database(pvfs_identifier.database).created_at) elif isinstance(pvfs_identifier, PVFSTableIdentifier): - table_identifier = pvfs_identifier.get_identifier() if pvfs_identifier.sub_path is None: - return self.__converse_ts_to_datatime(rest_api.get_table(table_identifier).created_at) + return self._get_table_store(rest_api, pvfs_identifier).created else: - table = rest_api.get_table(table_identifier) + table = self._get_table_store(rest_api, pvfs_identifier) storage_type = self._get_storage_type(table.path) storage_location = table.path actual_path = pvfs_identifier.get_actual_path(storage_location) @@ -534,7 +543,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): elif isinstance(pvfs_identifier, PVFSTableIdentifier): table_identifier = pvfs_identifier.get_identifier() if pvfs_identifier.sub_path is None: - return self.__converse_ts_to_datatime(rest_api.get_table(table_identifier).updated_at) + return self._get_table_store(rest_api, pvfs_identifier).modified else: table = rest_api.get_table(table_identifier) storage_type = self._get_storage_type(table.path) @@ -556,14 +565,13 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): f"cat file is not supported for path: {path}" ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): - table_identifier = pvfs_identifier.get_identifier() if pvfs_identifier.sub_path is None: raise Exception( f"cat file is not supported for path: {path}" ) else: rest_api = self.__rest_api(pvfs_identifier.catalog) - table = rest_api.get_table(table_identifier) + table = self._get_table_store(rest_api, pvfs_identifier) storage_type = self._get_storage_type(table.path) storage_location = table.path actual_path = pvfs_identifier.get_actual_path(storage_location) @@ -587,13 +595,12 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): ) elif isinstance(pvfs_identifier, PVFSTableIdentifier): rest_api = self.__rest_api(pvfs_identifier.catalog) - table_identifier = pvfs_identifier.get_identifier() if pvfs_identifier.sub_path is None: raise Exception( f"get file is not supported for path: {rpath}" ) else: - table = rest_api.get_table(table_identifier) + table = self._get_table_store(rest_api, pvfs_identifier) storage_type = self._get_storage_type(table.path) storage_location = table.path actual_path = pvfs_identifier.get_actual_path(storage_location) @@ -736,11 +743,66 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): ) return None + def _get_table_store(self, rest_api: RESTApi, pvfs_identifier: PVFSTableIdentifier) -> Optional['TableStore']: + if not self._cache_enable: + table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table)) + return self._create_table_store(table) + read_lock = self._table_cache_lock.gen_rlock() + try: + read_lock.acquire() + cache_value: Tuple[str, TableStore] = self._table_cache.get( + pvfs_identifier.name() + ) + if cache_value is not None: + return cache_value + finally: + read_lock.release() + write_lock = self._table_cache_lock.gen_wlock() + try: + write_lock.acquire() + table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table)) + if table is not None: + table_store = self._create_table_store(table) + self._table_cache[pvfs_identifier.name()] = table_store + return table_store + else: + return None + finally: + write_lock.release() + + def _create_table_store(self, table: GetTableResponse): + created = self.__converse_ts_to_datatime(table.created_at) + modified = self.__converse_ts_to_datatime(table.updated_at) + return TableStore(path=table.path, created=created, modified=modified) + + def __rest_api(self, catalog: str): + read_lock = self._rest_api_cache_lock.gen_rlock() + try: + read_lock.acquire() + rest_api = self._rest_api_cache.get(catalog) + if rest_api is not None: + return rest_api + finally: + read_lock.release() + + write_lock = self._rest_api_cache_lock.gen_wlock() + try: + write_lock.acquire() + rest_api = self._rest_api_cache.get(catalog) + if rest_api is None: + options = self.options.copy() + options.update({RESTCatalogOptions.WAREHOUSE: catalog}) + rest_api = RESTApi(options) + self._rest_api_cache[catalog] = rest_api + return rest_api + finally: + write_lock.release() + def _get_filesystem(self, pvfs_table_identifier: PVFSTableIdentifier, storage_type: StorageType) -> 'FileSystem': - read_lock = self._cache_lock.gen_rlock() + read_lock = self._fs_cache_lock.gen_rlock() try: read_lock.acquire() - cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( + cache_value: Tuple[StorageType, AbstractFileSystem] = self._fs_cache.get( storage_type ) if cache_value is not None: @@ -748,10 +810,10 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): finally: read_lock.release() - write_lock = self._cache_lock.gen_wlock() + write_lock = self._table_cache_lock.gen_wlock() try: write_lock.acquire() - cache_value: PaimonRealStorage = self._cache.get(pvfs_table_identifier) + cache_value: PaimonRealStorage = self._fs_cache.get(pvfs_table_identifier) if cache_value is not None and cache_value.need_refresh() is False: return cache_value.file_system if storage_type == StorageType.LOCAL: @@ -766,7 +828,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem): expires_at_millis=load_token_response.expires_at_millis, file_system=fs ) - self._cache[pvfs_table_identifier] = paimon_real_storage + self._fs_cache[pvfs_table_identifier] = paimon_real_storage else: raise Exception( f"Storage type: `{storage_type}` doesn't support now." diff --git a/paimon-python/pypaimon/rest/rest_catalog.py b/paimon-python/pypaimon/rest/rest_catalog.py index 2f76aa5a1d..364fec8247 100644 --- a/paimon-python/pypaimon/rest/rest_catalog.py +++ b/paimon-python/pypaimon/rest/rest_catalog.py @@ -24,7 +24,6 @@ from pypaimon.api.core_options import CoreOptions from pypaimon.api.identifier import Identifier from pypaimon.api.options import Options -from pypaimon.api.schema import Schema from pypaimon.api.table_schema import TableSchema from pypaimon.catalog.catalog import Catalog @@ -99,7 +98,7 @@ class RESTCatalog(Catalog): return self.to_table_metadata(identifier.get_database_name(), response) def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadata: - schema = TableSchema.create(response.get_schema_id(), Schema.from_dict(response.get_schema())) + schema = TableSchema.create(response.get_schema_id(), response.get_schema()) options: Dict[str, str] = dict(schema.options) options[CoreOptions.PATH] = response.get_path() response.put_audit_options_to(options) diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py index 3b084d0019..c78fcf94a0 100644 --- a/paimon-python/pypaimon/tests/pvfs_test.py +++ b/paimon-python/pypaimon/tests/pvfs_test.py @@ -57,7 +57,8 @@ class PVFSTestCase(unittest.TestCase): 'warehouse': 'test_warehouse', 'dlf.region': 'cn-hangzhou', "token.provider": "bear", - 'token': self.token + 'token': self.token, + 'cache-enabled': True } self.pvfs = PaimonVirtualFileSystem(options) self.database = 'test_database'