This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 598bc0510 [#5996] feat(python-client): Using credential in python GVFS 
client. (#5997)
598bc0510 is described below

commit 598bc051010490fddd47883ab64e450d36aa3d86
Author: Qi Yu <[email protected]>
AuthorDate: Wed Jan 8 17:04:36 2025 +0800

    [#5996] feat(python-client): Using credential in python GVFS client. (#5997)
    
    ### What changes were proposed in this pull request?
    
    Support using credential in GVFS python client for cloud storage.
    
    ### Why are the changes needed?
    
    It's needed.
    
    Fix: #5996
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Added new integration tests and tested locally.
---
 .../gravitino/credential/ADLSTokenCredential.java  |   1 +
 .../metadata_object_credential_operations.py       |   2 +-
 clients/client-python/gravitino/filesystem/gvfs.py | 302 +++++++++++++++++----
 .../gravitino/filesystem/gvfs_config.py            |   8 +
 .../tests/integration/test_gvfs_with_abs.py        |   6 +-
 .../integration/test_gvfs_with_abs_credential.py   | 171 ++++++++++++
 .../tests/integration/test_gvfs_with_gcs.py        |   9 +-
 .../integration/test_gvfs_with_gcs_credential.py   | 112 ++++++++
 .../integration/test_gvfs_with_oss_credential.py   | 225 +++++++++++++++
 .../integration/test_gvfs_with_s3_credential.py    | 151 +++++++++++
 docs/how-to-use-gvfs.md                            |  23 +-
 11 files changed, 942 insertions(+), 68 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java 
b/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
index 249b0ac0b..6f1c46303 100644
--- a/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
+++ b/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
@@ -74,6 +74,7 @@ public class ADLSTokenCredential implements Credential {
   public Map<String, String> credentialInfo() {
     return (new ImmutableMap.Builder<String, String>())
         .put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
+        .put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
         .build();
   }
 
diff --git 
a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
 
b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
index 93d538cfa..7184cd797 100644
--- 
a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
+++ 
b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
@@ -48,7 +48,7 @@ class MetadataObjectCredentialOperations(SupportsCredentials):
         metadata_object_type = metadata_object.type().value
         metadata_object_name = metadata_object.name()
         self._request_path = (
-            f"api/metalakes/{metalake_name}objects/{metadata_object_type}/"
+            f"api/metalakes/{metalake_name}/objects/{metadata_object_type}/"
             f"{metadata_object_name}/credentials"
         )
 
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py 
b/clients/client-python/gravitino/filesystem/gvfs.py
index cd9521dc7..0dc020ee9 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,9 +14,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import sys
+
+# Disable C0302: Too many lines in module
+# pylint: disable=C0302
+import time
 from enum import Enum
 from pathlib import PurePosixPath
-from typing import Dict, Tuple
+from typing import Dict, Tuple, List
 import re
 import importlib
 import fsspec
@@ -28,6 +34,9 @@ from fsspec.implementations.arrow import ArrowFSWrapper
 from fsspec.utils import infer_storage_options
 
 from readerwriterlock import rwlock
+
+from gravitino.api.catalog import Catalog
+from gravitino.api.credential.credential import Credential
 from gravitino.audit.caller_context import CallerContext, CallerContextHolder
 from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
 from gravitino.audit.fileset_data_operation import FilesetDataOperation
@@ -35,14 +44,31 @@ from gravitino.audit.internal_client_type import 
InternalClientType
 from gravitino.auth.default_oauth2_token_provider import 
DefaultOAuth2TokenProvider
 from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider
 from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from gravitino.client.generic_fileset import GenericFileset
 from gravitino.client.fileset_catalog import FilesetCatalog
 from gravitino.client.gravitino_client import GravitinoClient
-from gravitino.exceptions.base import GravitinoRuntimeException
+from gravitino.exceptions.base import (
+    GravitinoRuntimeException,
+)
 from gravitino.filesystem.gvfs_config import GVFSConfig
 from gravitino.name_identifier import NameIdentifier
 
+from gravitino.api.credential.adls_token_credential import ADLSTokenCredential
+from gravitino.api.credential.azure_account_key_credential import (
+    AzureAccountKeyCredential,
+)
+from gravitino.api.credential.gcs_token_credential import GCSTokenCredential
+from gravitino.api.credential.oss_secret_key_credential import 
OSSSecretKeyCredential
+from gravitino.api.credential.oss_token_credential import OSSTokenCredential
+from gravitino.api.credential.s3_secret_key_credential import 
S3SecretKeyCredential
+from gravitino.api.credential.s3_token_credential import S3TokenCredential
+
+logger = logging.getLogger(__name__)
+
 PROTOCOL_NAME = "gvfs"
 
+TIME_WITHOUT_EXPIRATION = sys.maxsize
+
 
 class StorageType(Enum):
     HDFS = "hdfs"
@@ -677,8 +703,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()),
             sub_path,
         )
+
         return FilesetContextPair(
-            actual_file_location, self._get_filesystem(actual_file_location)
+            actual_file_location,
+            self._get_filesystem(actual_file_location, fileset_catalog, 
identifier),
         )
 
     def _extract_identifier(self, path):
@@ -866,50 +894,90 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         finally:
             write_lock.release()
 
-    def _get_filesystem(self, actual_file_location: str):
+    def _file_system_expired(self, expire_time: int):
+        return expire_time <= time.time() * 1000
+
+    # Disable Too many branches (13/12) (too-many-branches)
+    # pylint: disable=R0912
+    def _get_filesystem(
+        self,
+        actual_file_location: str,
+        fileset_catalog: Catalog,
+        name_identifier: NameIdentifier,
+    ):
         storage_type = self._recognize_storage_type(actual_file_location)
         read_lock = self._cache_lock.gen_rlock()
         try:
             read_lock.acquire()
-            cache_value: Tuple[StorageType, AbstractFileSystem] = 
self._cache.get(
-                storage_type
+            cache_value: Tuple[int, AbstractFileSystem] = self._cache.get(
+                name_identifier
             )
             if cache_value is not None:
-                return cache_value
+                if not self._file_system_expired(cache_value[0]):
+                    return cache_value[1]
         finally:
             read_lock.release()
 
         write_lock = self._cache_lock.gen_wlock()
         try:
             write_lock.acquire()
-            cache_value: Tuple[StorageType, AbstractFileSystem] = 
self._cache.get(
-                storage_type
+            cache_value: Tuple[int, AbstractFileSystem] = self._cache.get(
+                name_identifier
             )
+
             if cache_value is not None:
-                return cache_value
+                if not self._file_system_expired(cache_value[0]):
+                    return cache_value[1]
+
+            new_cache_value: Tuple[int, AbstractFileSystem]
             if storage_type == StorageType.HDFS:
                 fs_class = 
importlib.import_module("pyarrow.fs").HadoopFileSystem
-                fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location))
+                new_cache_value = (
+                    TIME_WITHOUT_EXPIRATION,
+                    ArrowFSWrapper(fs_class.from_uri(actual_file_location)),
+                )
             elif storage_type == StorageType.LOCAL:
-                fs = LocalFileSystem()
+                new_cache_value = (TIME_WITHOUT_EXPIRATION, LocalFileSystem())
             elif storage_type == StorageType.GCS:
-                fs = self._get_gcs_filesystem()
+                new_cache_value = self._get_gcs_filesystem(
+                    fileset_catalog, name_identifier
+                )
             elif storage_type == StorageType.S3A:
-                fs = self._get_s3_filesystem()
+                new_cache_value = self._get_s3_filesystem(
+                    fileset_catalog, name_identifier
+                )
             elif storage_type == StorageType.OSS:
-                fs = self._get_oss_filesystem()
+                new_cache_value = self._get_oss_filesystem(
+                    fileset_catalog, name_identifier
+                )
             elif storage_type == StorageType.ABS:
-                fs = self._get_abs_filesystem()
+                new_cache_value = self._get_abs_filesystem(
+                    fileset_catalog, name_identifier
+                )
             else:
                 raise GravitinoRuntimeException(
                     f"Storage type: `{storage_type}` doesn't support now."
                 )
-            self._cache[storage_type] = fs
-            return fs
+            self._cache[name_identifier] = new_cache_value
+            return new_cache_value[1]
         finally:
             write_lock.release()
 
-    def _get_gcs_filesystem(self):
+    def _get_gcs_filesystem(self, fileset_catalog: Catalog, identifier: 
NameIdentifier):
+        fileset: GenericFileset = 
fileset_catalog.as_fileset_catalog().load_fileset(
+            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name())
+        )
+        credentials = fileset.support_credentials().get_credentials()
+
+        credential = self._get_most_suitable_gcs_credential(credentials)
+        if credential is not None:
+            expire_time = 
self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+            if isinstance(credential, GCSTokenCredential):
+                fs = importlib.import_module("gcsfs").GCSFileSystem(
+                    token=credential.token()
+                )
+                return (expire_time, fs)
+
         # get 'service-account-key' from gcs_options, if the key is not found, 
throw an exception
         service_account_key_path = self._options.get(
             GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE
@@ -918,11 +986,47 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             raise GravitinoRuntimeException(
                 "Service account key is not found in the options."
             )
-        return importlib.import_module("gcsfs").GCSFileSystem(
-            token=service_account_key_path
+        return (
+            TIME_WITHOUT_EXPIRATION,
+            importlib.import_module("gcsfs").GCSFileSystem(
+                token=service_account_key_path
+            ),
         )
 
-    def _get_s3_filesystem(self):
+    def _get_s3_filesystem(self, fileset_catalog: Catalog, identifier: 
NameIdentifier):
+        fileset: GenericFileset = 
fileset_catalog.as_fileset_catalog().load_fileset(
+            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name())
+        )
+        credentials = fileset.support_credentials().get_credentials()
+        credential = self._get_most_suitable_s3_credential(credentials)
+
+        # S3 endpoint from gravitino server, Note: the endpoint may not a real 
S3 endpoint
+        # it can be a simulated S3 endpoint, such as minio, so though the 
endpoint is not a required field
+        # for S3FileSystem, we still need to assign the endpoint to the 
S3FileSystem
+        s3_endpoint = fileset_catalog.properties().get("s3-endpoint", None)
+        # If the oss endpoint is not found in the fileset catalog, get it from 
the client options
+        if s3_endpoint is None:
+            s3_endpoint = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
+
+        if credential is not None:
+            expire_time = 
self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+            if isinstance(credential, S3TokenCredential):
+                fs = importlib.import_module("s3fs").S3FileSystem(
+                    key=credential.access_key_id(),
+                    secret=credential.secret_access_key(),
+                    token=credential.session_token(),
+                    endpoint_url=s3_endpoint,
+                )
+                return (expire_time, fs)
+            if isinstance(credential, S3SecretKeyCredential):
+                fs = importlib.import_module("s3fs").S3FileSystem(
+                    key=credential.access_key_id(),
+                    secret=credential.secret_access_key(),
+                    endpoint_url=s3_endpoint,
+                )
+                return (expire_time, fs)
+
+        # this is the old way to get the s3 file system
         # get 'aws_access_key_id' from s3_options, if the key is not found, 
throw an exception
         aws_access_key_id = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
         if aws_access_key_id is None:
@@ -939,20 +1043,48 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
                 "AWS secret access key is not found in the options."
             )
 
-        # get 'aws_endpoint_url' from s3_options, if the key is not found, 
throw an exception
-        aws_endpoint_url = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
-        if aws_endpoint_url is None:
-            raise GravitinoRuntimeException(
-                "AWS endpoint url is not found in the options."
-            )
+        return (
+            TIME_WITHOUT_EXPIRATION,
+            importlib.import_module("s3fs").S3FileSystem(
+                key=aws_access_key_id,
+                secret=aws_secret_access_key,
+                endpoint_url=s3_endpoint,
+            ),
+        )
 
-        return importlib.import_module("s3fs").S3FileSystem(
-            key=aws_access_key_id,
-            secret=aws_secret_access_key,
-            endpoint_url=aws_endpoint_url,
+    def _get_oss_filesystem(self, fileset_catalog: Catalog, identifier: 
NameIdentifier):
+        fileset: GenericFileset = 
fileset_catalog.as_fileset_catalog().load_fileset(
+            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name())
         )
+        credentials = fileset.support_credentials().get_credentials()
+
+        # OSS endpoint from gravitino server
+        oss_endpoint = fileset_catalog.properties().get("oss-endpoint", None)
+        # If the oss endpoint is not found in the fileset catalog, get it from 
the client options
+        if oss_endpoint is None:
+            oss_endpoint = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT)
+
+        credential = self._get_most_suitable_oss_credential(credentials)
+        if credential is not None:
+            expire_time = 
self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+            if isinstance(credential, OSSTokenCredential):
+                fs = importlib.import_module("ossfs").OSSFileSystem(
+                    key=credential.access_key_id(),
+                    secret=credential.secret_access_key(),
+                    token=credential.security_token(),
+                    endpoint=oss_endpoint,
+                )
+                return (expire_time, fs)
+            if isinstance(credential, OSSSecretKeyCredential):
+                return (
+                    expire_time,
+                    importlib.import_module("ossfs").OSSFileSystem(
+                        key=credential.access_key_id(),
+                        secret=credential.secret_access_key(),
+                        endpoint=oss_endpoint,
+                    ),
+                )
 
