discivigour commented on code in PR #7040:
URL: https://github.com/apache/paimon/pull/7040#discussion_r2694017188


##########
paimon-python/pypaimon/catalog/rest/rest_token_file_io.py:
##########
@@ -20,70 +20,164 @@
 import time
 from typing import Optional
 
-from pyarrow._fs import FileSystem
+from cachetools import TTLCache
 
 from pypaimon.api.rest_api import RESTApi
 from pypaimon.api.rest_util import RESTUtil
 from pypaimon.catalog.rest.rest_token import RESTToken
 from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import CatalogOptions, OssOptions
+from pypaimon.common.uri_reader import UriReaderFactory
 
 
 class RESTTokenFileIO(FileIO):
-
+    """
+    A FileIO to support getting token from REST Server.
+    """
+    
+    _FILE_IO_CACHE_MAXSIZE = 1000
+    _FILE_IO_CACHE_TTL = 36000  # 10 hours in seconds
+    
     def __init__(self, identifier: Identifier, path: str,
                  catalog_options: Optional[Options] = None):
         self.identifier = identifier
         self.path = path
+        self.catalog_options = catalog_options
+        self.properties = catalog_options or Options({})  # For compatibility with refresh_token()
         self.token: Optional[RESTToken] = None
         self.api_instance: Optional[RESTApi] = None
         self.lock = threading.Lock()
         self.log = logging.getLogger(__name__)
-        super().__init__(path, catalog_options)
+        self._uri_reader_factory_cache: Optional[UriReaderFactory] = None
+        self._file_io_cache: TTLCache = TTLCache(
+            maxsize=self._FILE_IO_CACHE_MAXSIZE,
+            ttl=self._FILE_IO_CACHE_TTL
+        )
 
     def __getstate__(self):
         state = self.__dict__.copy()
         # Remove non-serializable objects
         state.pop('lock', None)
         state.pop('api_instance', None)
+        state.pop('_file_io_cache', None)
+        state.pop('_uri_reader_factory_cache', None)
         # token can be serialized, but we'll refresh it on deserialization
         return state
 
     def __setstate__(self, state):
         self.__dict__.update(state)
-        # Recreate lock after deserialization
+        # Recreate lock and cache after deserialization
         self.lock = threading.Lock()
+        self._file_io_cache = TTLCache(
+            maxsize=self._FILE_IO_CACHE_MAXSIZE,
+            ttl=self._FILE_IO_CACHE_TTL
+        )
+        self._uri_reader_factory_cache = None
         # api_instance will be recreated when needed
         self.api_instance = None
 
-    def _initialize_oss_fs(self, path) -> FileSystem:
+    def file_io(self) -> FileIO:
         self.try_to_refresh_token()
-        merged_token = self._merge_token_with_catalog_options(self.token.token)
-        merged_properties = RESTUtil.merge(
-            self.properties.to_map() if self.properties else {},
-            merged_token
-        )
-        merged_options = Options(merged_properties)
-        original_properties = self.properties
-        self.properties = merged_options
-        try:
-            return super()._initialize_oss_fs(path)
-        finally:
-            self.properties = original_properties
+
+        if self.token is None:
+            return FileIO.get(self.path, self.catalog_options or Options({}))

Review Comment:
   The `if` check is missing here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to