discivigour commented on code in PR #7040:
URL: https://github.com/apache/paimon/pull/7040#discussion_r2694017188
##########
paimon-python/pypaimon/catalog/rest/rest_token_file_io.py:
##########
@@ -20,70 +20,164 @@
import time
from typing import Optional
-from pyarrow._fs import FileSystem
+from cachetools import TTLCache
from pypaimon.api.rest_api import RESTApi
from pypaimon.api.rest_util import RESTUtil
from pypaimon.catalog.rest.rest_token import RESTToken
from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
from pypaimon.common.identifier import Identifier
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions, OssOptions
+from pypaimon.common.uri_reader import UriReaderFactory
class RESTTokenFileIO(FileIO):
-
+    """
+    A FileIO that supports getting tokens from the REST server.
+    """
+
+    _FILE_IO_CACHE_MAXSIZE = 1000
+    _FILE_IO_CACHE_TTL = 36000  # 10 hours in seconds
+
    def __init__(self, identifier: Identifier, path: str,
                 catalog_options: Optional[Options] = None):
        self.identifier = identifier
        self.path = path
+        self.catalog_options = catalog_options
+        self.properties = catalog_options or Options({})  # For compatibility with refresh_token()
        self.token: Optional[RESTToken] = None
        self.api_instance: Optional[RESTApi] = None
        self.lock = threading.Lock()
        self.log = logging.getLogger(__name__)
-        super().__init__(path, catalog_options)
+        self._uri_reader_factory_cache: Optional[UriReaderFactory] = None
+        self._file_io_cache: TTLCache = TTLCache(
+            maxsize=self._FILE_IO_CACHE_MAXSIZE,
+            ttl=self._FILE_IO_CACHE_TTL
+        )

    def __getstate__(self):
        state = self.__dict__.copy()
        # Remove non-serializable objects
        state.pop('lock', None)
        state.pop('api_instance', None)
+        state.pop('_file_io_cache', None)
+        state.pop('_uri_reader_factory_cache', None)
        # token can be serialized, but we'll refresh it on deserialization
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
-        # Recreate lock after deserialization
+        # Recreate lock and cache after deserialization
        self.lock = threading.Lock()
+        self._file_io_cache = TTLCache(
+            maxsize=self._FILE_IO_CACHE_MAXSIZE,
+            ttl=self._FILE_IO_CACHE_TTL
+        )
+        self._uri_reader_factory_cache = None
        # api_instance will be recreated when needed
        self.api_instance = None

-    def _initialize_oss_fs(self, path) -> FileSystem:
+    def file_io(self) -> FileIO:
        self.try_to_refresh_token()
-        merged_token = self._merge_token_with_catalog_options(self.token.token)
-        merged_properties = RESTUtil.merge(
-            self.properties.to_map() if self.properties else {},
-            merged_token
-        )
-        merged_options = Options(merged_properties)
-        original_properties = self.properties
-        self.properties = merged_options
-        try:
-            return super()._initialize_oss_fs(path)
-        finally:
-            self.properties = original_properties
+
+        if self.token is None:
+            return FileIO.get(self.path, self.catalog_options or Options({}))
Review Comment:
The `if` check is missing.
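
For readers following the thread, here is a minimal sketch of how the token-aware, TTL-cached file_io() lookup that the truncated hunk builds toward might be completed inside RESTTokenFileIO. This is not code from the PR: the token-keyed cache lookup, the use of self.lock around the cache, and the property merge are assumptions layered on names that do appear in the diff (try_to_refresh_token, FileIO.get, RESTUtil.merge, Options, _file_io_cache).

    def file_io(self) -> FileIO:
        # Refresh the token first so that expired credentials are not used below.
        self.try_to_refresh_token()

        # Branch from the posted diff: no token available, fall back to a plain
        # FileIO built from the catalog options only.
        if self.token is None:
            return FileIO.get(self.path, self.catalog_options or Options({}))

        # Assumption: key the TTL cache by the token contents so a refreshed
        # token yields a new FileIO carrying the refreshed credentials.
        cache_key = frozenset(self.token.token.items())
        with self.lock:
            cached = self._file_io_cache.get(cache_key)
            if cached is None:
                # Assumption: merge catalog properties with the token, as the
                # removed _initialize_oss_fs did, and build the FileIO from the
                # merged options.
                merged = RESTUtil.merge(
                    self.properties.to_map() if self.properties else {},
                    self.token.token
                )
                cached = FileIO.get(self.path, Options(merged))
                self._file_io_cache[cache_key] = cached
        return cached

Keying the cache by the token contents would mean a refreshed token naturally produces a fresh FileIO, while the TTLCache bounds how long stale entries linger.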
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]