-    def _get_oss_filesystem(self):
         # get 'oss_access_key_id' from oss options, if the key is not found, 
throw an exception
         oss_access_key_id = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY)
         if oss_access_key_id is None:
@@ -969,20 +1101,38 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
                 "OSS secret access key is not found in the options."
             )
 
-        # get 'oss_endpoint_url' from oss options, if the key is not found, 
throw an exception
-        oss_endpoint_url = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT)
-        if oss_endpoint_url is None:
-            raise GravitinoRuntimeException(
-                "OSS endpoint url is not found in the options."
-            )
+        return (
+            TIME_WITHOUT_EXPIRATION,
+            importlib.import_module("ossfs").OSSFileSystem(
+                key=oss_access_key_id,
+                secret=oss_secret_access_key,
+                endpoint=oss_endpoint,
+            ),
+        )
 
-        return importlib.import_module("ossfs").OSSFileSystem(
-            key=oss_access_key_id,
-            secret=oss_secret_access_key,
-            endpoint=oss_endpoint_url,
+    def _get_abs_filesystem(self, fileset_catalog: Catalog, identifier: 
NameIdentifier):
+        fileset: GenericFileset = 
fileset_catalog.as_fileset_catalog().load_fileset(
+            NameIdentifier.of(identifier.namespace().level(2), 
identifier.name())
         )
+        credentials = fileset.support_credentials().get_credentials()
+
+        credential = self._get_most_suitable_abs_credential(credentials)
+        if credential is not None:
+            expire_time = 
self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+            if isinstance(credential, ADLSTokenCredential):
+                fs = importlib.import_module("adlfs").AzureBlobFileSystem(
+                    account_name=credential.account_name(),
+                    sas_token=credential.sas_token(),
+                )
+                return (expire_time, fs)
+
+            if isinstance(credential, AzureAccountKeyCredential):
+                fs = importlib.import_module("adlfs").AzureBlobFileSystem(
+                    account_name=credential.account_name(),
+                    account_key=credential.account_key(),
+                )
+                return (expire_time, fs)
 
-    def _get_abs_filesystem(self):
         # get 'abs_account_name' from abs options, if the key is not found, 
throw an exception
         abs_account_name = self._options.get(
             GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME
@@ -1001,10 +1151,68 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
                 "ABS account key is not found in the options."
             )
 
-        return importlib.import_module("adlfs").AzureBlobFileSystem(
-            account_name=abs_account_name,
-            account_key=abs_account_key,
+        return (
+            TIME_WITHOUT_EXPIRATION,
+            importlib.import_module("adlfs").AzureBlobFileSystem(
+                account_name=abs_account_name,
+                account_key=abs_account_key,
+            ),
+        )
+
+    def _get_most_suitable_s3_credential(self, credentials: List[Credential]):
+        for credential in credentials:
+            # Prefer to use the token credential, if it does not exist, use the
+            # secret key credential.
+            if isinstance(credential, S3TokenCredential):
+                return credential
+
+        for credential in credentials:
+            if isinstance(credential, S3SecretKeyCredential):
+                return credential
+        return None
+
+    def _get_most_suitable_oss_credential(self, credentials: List[Credential]):
+        for credential in credentials:
+            # Prefer to use the token credential, if it does not exist, use the
+            # secret key credential.
+            if isinstance(credential, OSSTokenCredential):
+                return credential
+
+        for credential in credentials:
+            if isinstance(credential, OSSSecretKeyCredential):
+                return credential
+        return None
+
+    def _get_most_suitable_gcs_credential(self, credentials: List[Credential]):
+        for credential in credentials:
+            # Prefer to use the token credential, if it does not exist, return 
None.
+            if isinstance(credential, GCSTokenCredential):
+                return credential
+        return None
+
+    def _get_most_suitable_abs_credential(self, credentials: List[Credential]):
+        for credential in credentials:
+            # Prefer to use the token credential, if it does not exist, use the
+            # account key credential
+            if isinstance(credential, ADLSTokenCredential):
+                return credential
+
+        for credential in credentials:
+            if isinstance(credential, AzureAccountKeyCredential):
+                return credential
+        return None
+
+    def _get_expire_time_by_ratio(self, expire_time: int):
+        if expire_time <= 0:
+            return TIME_WITHOUT_EXPIRATION
+
+        ratio = float(
+            self._options.get(
+                GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO,
+                GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO,
+            )
         )
+        return time.time() * 1000 + (expire_time - time.time() * 1000) * ratio
 
 
 fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py 
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 4261fb48f..6fbd8a99d 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -44,3 +44,11 @@ class GVFSConfig:
 
     GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "abs_account_name"
     GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "abs_account_key"
+
+    # This configuration sets the ratio of the credential's lifetime after which it
+    # is treated as expired. For instance, if the credential fetched from the Gravitino
+    # server expires in 3600 seconds and the credential_expiration_ratio is 0.5, then the
+    # credential will be considered expired after 1800 seconds and a new credential will
+    # be retrieved.
+    GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO = 
"credential_expiration_ratio"
+
+    # The default value of the credential_expired_time_ratio is 0.5
+    DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO = 0.5
diff --git a/clients/client-python/tests/integration/test_gvfs_with_abs.py 
b/clients/client-python/tests/integration/test_gvfs_with_abs.py
index a218efcfd..53c265c53 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_abs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs.py
@@ -33,8 +33,6 @@ from gravitino import (
 )
 from gravitino.exceptions.base import GravitinoRuntimeException
 from gravitino.filesystem.gvfs_config import GVFSConfig
-from gravitino.filesystem.gvfs import StorageType
-
 
 logger = logging.getLogger(__name__)
 
@@ -281,7 +279,7 @@ class TestGvfsWithABS(TestGvfsWithHDFS):
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
         self.assertFalse(fs.exists(mkdir_dir))
 
-        
self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+        self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))
 
     def test_makedirs(self):
         mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
