yuqi1129 commented on code in PR #5997:
URL: https://github.com/apache/gravitino/pull/5997#discussion_r1904073910
##########
clients/client-python/gravitino/filesystem/gvfs.py:
##########
@@ -866,50 +894,93 @@ def _get_fileset_catalog(self, catalog_ident:
NameIdentifier):
finally:
write_lock.release()
- def _get_filesystem(self, actual_file_location: str):
+ # Disable Too many branches (13/12) (too-many-branches)
+ # pylint: disable=R0912
+ def _get_filesystem(
+ self,
+ actual_file_location: str,
+ fileset_catalog: Catalog,
+ name_identifier: NameIdentifier,
+ ):
storage_type = self._recognize_storage_type(actual_file_location)
read_lock = self._cache_lock.gen_rlock()
try:
read_lock.acquire()
- cache_value: Tuple[StorageType, AbstractFileSystem] =
self._cache.get(
- storage_type
+ cache_value: Tuple[int, AbstractFileSystem] = self._cache.get(
+ name_identifier
)
if cache_value is not None:
- return cache_value
+ # if the cache value is not expired, return the cache value
+ if cache_value[0] > time.time() * 1000:
+ return cache_value[1]
finally:
read_lock.release()
write_lock = self._cache_lock.gen_wlock()
try:
write_lock.acquire()
- cache_value: Tuple[StorageType, AbstractFileSystem] =
self._cache.get(
- storage_type
+ cache_value: Tuple[int, AbstractFileSystem] = self._cache.get(
+ name_identifier
)
+
if cache_value is not None:
- return cache_value
+ # if the cache value is not expired, return the cache value
+ if cache_value[0] > time.time() * 1000:
+ return cache_value[1]
+
+ new_cache_value: Tuple[int, AbstractFileSystem]
if storage_type == StorageType.HDFS:
fs_class =
importlib.import_module("pyarrow.fs").HadoopFileSystem
- fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location))
+ new_cache_value = (
+ 0,
+ ArrowFSWrapper(fs_class.from_uri(actual_file_location)),
+ )
elif storage_type == StorageType.LOCAL:
- fs = LocalFileSystem()
+ new_cache_value = (0, LocalFileSystem())
elif storage_type == StorageType.GCS:
- fs = self._get_gcs_filesystem()
+ new_cache_value = self._get_gcs_filesystem(
+ fileset_catalog, name_identifier
+ )
elif storage_type == StorageType.S3A:
- fs = self._get_s3_filesystem()
+ new_cache_value = self._get_s3_filesystem(
+ fileset_catalog, name_identifier
+ )
elif storage_type == StorageType.OSS:
- fs = self._get_oss_filesystem()
+ new_cache_value = self._get_oss_filesystem(
+ fileset_catalog, name_identifier
+ )
elif storage_type == StorageType.ABS:
- fs = self._get_abs_filesystem()
+ new_cache_value = self._get_abs_filesystem(
+ fileset_catalog, name_identifier
+ )
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
)
- self._cache[storage_type] = fs
- return fs
+ self._cache[name_identifier] = new_cache_value
+ return new_cache_value[1]
finally:
write_lock.release()
- def _get_gcs_filesystem(self):
+ def _get_gcs_filesystem(self, fileset_catalog: Catalog, identifier:
NameIdentifier):
+ try:
+ fileset: GenericFileset =
fileset_catalog.as_fileset_catalog().load_fileset(
+ NameIdentifier.of(identifier.namespace().level(2),
identifier.name())
+ )
+ credentials = fileset.support_credentials().get_credentials()
+ except (NoSuchCredentialException, CatalogNotInUseException) as e:
+ logger.warning("Failed to get credentials from fileset: %s", e)
+ credentials = []
+
+ credential = self._get_most_suitable_gcs_credential(credentials)
+ if credential is not None:
+ expire_time =
self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+ if isinstance(credential, GCSTokenCredential):
+ fs = importlib.import_module("gcsfs").GCSFileSystem(
+ token=credential.token()
+ )
+ return (expire_time, fs)
+
# get 'service-account-key' from gcs_options, if the key is not found,
throw an exception
service_account_key_path = self._options.get(
Review Comment:
Environment values are not supported in the current implementation for any of the four
cloud storage types and should be set explicitly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]