FANNG1 commented on code in PR #5997: URL: https://github.com/apache/gravitino/pull/5997#discussion_r1904042397
########## clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py: ########## @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import os +from random import randint +import unittest + +from gcsfs import GCSFileSystem + +from gravitino import Catalog, Fileset, GravitinoClient +from gravitino.filesystem import gvfs +from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS + +logger = logging.getLogger(__name__) + + +def gcs_with_credential_is_configured(): + return all( + [ + os.environ.get("GCS_STS_SERVICE_ACCOUNT_JSON_PATH") is not None, + os.environ.get("GCS_STS_BUCKET_NAME") is not None, + ] + ) + + [email protected](gcs_with_credential_is_configured(), "GCS is not configured.") +class TestGvfsWithGCSCredential(TestGvfsWithGCS): + # Before running this test, please make sure gcp-bundle-x.jar has been + # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory + key_file = os.environ.get("GCS_STS_SERVICE_ACCOUNT_JSON_PATH") + bucket_name = os.environ.get("GCS_STS_BUCKET_NAME") + metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1, 10000)) + + @classmethod + def _init_test_entities(cls): + cls.gravitino_admin_client.create_metalake( 
name=cls.metalake_name, comment="", properties={} + ) + cls.gravitino_client = GravitinoClient( + uri="http://localhost:8090", metalake_name=cls.metalake_name + ) + + cls.config = {} + cls.conf = {} + catalog = cls.gravitino_client.create_catalog( + name=cls.catalog_name, + catalog_type=Catalog.Type.FILESET, + provider=cls.catalog_provider, + comment="", + properties={ + "filesystem-providers": "gcs", + "gcs-credential-file-path": cls.key_file, + "gcs-service-account-file": cls.key_file, + "credential-providers": "gcs-token", + }, + ) + catalog.as_schemas().create_schema( + schema_name=cls.schema_name, comment="", properties={} + ) + + cls.fileset_storage_location: str = ( + f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + cls.fileset_gvfs_location = ( + f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + catalog.as_fileset_catalog().create_fileset( + ident=cls.fileset_ident, + fileset_type=Fileset.Type.MANAGED, + comment=cls.fileset_comment, + storage_location=cls.fileset_storage_location, + properties=cls.fileset_properties, + ) + + cls.fs = GCSFileSystem(token=cls.key_file) + + def test_mkdir(self): Review Comment: why not move these tests to `TestGvfsWithHDFS` ? 
########## clients/client-python/gravitino/filesystem/gvfs.py: ########## @@ -866,50 +894,93 @@ def _get_fileset_catalog(self, catalog_ident: NameIdentifier): finally: write_lock.release() - def _get_filesystem(self, actual_file_location: str): + # Disable Too many branches (13/12) (too-many-branches) + # pylint: disable=R0912 + def _get_filesystem( + self, + actual_file_location: str, + fileset_catalog: Catalog, + name_identifier: NameIdentifier, + ): storage_type = self._recognize_storage_type(actual_file_location) read_lock = self._cache_lock.gen_rlock() try: read_lock.acquire() - cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( - storage_type + cache_value: Tuple[int, AbstractFileSystem] = self._cache.get( + name_identifier ) if cache_value is not None: - return cache_value + # if the cache value is not expired, return the cache value + if cache_value[0] > time.time() * 1000: + return cache_value[1] finally: read_lock.release() write_lock = self._cache_lock.gen_wlock() try: write_lock.acquire() - cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( - storage_type + cache_value: Tuple[int, AbstractFileSystem] = self._cache.get( + name_identifier ) + if cache_value is not None: - return cache_value + # if the cache value is not expired, return the cache value + if cache_value[0] > time.time() * 1000: + return cache_value[1] + + new_cache_value: Tuple[int, AbstractFileSystem] if storage_type == StorageType.HDFS: fs_class = importlib.import_module("pyarrow.fs").HadoopFileSystem - fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location)) + new_cache_value = ( + 0, + ArrowFSWrapper(fs_class.from_uri(actual_file_location)), + ) elif storage_type == StorageType.LOCAL: - fs = LocalFileSystem() + new_cache_value = (0, LocalFileSystem()) elif storage_type == StorageType.GCS: - fs = self._get_gcs_filesystem() + new_cache_value = self._get_gcs_filesystem( + fileset_catalog, name_identifier + ) elif storage_type == StorageType.S3A: 
- fs = self._get_s3_filesystem() + new_cache_value = self._get_s3_filesystem( + fileset_catalog, name_identifier + ) elif storage_type == StorageType.OSS: - fs = self._get_oss_filesystem() + new_cache_value = self._get_oss_filesystem( + fileset_catalog, name_identifier + ) elif storage_type == StorageType.ABS: - fs = self._get_abs_filesystem() + new_cache_value = self._get_abs_filesystem( + fileset_catalog, name_identifier + ) else: raise GravitinoRuntimeException( f"Storage type: `{storage_type}` doesn't support now." ) - self._cache[storage_type] = fs - return fs + self._cache[name_identifier] = new_cache_value + return new_cache_value[1] finally: write_lock.release() - def _get_gcs_filesystem(self): + def _get_gcs_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier): + try: + fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset( + NameIdentifier.of(identifier.namespace().level(2), identifier.name()) + ) + credentials = fileset.support_credentials().get_credentials() + except (NoSuchCredentialException, CatalogNotInUseException) as e: + logger.warning("Failed to get credentials from fileset: %s", e) + credentials = [] + + credential = self._get_most_suitable_gcs_credential(credentials) + if credential is not None: + expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms()) + if isinstance(credential, GCSTokenCredential): + fs = importlib.import_module("gcsfs").GCSFileSystem( + token=credential.token() + ) + return (expire_time, fs) + # get 'service-account-key' from gcs_options, if the key is not found, throw an exception service_account_key_path = self._options.get( Review Comment: maybe not related to this PR, but what if the user gets the GCS credential file from the environment? 
########## clients/client-python/gravitino/filesystem/gvfs.py: ########## @@ -866,50 +894,93 @@ def _get_fileset_catalog(self, catalog_ident: NameIdentifier): finally: write_lock.release() - def _get_filesystem(self, actual_file_location: str): + # Disable Too many branches (13/12) (too-many-branches) + # pylint: disable=R0912 + def _get_filesystem( + self, + actual_file_location: str, + fileset_catalog: Catalog, + name_identifier: NameIdentifier, + ): storage_type = self._recognize_storage_type(actual_file_location) read_lock = self._cache_lock.gen_rlock() try: read_lock.acquire() - cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( - storage_type + cache_value: Tuple[int, AbstractFileSystem] = self._cache.get( + name_identifier ) if cache_value is not None: - return cache_value + # if the cache value is not expired, return the cache value + if cache_value[0] > time.time() * 1000: + return cache_value[1] finally: read_lock.release() write_lock = self._cache_lock.gen_wlock() try: write_lock.acquire() - cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( - storage_type + cache_value: Tuple[int, AbstractFileSystem] = self._cache.get( + name_identifier ) + if cache_value is not None: - return cache_value + # if the cache value is not expired, return the cache value + if cache_value[0] > time.time() * 1000: Review Comment: could you handle the case where the expire time is zero? And could you use a function to check whether the token expires? ########## clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py: ########## @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import os +from random import randint +import unittest + +from gcsfs import GCSFileSystem + +from gravitino import Catalog, Fileset, GravitinoClient +from gravitino.filesystem import gvfs +from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS + +logger = logging.getLogger(__name__) + + +def gcs_with_credential_is_configured(): + return all( + [ + os.environ.get("GCS_STS_SERVICE_ACCOUNT_JSON_PATH") is not None, + os.environ.get("GCS_STS_BUCKET_NAME") is not None, + ] + ) + + [email protected](gcs_with_credential_is_configured(), "GCS is not configured.") +class TestGvfsWithGCSCredential(TestGvfsWithGCS): Review Comment: The current xxCredential tests depend on the HDFS test class, which can't be tested on a Mac and must run in a Docker environment; that's not convenient. Could you fix it in this PR, or create a separate issue to track it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