@@ -309,7 +307,7 @@ class TestGvfsWithABS(TestGvfsWithHDFS):
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
 
         self.assertFalse(fs.exists(mkdir_dir))
-        
self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+        self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))
 
     def test_ls(self):
         ls_dir = self.fileset_gvfs_location + "/test_ls"
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
new file mode 100644
index 000000000..9071679fb
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from adlfs import AzureBlobFileSystem
+
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_abs import TestGvfsWithABS
+
+
+logger = logging.getLogger(__name__)
+
+
+def azure_abs_with_credential_is_prepared():
+    return (
+        os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
+        and os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
+        and os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
+        and os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
+        and os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
+        and os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
+    )
+
+
+@unittest.skipUnless(
+    azure_abs_with_credential_is_prepared(),
+    "Azure Blob Storage credential test is not prepared.",
+)
+class TestGvfsWithCredentialABS(TestGvfsWithABS):
+    # Before running this test, please set the make sure azure-bundle-xxx.jar 
has been
+    # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    azure_abs_account_key = os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
+    azure_abs_account_name = os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
+    azure_abs_container_name = 
os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
+    azure_abs_tenant_id = os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
+    azure_abs_client_id = os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
+    azure_abs_client_secret = 
os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
+
+    metalake_name: str = "TestGvfsWithCredentialABS_metalake" + str(randint(1, 
10000))
+
+    def setUp(self):
+        self.options = {
+            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: 
self.azure_abs_account_name,
+            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: 
self.azure_abs_account_key,
+        }
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "abs",
+                "azure-storage-account-name": cls.azure_abs_account_name,
+                "azure-storage-account-key": cls.azure_abs_account_key,
+                "azure-tenant-id": cls.azure_abs_tenant_id,
+                "azure-client-id": cls.azure_abs_client_id,
+                "azure-client-secret": cls.azure_abs_client_secret,
+                "credential-providers": "adls-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"{cls.azure_abs_container_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=(
+                
f"abfss://{cls.azure_abs_container_name}@{cls.azure_abs_account_name}.dfs.core.windows.net/"
+                f"{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+            ),
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = AzureBlobFileSystem(
+            account_name=cls.azure_abs_account_name,
+            account_key=cls.azure_abs_account_key,
+        )
+
+    # As the permission provided by the dynamic token is smaller compared to 
the permission provided by the static token
+    # like account key and account name, the test case will fail if we do not 
override the test case.
+    def test_mkdir(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.azure_abs_container_name + "2"
+        mkdir_actual_dir = mkdir_actual_dir.replace(
+            self.azure_abs_container_name, new_bucket
+        )
+        self.fs.mkdir(mkdir_actual_dir, create_parents=True)
+
+        self.assertFalse(self.fs.exists(mkdir_actual_dir))
+
+        self.assertTrue(self.fs.exists(f"abfss://{new_bucket}"))
+
+    def test_makedirs(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.azure_abs_container_name + "1"
+        new_mkdir_actual_dir = mkdir_actual_dir.replace(
+            self.azure_abs_container_name, new_bucket
+        )
+        self.fs.makedirs(new_mkdir_actual_dir)
+        self.assertFalse(self.fs.exists(mkdir_actual_dir))
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py 
b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
index 15833aca0..40e83d008 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -36,7 +36,7 @@ from gravitino.filesystem.gvfs_config import GVFSConfig
 logger = logging.getLogger(__name__)
 
 
-def oss_is_configured():
+def gcs_is_configured():
     return all(
         [
             os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH") is not None,
@@ -45,7 +45,7 @@ def oss_is_configured():
     )
 
 
[email protected](oss_is_configured(), "GCS is not configured.")
[email protected](gcs_is_configured(), "GCS is not configured.")
 class TestGvfsWithGCS(TestGvfsWithHDFS):
     # Before running this test, please set the make sure gcp-bundle-x.jar has 
been
     # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
@@ -254,11 +254,10 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
         new_bucket = self.bucket_name + "1"
         mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
         mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, 
new_bucket)
-        fs.mkdir(mkdir_dir, create_parents=True)
 
+        with self.assertRaises(OSError):
+            fs.mkdir(mkdir_dir, create_parents=True)
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
-        self.assertFalse(fs.exists(mkdir_dir))
-        self.assertFalse(self.fs.exists("gs://" + new_bucket))
 
     def test_makedirs(self):
         mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
new file mode 100644
index 000000000..eec502a13
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from gcsfs import GCSFileSystem
+
+from gravitino import Catalog, Fileset, GravitinoClient
+from gravitino.filesystem import gvfs
+from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS
+
+logger = logging.getLogger(__name__)
+
+
+def gcs_with_credential_is_configured():
+    return all(
+        [
+            os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL") is 
not None,
+            os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL") is not None,
+        ]
+    )
+
+
[email protected](gcs_with_credential_is_configured(), "GCS is not 
configured.")
+class TestGvfsWithGCSCredential(TestGvfsWithGCS):
+    # Before running this test, please make sure gcp-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    key_file = os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL")
+    bucket_name = os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL")
+    metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1, 
10000))
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090";, metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "gcs",
+                "gcs-credential-file-path": cls.key_file,
+                "gcs-service-account-file": cls.key_file,
+                "credential-providers": "gcs-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = GCSFileSystem(token=cls.key_file)
+
+    def test_mkdir(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.bucket_name + "1"
+        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+        mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, 
new_bucket)
+
+        fs.mkdir(mkdir_dir, create_parents=True)
+        self.assertFalse(self.fs.exists(mkdir_actual_dir))
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
new file mode 100644
index 000000000..14b8b5231
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
@@ -0,0 +1,225 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from ossfs import OSSFileSystem
+
+from gravitino import (
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.filesystem import gvfs
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_oss import TestGvfsWithOSS
+
+logger = logging.getLogger(__name__)
+
+
+def oss_with_credential_is_configured():
+    return all(
+        [
+            os.environ.get("OSS_STS_ACCESS_KEY_ID") is not None,
+            os.environ.get("OSS_STS_SECRET_ACCESS_KEY") is not None,
+            os.environ.get("OSS_STS_ENDPOINT") is not None,
+            os.environ.get("OSS_STS_BUCKET_NAME") is not None,
+            os.environ.get("OSS_STS_REGION") is not None,
+            os.environ.get("OSS_STS_ROLE_ARN") is not None,
+        ]
+    )
+
+
[email protected](
+    oss_with_credential_is_configured(), "OSS with credential is not 
configured."
+)
+class TestGvfsWithOSSCredential(TestGvfsWithOSS):
+    # Before running this test, please make sure aliyun-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    oss_access_key = os.environ.get("OSS_STS_ACCESS_KEY_ID")
+    oss_secret_key = os.environ.get("OSS_STS_SECRET_ACCESS_KEY")
+    oss_endpoint = os.environ.get("OSS_STS_ENDPOINT")
+    bucket_name = os.environ.get("OSS_STS_BUCKET_NAME")
+    oss_sts_region = os.environ.get("OSS_STS_REGION")
+    oss_sts_role_arn = os.environ.get("OSS_STS_ROLE_ARN")
+
+    metalake_name: str = "TestGvfsWithOSSCredential_metalake" + str(randint(1, 
10000))
+
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}": 
self.oss_access_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}": 
self.oss_secret_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint,
+        }
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090";, metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "oss",
+                "oss-access-key-id": cls.oss_access_key,
+                "oss-secret-access-key": cls.oss_secret_key,
+                "oss-endpoint": cls.oss_endpoint,
+                "oss-region": cls.oss_sts_region,
+                "oss-role-arn": cls.oss_sts_role_arn,
+                "credential-providers": "oss-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"oss://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = OSSFileSystem(
+            key=cls.oss_access_key,
+            secret=cls.oss_secret_key,
+            endpoint=cls.oss_endpoint,
+        )
+
+    def test_cat_file(self):
+        cat_dir = self.fileset_gvfs_location + "/test_cat"
+        cat_actual_dir = self.fileset_storage_location + "/test_cat"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        self.check_mkdir(cat_dir, cat_actual_dir, fs)
+
+        cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
+        cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
+        self.fs.touch(cat_actual_file)
+        self.assertTrue(self.fs.exists(cat_actual_file))
+        self.assertTrue(fs.exists(cat_file))
+
+        # test open and write file
+        with fs.open(cat_file, mode="wb") as f:
+            f.write(b"test_cat_file")
+        self.assertTrue(fs.info(cat_file)["size"] > 0)
+
+        # test cat file
+        content = fs.cat_file(cat_file)
+        self.assertEqual(b"test_cat_file", content)
+
+    @unittest.skip(
+        "Skip this test case because fs.ls(dir) using credential is always 
empty"
+    )
+    def test_ls(self):
+        ls_dir = self.fileset_gvfs_location + "/test_ls"
+        ls_actual_dir = self.fileset_storage_location + "/test_ls"
+
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        self.check_mkdir(ls_dir, ls_actual_dir, fs)
+
+        ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
+        ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
+        self.fs.touch(ls_actual_file)
+        self.assertTrue(self.fs.exists(ls_actual_file))
+
+        # test detail = false
+        file_list_without_detail = fs.ls(ls_dir, detail=False)
+        self.assertEqual(1, len(file_list_without_detail))
+        self.assertEqual(file_list_without_detail[0], ls_file[len("gvfs://") 
:])
+
+        # test detail = true
+        file_list_with_detail = fs.ls(ls_dir, detail=True)
+        self.assertEqual(1, len(file_list_with_detail))
+        self.assertEqual(file_list_with_detail[0]["name"], 
ls_file[len("gvfs://") :])
+
+    @unittest.skip(
+        "Skip this test case because fs.info(info_file) using credential is 
always None"
+    )
+    def test_info(self):
+        info_dir = self.fileset_gvfs_location + "/test_info"
+        info_actual_dir = self.fileset_storage_location + "/test_info"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        self.check_mkdir(info_dir, info_actual_dir, fs)
+
+        info_file = self.fileset_gvfs_location + "/test_info/test.file"
+        info_actual_file = self.fileset_storage_location + 
"/test_info/test.file"
+        self.fs.touch(info_actual_file)
+        self.assertTrue(self.fs.exists(info_actual_file))
+
+        ## OSS info has different behavior than S3 info. For OSS info, the 
name of the
+        ## directory will have a trailing slash if it's a directory and the 
path
+        # does not end with a slash, while S3 info will not have a trailing
+        # slash if it's a directory.
+
+        # >> > oss.info('bucket-xiaoyu/lisi')
+        # {'name': 'bucket-xiaoyu/lisi/', 'type': 'directory',
+        # 'size': 0, 'Size': 0, 'Key': 'bucket-xiaoyu/lisi/'}
+        # >> > oss.info('bucket-xiaoyu/lisi/')
+        # {'name': 'bucket-xiaoyu/lisi', 'size': 0,
+        # 'type': 'directory', 'Size': 0,
+        # 'Key': 'bucket-xiaoyu/lisi'
+
+        # >> > s3.info('paimon-bucket/lisi');
+        # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+        # 'StorageClass': 'DIRECTORY'}
+        # >> > s3.info('paimon-bucket/lisi/');
+        # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+        # 'StorageClass': 'DIRECTORY'}
+
+        dir_info = fs.info(info_dir)
+        self.assertEqual(dir_info["name"][:-1], info_dir[len("gvfs://") :])
+
+        file_info = fs.info(info_file)
+        self.assertEqual(file_info["name"], info_file[len("gvfs://") :])
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
new file mode 100644
index 000000000..35d88c2c8
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from s3fs import S3FileSystem
+
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_s3 import TestGvfsWithS3
+
+logger = logging.getLogger(__name__)
+
+
+def s3_with_credential_is_configured():
+    return all(
+        [
+            os.environ.get("S3_STS_ACCESS_KEY_ID") is not None,
+            os.environ.get("S3_STS_SECRET_ACCESS_KEY") is not None,
+            os.environ.get("S3_STS_ENDPOINT") is not None,
+            os.environ.get("S3_STS_BUCKET_NAME") is not None,
+            os.environ.get("S3_STS_REGION") is not None,
+            os.environ.get("S3_STS_ROLE_ARN") is not None,
+        ]
+    )
+
+
[email protected](s3_with_credential_is_configured(), "S3 is not 
configured.")
+class TestGvfsWithS3Credential(TestGvfsWithS3):
+    # Before running this test, please make sure aws-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    s3_access_key = os.environ.get("S3_STS_ACCESS_KEY_ID")
+    s3_secret_key = os.environ.get("S3_STS_SECRET_ACCESS_KEY")
+    s3_endpoint = os.environ.get("S3_STS_ENDPOINT")
+    bucket_name = os.environ.get("S3_STS_BUCKET_NAME")
+    s3_sts_region = os.environ.get("S3_STS_REGION")
+    s3_role_arn = os.environ.get("S3_STS_ROLE_ARN")
+
+    metalake_name: str = "TestGvfsWithS3Credential_metalake" + str(randint(1, 
10000))
+
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+        }
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090";, metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "s3",
+                "s3-access-key-id": cls.s3_access_key,
+                "s3-secret-access-key": cls.s3_secret_key,
+                "s3-endpoint": cls.s3_endpoint,
+                "s3-region": cls.s3_sts_region,
+                "s3-role-arn": cls.s3_role_arn,
+                "credential-providers": "s3-token",
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = S3FileSystem(
+            key=cls.s3_access_key,
+            secret=cls.s3_secret_key,
+            endpoint_url=cls.s3_endpoint,
+        )
+
+    # The following tests are copied from tests/integration/test_gvfs_with_s3.py,
+    # with some modifications, because `mkdir` and `makedirs` behave differently
+    # in S3; other cloud storages like GCS, ABS, and OSS are similar.
+    def test_mkdir(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        with self.assertRaises(ValueError):
+            fs.mkdir(mkdir_dir, create_parents=True)
+        self.assertFalse(fs.exists(mkdir_dir))
+
+    def test_makedirs(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 31ede3a53..aff3b74ad 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -455,17 +455,18 @@ to recompile the native libraries like `libhdfs` and 
others, and completely repl
 
 ### Configuration
 
-| Configuration item         | Description                                     
                                                                                
                         | Default value | Required                          | 
Since version    |
-|----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-----------------------------------|------------------|
-| `server_uri`               | The Gravitino server uri, e.g. 
`http://localhost:8090`.                                                        
                                          | (none)        | Yes                 
              | 0.6.0-incubating |
-| `metalake_name`            | The metalake name which the fileset belongs to. 
                                                                                
                         | (none)        | Yes                               | 
0.6.0-incubating |
-| `cache_size`               | The cache capacity of the Gravitino Virtual 
File System.                                                                    
                             | `20`          | No                               
 | 0.6.0-incubating |                                                           
                                                           
-| `cache_expired_time`       | The value of time that the cache expires after 
accessing in the Gravitino Virtual File System. The value is in `seconds`.      
                          | `3600`        | No                                | 
0.6.0-incubating |
-| `auth_type`                | The auth type to initialize the Gravitino 
client to use with the Gravitino Virtual File System. Currently supports 
`simple` and `oauth2` auth types.     | `simple`      | No                      
          | 0.6.0-incubating |
-| `oauth2_server_uri`        | The auth server URI for the Gravitino client 
when using `oauth2` auth type.                                                  
                            | (none)        | Yes if you use `oauth2` auth type 
| 0.7.0-incubating |
-| `oauth2_credential`        | The auth credential for the Gravitino client 
when using `oauth2` auth type.                                                  
                            | (none)        | Yes if you use `oauth2` auth type 
| 0.7.0-incubating |
-| `oauth2_path`              | The auth server path for the Gravitino client 
when using `oauth2` auth type. Please remove the first slash `/` from the path, 
for example `oauth/token`. | (none)        | Yes if you use `oauth2` auth type 
| 0.7.0-incubating |
-| `oauth2_scope`             | The auth scope for the Gravitino client when 
using `oauth2` auth type with the Gravitino Virtual File System.                
                            | (none)        | Yes if you use `oauth2` auth type 
| 0.7.0-incubating |
+| Configuration item            | Description                                  
                                                                                
                                                                                
                                                                                
      | Default value | Required                          | Since version    |
+|-------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-----------------------------------|------------------|
+| `server_uri`                  | The Gravitino server uri, e.g. 
`http://localhost:8090`.                                                        
                                                                                
                                                                                
                    | (none)        | Yes                               | 
0.6.0-incubating |
+| `metalake_name`               | The metalake name which the fileset belongs 
to.                                                                             
                                                                                
                                                                                
       | (none)        | Yes                               | 0.6.0-incubating |
+| `cache_size`                  | The cache capacity of the Gravitino Virtual 
File System.                                                                    
                                                                                
                                                                                
       | `20`          | No                                | 0.6.0-incubating | 
                                                                                
               [...]
+| `cache_expired_time`          | The value of time that the cache expires 
after accessing in the Gravitino Virtual File System. The value is in 
`seconds`.                                                                      
                                                                                
                    | `3600`        | No                                | 
0.6.0-incubating |
+| `auth_type`                   | The auth type to initialize the Gravitino 
client to use with the Gravitino Virtual File System. Currently supports 
`simple` and `oauth2` auth types.                                               
                                                                                
                | `simple`      | No                                | 
0.6.0-incubating |
+| `oauth2_server_uri`           | The auth server URI for the Gravitino client 
when using `oauth2` auth type.                                                  
                                                                                
                                                                                
      | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_credential`           | The auth credential for the Gravitino client 
when using `oauth2` auth type.                                                  
                                                                                
                                                                                
      | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_path`                 | The auth server path for the Gravitino 
client when using `oauth2` auth type. Please remove the first slash `/` from 
the path, for example `oauth/token`.                                            
                                                                                
               | (none)        | Yes if you use `oauth2` auth type | 
0.7.0-incubating |
+| `oauth2_scope`                | The auth scope for the Gravitino client when 
using `oauth2` auth type with the Gravitino Virtual File System.                
                                                                                
                                                                                
      | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `credential_expiration_ratio` | The ratio of the expiration time applied to credentials fetched from Gravitino. This is used when Gravitino Hadoop catalogs have enabled credential vending. If the expiration time of the credential fetched from Gravitino is 1 hour, the GVFS client will try to refresh the credential in 1 * 0.5 = 0.5 hour. | 0.5           | No                                | 
0.8.0-incubating |
 
 
 #### Extra configuration for S3, GCS, OSS fileset

Reply via email to