This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 78447ceca [#5973] feat(hadoop-catalog): Support credential when using 
fileset catalog with cloud storage (#5974)
78447ceca is described below

commit 78447cecac8fafbc7f7bde8797a5e821b7375bf0
Author: Qi Yu <[email protected]>
AuthorDate: Fri Jan 10 10:21:36 2025 +0800

    [#5973] feat(hadoop-catalog): Support credential when using fileset catalog 
with cloud storage (#5974)
    
    ### What changes were proposed in this pull request?
    
    Support dynamic credential in obtaining cloud storage fileset.
    
    ### Why are the changes needed?
    
    Static key are not very safe, we need to optimize it.
    
    Fix: #5973
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    ITs
---
 .../gravitino/oss/fs/OSSCredentialsProvider.java   |  88 +++++++++
 .../gravitino/oss/fs/OSSFileSystemProvider.java    |  20 ++-
 .../java/org/apache/gravitino/oss/fs/OSSUtils.java |  52 ++++++
 .../gravitino/s3/fs/S3CredentialsProvider.java     |  88 +++++++++
 .../gravitino/s3/fs/S3FileSystemProvider.java      |  30 +++-
 .../java/org/apache/gravitino/s3/fs/S3Utils.java   |  51 ++++++
 bundles/azure/build.gradle.kts                     |   1 -
 .../gravitino/abs/fs/AzureFileSystemProvider.java  |  42 ++++-
 .../abs/fs/AzureSasCredentialsProvider.java        |  87 +++++++++
 .../apache/gravitino/abs/fs/AzureStorageUtils.java |  66 +++++++
 bundles/gcp/build.gradle.kts                       |   1 +
 .../gravitino/gcs/fs/GCSCredentialsProvider.java   |  84 +++++++++
 .../gravitino/gcs/fs/GCSFileSystemProvider.java    |  19 +-
 .../java/org/apache/gravitino/gcs/fs/GCSUtils.java |  42 +++++
 catalogs/hadoop-common/build.gradle.kts            |   1 +
 .../catalog/hadoop/fs/FileSystemUtils.java         |  23 +++
 .../fs/GravitinoFileSystemCredentialsProvider.java |  38 ++++
 .../hadoop/fs/SupportsCredentialVending.java       |  37 ++++
 ...aultGravitinoFileSystemCredentialsProvider.java |  60 +++++++
 .../hadoop/GravitinoVirtualFileSystem.java         | 197 ++++++++-------------
 .../hadoop/GravitinoVirtualFileSystemUtils.java    | 151 ++++++++++++++++
 .../gravitino/filesystem/hadoop/TestGvfsBase.java  |  52 +++++-
 .../GravitinoVirtualFileSystemABSCredentialIT.java | 180 +++++++++++++++++++
 .../GravitinoVirtualFileSystemGCSCredentialIT.java | 150 ++++++++++++++++
 .../GravitinoVirtualFileSystemOSSCredentialIT.java | 168 ++++++++++++++++++
 .../GravitinoVirtualFileSystemS3CredentialIT.java  | 173 ++++++++++++++++++
 26 files changed, 1765 insertions(+), 136 deletions(-)

diff --git 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java
 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java
new file mode 100644
index 000000000..ef4afe434
--- /dev/null
+++ 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java
@@ -0,0 +1,88 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.oss.fs;
+
+import com.aliyun.oss.common.auth.BasicCredentials;
+import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.DefaultCredentials;
+import java.net.URI;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.hadoop.conf.Configuration;
+
+public class OSSCredentialsProvider implements CredentialsProvider {
+
+  private GravitinoFileSystemCredentialsProvider 
gravitinoFileSystemCredentialsProvider;
+  private Credentials basicCredentials;
+  private long expirationTime = Long.MAX_VALUE;
+  private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+  public OSSCredentialsProvider(URI uri, Configuration conf) {
+    this.gravitinoFileSystemCredentialsProvider = 
FileSystemUtils.getGvfsCredentialProvider(conf);
+  }
+
+  @Override
+  public void setCredentials(Credentials credentials) {}
+
+  @Override
+  public Credentials getCredentials() {
+    if (basicCredentials == null || System.currentTimeMillis() >= 
expirationTime) {
+      synchronized (this) {
+        refresh();
+      }
+    }
+
+    return basicCredentials;
+  }
+
+  private void refresh() {
+    Credential[] gravitinoCredentials = 
gravitinoFileSystemCredentialsProvider.getCredentials();
+    Credential credential = 
OSSUtils.getSuitableCredential(gravitinoCredentials);
+    if (credential == null) {
+      throw new RuntimeException("No suitable credential for OSS found...");
+    }
+
+    if (credential instanceof OSSSecretKeyCredential) {
+      OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential) 
credential;
+      basicCredentials =
+          new DefaultCredentials(
+              ossSecretKeyCredential.accessKeyId(), 
ossSecretKeyCredential.secretAccessKey());
+    } else if (credential instanceof OSSTokenCredential) {
+      OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential;
+      basicCredentials =
+          new BasicCredentials(
+              ossTokenCredential.accessKeyId(),
+              ossTokenCredential.secretAccessKey(),
+              ossTokenCredential.securityToken());
+    }
+
+    if (credential.expireTimeInMs() > 0) {
+      expirationTime =
+          System.currentTimeMillis()
+              + (long)
+                  ((credential.expireTimeInMs() - System.currentTimeMillis())
+                      * EXPIRATION_TIME_FACTOR);
+    }
+  }
+}
diff --git 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
index b47d25335..358e3a08c 100644
--- 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
+++ 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
@@ -20,10 +20,15 @@ package org.apache.gravitino.oss.fs;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.OSSTokenCredential;
 import org.apache.gravitino.storage.OSSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,7 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
 import org.apache.hadoop.fs.aliyun.oss.Constants;
 
-public class OSSFileSystemProvider implements FileSystemProvider {
+public class OSSFileSystemProvider implements FileSystemProvider, 
SupportsCredentialVending {
 
   private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";
 
@@ -61,9 +66,22 @@ public class OSSFileSystemProvider implements 
FileSystemProvider {
     }
 
     hadoopConfMap.forEach(configuration::set);
+
     return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
   }
 
+  @Override
+  public Map<String, String> getFileSystemCredentialConf(Credential[] 
credentials) {
+    Credential credential = OSSUtils.getSuitableCredential(credentials);
+    Map<String, String> result = Maps.newHashMap();
+    if (credential instanceof OSSSecretKeyCredential || credential instanceof 
OSSTokenCredential) {
+      result.put(
+          Constants.CREDENTIALS_PROVIDER_KEY, 
OSSCredentialsProvider.class.getCanonicalName());
+    }
+
+    return result;
+  }
+
   @Override
   public String scheme() {
     return "oss";
diff --git 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java
new file mode 100644
index 000000000..87c71de37
--- /dev/null
+++ b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java
@@ -0,0 +1,52 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.oss.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.OSSTokenCredential;
+
+public class OSSUtils {
+
+  /**
+   * Get the credential from the credential array. Using dynamic credential 
first, if not found,
+   * uses static credential.
+   *
+   * @param credentials The credential array.
+   * @return A credential. Null if not found.
+   */
+  static Credential getSuitableCredential(Credential[] credentials) {
+    // Use dynamic credential if found.
+    for (Credential credential : credentials) {
+      if (credential instanceof OSSTokenCredential) {
+        return credential;
+      }
+    }
+
+    // If dynamic credential not found, use the static one
+    for (Credential credential : credentials) {
+      if (credential instanceof OSSSecretKeyCredential) {
+        return credential;
+      }
+    }
+
+    return null;
+  }
+}
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
new file mode 100644
index 000000000..2fc145889
--- /dev/null
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
@@ -0,0 +1,88 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.s3.fs;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import java.net.URI;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.hadoop.conf.Configuration;
+
+public class S3CredentialsProvider implements AWSCredentialsProvider {
+  private GravitinoFileSystemCredentialsProvider 
gravitinoFileSystemCredentialsProvider;
+
+  private AWSCredentials basicSessionCredentials;
+  private long expirationTime = Long.MAX_VALUE;
+  private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+  public S3CredentialsProvider(final URI uri, final Configuration conf) {
+    this.gravitinoFileSystemCredentialsProvider = 
FileSystemUtils.getGvfsCredentialProvider(conf);
+  }
+
+  @Override
+  public AWSCredentials getCredentials() {
+    // Refresh credentials if they are null or about to expire.
+    if (basicSessionCredentials == null || System.currentTimeMillis() >= 
expirationTime) {
+      synchronized (this) {
+        refresh();
+      }
+    }
+
+    return basicSessionCredentials;
+  }
+
+  @Override
+  public void refresh() {
+    Credential[] gravitinoCredentials = 
gravitinoFileSystemCredentialsProvider.getCredentials();
+    Credential credential = 
S3Utils.getSuitableCredential(gravitinoCredentials);
+
+    if (credential == null) {
+      throw new RuntimeException("No suitable credential for S3 found...");
+    }
+
+    if (credential instanceof S3SecretKeyCredential) {
+      S3SecretKeyCredential s3SecretKeyCredential = (S3SecretKeyCredential) 
credential;
+      basicSessionCredentials =
+          new BasicAWSCredentials(
+              s3SecretKeyCredential.accessKeyId(), 
s3SecretKeyCredential.secretAccessKey());
+    } else if (credential instanceof S3TokenCredential) {
+      S3TokenCredential s3TokenCredential = (S3TokenCredential) credential;
+      basicSessionCredentials =
+          new BasicSessionCredentials(
+              s3TokenCredential.accessKeyId(),
+              s3TokenCredential.secretAccessKey(),
+              s3TokenCredential.sessionToken());
+    }
+
+    if (credential.expireTimeInMs() > 0) {
+      expirationTime =
+          System.currentTimeMillis()
+              + (long)
+                  ((credential.expireTimeInMs() - System.currentTimeMillis())
+                      * EXPIRATION_TIME_FACTOR);
+    }
+  }
+}
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
index b7cd569bb..cbe133ed7 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -25,11 +25,16 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
 import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,9 +44,9 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3FileSystemProvider implements FileSystemProvider {
+public class S3FileSystemProvider implements FileSystemProvider, 
SupportsCredentialVending {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3FileSystemProvider.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3FileSystemProvider.class);
 
   @VisibleForTesting
   public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
@@ -61,18 +66,29 @@ public class S3FileSystemProvider implements 
FileSystemProvider {
     Map<String, String> hadoopConfMap =
         FileSystemUtils.toHadoopConfigMap(config, 
GRAVITINO_KEY_TO_S3_HADOOP_KEY);
 
+    hadoopConfMap.forEach(configuration::set);
     if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) {
-      hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
+      configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
     }
 
-    hadoopConfMap.forEach(configuration::set);
-
     // Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
     checkAndSetCredentialProvider(configuration);
 
     return S3AFileSystem.newInstance(path.toUri(), configuration);
   }
 
+  @Override
+  public Map<String, String> getFileSystemCredentialConf(Credential[] 
credentials) {
+    Credential credential = S3Utils.getSuitableCredential(credentials);
+    Map<String, String> result = Maps.newHashMap();
+    if (credential instanceof S3SecretKeyCredential || credential instanceof 
S3TokenCredential) {
+      result.put(
+          Constants.AWS_CREDENTIALS_PROVIDER, 
S3CredentialsProvider.class.getCanonicalName());
+    }
+
+    return result;
+  }
+
   private void checkAndSetCredentialProvider(Configuration configuration) {
     String provides = configuration.get(S3_CREDENTIAL_KEY);
     if (provides == null) {
@@ -91,12 +107,12 @@ public class S3FileSystemProvider implements 
FileSystemProvider {
         if (AWSCredentialsProvider.class.isAssignableFrom(c)) {
           validProviders.add(provider);
         } else {
-          LOGGER.warn(
+          LOG.warn(
               "Credential provider {} is not a subclass of 
AWSCredentialsProvider, skipping",
               provider);
         }
       } catch (Exception e) {
-        LOGGER.warn(
+        LOG.warn(
             "Credential provider {} not found in the Hadoop runtime, falling 
back to default",
             provider);
         configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
diff --git a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
new file mode 100644
index 000000000..078a1180b
--- /dev/null
+++ b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.s3.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
+
+public class S3Utils {
+
+  /**
+   * Get the credential from the credential array. Using dynamic credential 
first, if not found,
+   * uses static credential.
+   *
+   * @param credentials The credential array.
+   * @return A credential. Null if not found.
+   */
+  static Credential getSuitableCredential(Credential[] credentials) {
+    // Use dynamic credential if found.
+    for (Credential credential : credentials) {
+      if (credential instanceof S3TokenCredential) {
+        return credential;
+      }
+    }
+
+    // If dynamic credential not found, use the static one
+    for (Credential credential : credentials) {
+      if (credential instanceof S3SecretKeyCredential) {
+        return credential;
+      }
+    }
+    return null;
+  }
+}
diff --git a/bundles/azure/build.gradle.kts b/bundles/azure/build.gradle.kts
index 8dbd6ed48..1cbe4856a 100644
--- a/bundles/azure/build.gradle.kts
+++ b/bundles/azure/build.gradle.kts
@@ -28,7 +28,6 @@ dependencies {
   compileOnly(project(":api"))
   compileOnly(project(":catalogs:catalog-hadoop"))
   compileOnly(project(":core"))
-
   compileOnly(libs.hadoop3.abs)
   compileOnly(libs.hadoop3.client.api)
   compileOnly(libs.hadoop3.client.runtime)
diff --git 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
index f89240441..3dcbb502f 100644
--- 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
+++ 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
@@ -19,19 +19,29 @@
 
 package org.apache.gravitino.abs.fs;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Map;
 import javax.annotation.Nonnull;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.ADLSTokenCredential;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.storage.AzureProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
-public class AzureFileSystemProvider implements FileSystemProvider {
+public class AzureFileSystemProvider implements FileSystemProvider, 
SupportsCredentialVending {
 
   @VisibleForTesting public static final String ABS_PROVIDER_SCHEME = "abfss";
 
@@ -58,13 +68,39 @@ public class AzureFileSystemProvider implements 
FileSystemProvider {
           config.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY));
     }
 
-    if (!config.containsKey(ABFS_IMPL_KEY)) {
+    if (!hadoopConfMap.containsKey(ABFS_IMPL_KEY)) {
       configuration.set(ABFS_IMPL_KEY, ABFS_IMPL);
     }
 
     hadoopConfMap.forEach(configuration::set);
 
-    return FileSystem.get(path.toUri(), configuration);
+    return FileSystem.newInstance(path.toUri(), configuration);
+  }
+
+  @Override
+  public Map<String, String> getFileSystemCredentialConf(Credential[] 
credentials) {
+    Credential credential = 
AzureStorageUtils.getSuitableCredential(credentials);
+    Map<String, String> result = Maps.newHashMap();
+    if (credential instanceof ADLSTokenCredential) {
+      ADLSTokenCredential adlsTokenCredential = (ADLSTokenCredential) 
credential;
+
+      String accountName =
+          String.format("%s.dfs.core.windows.net", 
adlsTokenCredential.accountName());
+      result.put(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + "." + accountName, 
AuthType.SAS.name());
+      result.put(
+          FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + "." + accountName,
+          AzureSasCredentialsProvider.class.getName());
+      result.put(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, "true");
+    } else if (credential instanceof AzureAccountKeyCredential) {
+      AzureAccountKeyCredential azureAccountKeyCredential = 
(AzureAccountKeyCredential) credential;
+      result.put(
+          String.format(
+              "fs.azure.account.key.%s.dfs.core.windows.net",
+              azureAccountKeyCredential.accountName()),
+          azureAccountKeyCredential.accountKey());
+    }
+
+    return result;
   }
 
   @Override
diff --git 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java
 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java
new file mode 100644
index 000000000..85793d3d9
--- /dev/null
+++ 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.abs.fs;
+
+import java.io.IOException;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.ADLSTokenCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+
+public class AzureSasCredentialsProvider implements SASTokenProvider, 
Configurable {
+
+  private Configuration configuration;
+  private String sasToken;
+
+  private GravitinoFileSystemCredentialsProvider 
gravitinoFileSystemCredentialsProvider;
+  private long expirationTime = Long.MAX_VALUE;
+  private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public void initialize(Configuration conf, String accountName) throws 
IOException {
+    this.configuration = conf;
+    this.gravitinoFileSystemCredentialsProvider = 
FileSystemUtils.getGvfsCredentialProvider(conf);
+  }
+
+  @Override
+  public String getSASToken(String account, String fileSystem, String path, 
String operation) {
+    // Refresh credentials if they are null or about to expire.
+    if (sasToken == null || System.currentTimeMillis() >= expirationTime) {
+      synchronized (this) {
+        refresh();
+      }
+    }
+    return sasToken;
+  }
+
+  private void refresh() {
+    Credential[] gravitinoCredentials = 
gravitinoFileSystemCredentialsProvider.getCredentials();
+    Credential credential = 
AzureStorageUtils.getADLSTokenCredential(gravitinoCredentials);
+    if (credential == null) {
+      throw new RuntimeException("No token credential for OSS found...");
+    }
+
+    if (credential instanceof ADLSTokenCredential) {
+      ADLSTokenCredential adlsTokenCredential = (ADLSTokenCredential) 
credential;
+      sasToken = adlsTokenCredential.sasToken();
+
+      if (credential.expireTimeInMs() > 0) {
+        expirationTime =
+            System.currentTimeMillis()
+                + (long)
+                    ((credential.expireTimeInMs() - System.currentTimeMillis())
+                        * EXPIRATION_TIME_FACTOR);
+      }
+    }
+  }
+}
diff --git 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java
 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java
new file mode 100644
index 000000000..873f61930
--- /dev/null
+++ 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.abs.fs;
+
+import org.apache.gravitino.credential.ADLSTokenCredential;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+
+public class AzureStorageUtils {
+
+  /**
+   * Get the ADLS credential from the credential array. Use the account and 
secret if dynamic token
+   * is not found, null if both are not found.
+   *
+   * @param credentials The credential array.
+   * @return A credential. Null if not found.
+   */
+  static Credential getSuitableCredential(Credential[] credentials) {
+    for (Credential credential : credentials) {
+      if (credential instanceof ADLSTokenCredential) {
+        return credential;
+      }
+    }
+
+    for (Credential credential : credentials) {
+      if (credential instanceof AzureAccountKeyCredential) {
+        return credential;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Get the ADLS token credential from the credential array. Null if not 
found.
+   *
+   * @param credentials The credential array.
+   * @return A credential. Null if not found.
+   */
+  static Credential getADLSTokenCredential(Credential[] credentials) {
+    for (Credential credential : credentials) {
+      if (credential instanceof ADLSTokenCredential) {
+        return credential;
+      }
+    }
+
+    return null;
+  }
+}
diff --git a/bundles/gcp/build.gradle.kts b/bundles/gcp/build.gradle.kts
index 95907f8a3..7d46fde9e 100644
--- a/bundles/gcp/build.gradle.kts
+++ b/bundles/gcp/build.gradle.kts
@@ -32,6 +32,7 @@ dependencies {
 
   compileOnly(libs.hadoop3.client.api)
   compileOnly(libs.hadoop3.client.runtime)
+  compileOnly(libs.hadoop3.gcs)
 
   implementation(project(":catalogs:catalog-common")) {
     exclude("*")
diff --git 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java
 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java
new file mode 100644
index 000000000..c4eefeeeb
--- /dev/null
+++ 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.gcs.fs;
+
+import com.google.cloud.hadoop.util.AccessTokenProvider;
+import java.io.IOException;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.GCSTokenCredential;
+import org.apache.hadoop.conf.Configuration;
+
+public class GCSCredentialsProvider implements AccessTokenProvider {
+  private Configuration configuration;
+  private GravitinoFileSystemCredentialsProvider 
gravitinoFileSystemCredentialsProvider;
+
+  private AccessToken accessToken;
+  private long expirationTime = Long.MAX_VALUE;
+  private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+  @Override
+  public AccessToken getAccessToken() {
+    if (accessToken == null || System.currentTimeMillis() >= expirationTime) {
+      try {
+        refresh();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to refresh access token", e);
+      }
+    }
+    return accessToken;
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    Credential[] gravitinoCredentials = 
gravitinoFileSystemCredentialsProvider.getCredentials();
+
+    Credential credential = 
GCSUtils.getGCSTokenCredential(gravitinoCredentials);
+    if (credential == null) {
+      throw new RuntimeException("No suitable credential for OSS found...");
+    }
+
+    if (credential instanceof GCSTokenCredential) {
+      GCSTokenCredential gcsTokenCredential = (GCSTokenCredential) credential;
+      accessToken = new AccessToken(gcsTokenCredential.token(), 
credential.expireTimeInMs());
+
+      if (credential.expireTimeInMs() > 0) {
+        expirationTime =
+            System.currentTimeMillis()
+                + (long)
+                    ((credential.expireTimeInMs() - System.currentTimeMillis())
+                        * EXPIRATION_TIME_FACTOR);
+      }
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.configuration = configuration;
+    this.gravitinoFileSystemCredentialsProvider =
+        FileSystemUtils.getGvfsCredentialProvider(configuration);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+}
diff --git 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index b79b58ef4..7ab38b2d7 100644
--- 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++ 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -20,18 +20,23 @@ package org.apache.gravitino.gcs.fs;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.GCSTokenCredential;
 import org.apache.gravitino.storage.GCSProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-public class GCSFileSystemProvider implements FileSystemProvider {
+public class GCSFileSystemProvider implements FileSystemProvider, 
SupportsCredentialVending {
   private static final String GCS_SERVICE_ACCOUNT_JSON_FILE =
       "fs.gs.auth.service.account.json.keyfile";
+  private static final String GCS_TOKEN_PROVIDER_IMPL = 
"fs.gs.auth.access.token.provider.impl";
 
   @VisibleForTesting
   public static final Map<String, String> GRAVITINO_KEY_TO_GCS_HADOOP_KEY =
@@ -43,9 +48,21 @@ public class GCSFileSystemProvider implements 
FileSystemProvider {
     Configuration configuration = new Configuration();
     FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY)
         .forEach(configuration::set);
+
     return FileSystem.newInstance(path.toUri(), configuration);
   }
 
+  @Override
+  public Map<String, String> getFileSystemCredentialConf(Credential[] 
credentials) {
+    Credential credential = GCSUtils.getGCSTokenCredential(credentials);
+    Map<String, String> result = Maps.newHashMap();
+    if (credential instanceof GCSTokenCredential) {
+      result.put(GCS_TOKEN_PROVIDER_IMPL, 
GCSCredentialsProvider.class.getName());
+    }
+
+    return result;
+  }
+
   @Override
   public String scheme() {
     return "gs";
diff --git 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java
new file mode 100644
index 000000000..f8fbfd635
--- /dev/null
+++ b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.gcs.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.GCSTokenCredential;
+
+public class GCSUtils {
+  /**
+   * Get the credential from the credential array. If the dynamic credential 
is not found, return
+   * null.
+   *
+   * @param credentials The credential array.
+   * @return An credential.
+   */
+  static Credential getGCSTokenCredential(Credential[] credentials) {
+    for (Credential credential : credentials) {
+      if (credential instanceof GCSTokenCredential) {
+        return credential;
+      }
+    }
+
+    return null;
+  }
+}
diff --git a/catalogs/hadoop-common/build.gradle.kts 
b/catalogs/hadoop-common/build.gradle.kts
index 566ce5986..09fd9f801 100644
--- a/catalogs/hadoop-common/build.gradle.kts
+++ b/catalogs/hadoop-common/build.gradle.kts
@@ -23,6 +23,7 @@ plugins {
 
 // try to avoid adding extra dependencies because it is used by catalogs and 
connectors.
 dependencies {
+  implementation(project(":api"))
   implementation(project(":catalogs:catalog-common"))
   implementation(libs.commons.lang3)
   implementation(libs.hadoop3.client.api)
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
index a1434e85c..11ecd1ee9 100644
--- 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 
 public class FileSystemUtils {
 
@@ -160,4 +161,26 @@ public class FileSystemUtils {
 
     return result;
   }
+
+  /**
+   * Get the GravitinoFileSystemCredentialProvider from the configuration.
+   *
+   * @param conf Configuration
+   * @return GravitinoFileSystemCredentialProvider
+   */
+  public static GravitinoFileSystemCredentialsProvider 
getGvfsCredentialProvider(
+      Configuration conf) {
+    try {
+      GravitinoFileSystemCredentialsProvider 
gravitinoFileSystemCredentialsProvider =
+          (GravitinoFileSystemCredentialsProvider)
+              Class.forName(
+                      
conf.get(GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER))
+                  .getDeclaredConstructor()
+                  .newInstance();
+      gravitinoFileSystemCredentialsProvider.setConf(conf);
+      return gravitinoFileSystemCredentialsProvider;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create 
GravitinoFileSystemCredentialProvider", e);
+    }
+  }
 }
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java
new file mode 100644
index 000000000..40c0492c7
--- /dev/null
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.hadoop.conf.Configurable;
+
+/** Interface for providing credentials for Gravitino Virtual File System. */
+public interface GravitinoFileSystemCredentialsProvider extends Configurable {
+
+  String GVFS_CREDENTIAL_PROVIDER = "fs.gvfs.credential.provider";
+
+  String GVFS_NAME_IDENTIFIER = "fs.gvfs.name.identifier";
+
+  /**
+   * Get credentials for Gravitino Virtual File System.
+   *
+   * @return credentials for Gravitino Virtual File System
+   */
+  Credential[] getCredentials();
+}
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java
new file mode 100644
index 000000000..a9c0b688d
--- /dev/null
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.credential.Credential;
+
+/** Interface for file systems that support credential vending. */
+public interface SupportsCredentialVending {
+  /**
+   * Get the configuration needed for the file system credential based on the 
credentials.
+   *
+   * @param credentials the credentials to be used for the file system
+   * @return the configuration for the file system credential
+   */
+  default Map<String, String> getFileSystemCredentialConf(Credential[] 
credentials) {
+    return ImmutableMap.of();
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java
new file mode 100644
index 000000000..2f3278f87
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java
@@ -0,0 +1,60 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop;
+
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetCatalog;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Default implementation of {@link GravitinoFileSystemCredentialsProvider} 
which provides
+ * credentials for Gravitino Virtual File System.
+ */
+public class DefaultGravitinoFileSystemCredentialsProvider
+    implements GravitinoFileSystemCredentialsProvider {
+
+  private Configuration configuration;
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public Credential[] getCredentials() {
+    // The format of name identifier is `metalake.catalog.schema.fileset`
+    String nameIdentifier = configuration.get(GVFS_NAME_IDENTIFIER);
+    String[] idents = nameIdentifier.split("\\.");
+    try (GravitinoClient client = 
GravitinoVirtualFileSystemUtils.createClient(configuration)) {
+      FilesetCatalog filesetCatalog = 
client.loadCatalog(idents[1]).asFilesetCatalog();
+      Fileset fileset = 
filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
+      return fileset.supportsCredentials().getCredentials();
+    }
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index a9c40e558..26d248736 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -23,35 +23,44 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.audit.InternalClientType;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
-import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
 import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.KerberosTokenProvider;
+import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetCatalog;
+import org.apache.gravitino.storage.AzureProperties;
+import org.apache.gravitino.storage.OSSProperties;
+import org.apache.gravitino.storage.S3Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -79,7 +88,9 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private String metalakeName;
   private Cache<NameIdentifier, FilesetCatalog> catalogCache;
   private ScheduledThreadPoolExecutor catalogCleanScheduler;
-  private Cache<String, FileSystem> internalFileSystemCache;
+  // Fileset name identifier and its corresponding FileSystem cache, the name 
identifier has
+  // four levels, the first level is metalake name.
+  private Cache<NameIdentifier, FileSystem> internalFileSystemCache;
   private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler;
 
   // The pattern is used to match gvfs path. The scheme prefix 
(gvfs://fileset) is optional.
@@ -91,6 +102,14 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private static final String SLASH = "/";
   private final Map<String, FileSystemProvider> fileSystemProvidersMap = 
Maps.newHashMap();
 
+  private static final Set<String> CATALOG_NECESSARY_PROPERTIES_TO_KEEP =
+      Sets.newHashSet(
+          OSSProperties.GRAVITINO_OSS_ENDPOINT,
+          OSSProperties.GRAVITINO_OSS_REGION,
+          S3Properties.GRAVITINO_S3_ENDPOINT,
+          S3Properties.GRAVITINO_S3_REGION,
+          AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME);
+
   @Override
   public void initialize(URI name, Configuration configuration) throws 
IOException {
     if 
(!name.toString().startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX))
 {
@@ -132,8 +151,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
         "'%s' is not set in the configuration",
         
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY);
 
-    initializeClient(configuration);
-
+    this.client = GravitinoVirtualFileSystemUtils.createClient(configuration);
     // Register the default local and HDFS FileSystemProvider
     fileSystemProvidersMap.putAll(getFileSystemProviders());
 
@@ -145,7 +163,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   }
 
   @VisibleForTesting
-  Cache<String, FileSystem> internalFileSystemCache() {
+  Cache<NameIdentifier, FileSystem> internalFileSystemCache() {
     return internalFileSystemCache;
   }
 
@@ -193,116 +211,6 @@ public class GravitinoVirtualFileSystem extends 
FileSystem {
     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + 
"-%d").build();
   }
 
-  private void initializeClient(Configuration configuration) {
-    // initialize the Gravitino client
-    String serverUri =
-        
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(serverUri),
-        "'%s' is not set in the configuration",
-        GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
-
-    String authType =
-        configuration.get(
-            
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
-            GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE);
-    if 
(authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE))
 {
-      this.client =
-          
GravitinoClient.builder(serverUri).withMetalake(metalakeName).withSimpleAuth().build();
-    } else if (authType.equalsIgnoreCase(
-        GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE)) {
-      String authServerUri =
-          configuration.get(
-              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY);
-      checkAuthConfig(
-          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY,
-          authServerUri);
-
-      String credential =
-          configuration.get(
-              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY);
-      checkAuthConfig(
-          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY,
-          credential);
-
-      String path =
-          configuration.get(
-              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY);
-      checkAuthConfig(
-          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY,
-          path);
-
-      String scope =
-          configuration.get(
-              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY);
-      checkAuthConfig(
-          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY,
-          scope);
-
-      DefaultOAuth2TokenProvider authDataProvider =
-          DefaultOAuth2TokenProvider.builder()
-              .withUri(authServerUri)
-              .withCredential(credential)
-              .withPath(path)
-              .withScope(scope)
-              .build();
-
-      this.client =
-          GravitinoClient.builder(serverUri)
-              .withMetalake(metalakeName)
-              .withOAuth(authDataProvider)
-              .build();
-    } else if (authType.equalsIgnoreCase(
-        GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE)) {
-      String principal =
-          configuration.get(
-              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY);
-      checkAuthConfig(
-          GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE,
-          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY,
-          principal);
-      String keytabFilePath =
-          configuration.get(
-              GravitinoVirtualFileSystemConfiguration
-                  .FS_GRAVITINO_CLIENT_KERBEROS_KEYTAB_FILE_PATH_KEY);
-      KerberosTokenProvider authDataProvider;
-      if (StringUtils.isNotBlank(keytabFilePath)) {
-        // Using principal and keytab to create auth provider
-        authDataProvider =
-            KerberosTokenProvider.builder()
-                .withClientPrincipal(principal)
-                .withKeyTabFile(new File(keytabFilePath))
-                .build();
-      } else {
-        // Using ticket cache to create auth provider
-        authDataProvider = 
KerberosTokenProvider.builder().withClientPrincipal(principal).build();
-      }
-      this.client =
-          GravitinoClient.builder(serverUri)
-              .withMetalake(metalakeName)
-              .withKerberosAuth(authDataProvider)
-              .build();
-    } else {
-      throw new IllegalArgumentException(
-          String.format(
-              "Unsupported authentication type: %s for %s.",
-              authType, 
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY));
-    }
-  }
-
-  private void checkAuthConfig(String authType, String configKey, String 
configValue) {
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(configValue),
-        "%s should not be null if %s is set to %s.",
-        configKey,
-        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
-        authType);
-  }
-
   private String getVirtualLocation(NameIdentifier identifier, boolean 
withScheme) {
     return String.format(
         "%s/%s/%s/%s",
@@ -360,6 +268,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
     FilesetCatalog filesetCatalog =
         catalogCache.get(
             catalogIdent, ident -> 
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+    Catalog catalog = (Catalog) filesetCatalog;
     Preconditions.checkArgument(
         filesetCatalog != null, String.format("Loaded fileset catalog: %s is 
null.", catalogIdent));
 
@@ -383,8 +292,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
         StringUtils.isNotBlank(scheme), "Scheme of the actual file location 
cannot be null.");
     FileSystem fs =
         internalFileSystemCache.get(
-            scheme,
-            str -> {
+            identifier,
+            ident -> {
               try {
                 FileSystemProvider provider = 
fileSystemProvidersMap.get(scheme);
                 if (provider == null) {
@@ -398,8 +307,19 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
                 // https://github.com/apache/gravitino/issues/5609
                 resetFileSystemServiceLoader(scheme);
 
-                Map<String, String> maps = getConfigMap(getConf());
-                return provider.getFileSystem(filePath, maps);
+                Map<String, String> necessaryPropertyFromCatalog =
+                    catalog.properties().entrySet().stream()
+                        .filter(
+                            property ->
+                                
CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Map<String, String> totalProperty = 
Maps.newHashMap(necessaryPropertyFromCatalog);
+                totalProperty.putAll(getConfigMap(getConf()));
+
+                totalProperty.putAll(getCredentialProperties(provider, 
catalog, identifier));
+
+                return provider.getFileSystem(filePath, totalProperty);
               } catch (IOException ioe) {
                 throw new GravitinoRuntimeException(
                     "Exception occurs when create new FileSystem for actual 
uri: %s, msg: %s",
@@ -410,6 +330,41 @@ public class GravitinoVirtualFileSystem extends FileSystem 
{
     return new FilesetContextPair(new Path(actualFileLocation), fs);
   }
 
+  private Map<String, String> getCredentialProperties(
+      FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier 
filesetIdentifier) {
+    // Do not support credential vending, we do not need to add any credential 
properties.
+    if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
+      return ImmutableMap.of();
+    }
+
+    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+    try {
+      Fileset fileset =
+          catalog
+              .asFilesetCatalog()
+              .loadFileset(
+                  NameIdentifier.of(
+                      filesetIdentifier.namespace().level(2), 
filesetIdentifier.name()));
+      Credential[] credentials = 
fileset.supportsCredentials().getCredentials();
+      if (credentials.length > 0) {
+        mapBuilder.put(
+            GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER,
+            
DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName());
+        mapBuilder.put(
+            GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER,
+            filesetIdentifier.toString());
+
+        SupportsCredentialVending supportsCredentialVending =
+            (SupportsCredentialVending) fileSystemProvider;
+        
mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return mapBuilder.build();
+  }
+
   private void resetFileSystemServiceLoader(String fsScheme) {
     try {
       Map<String, Class<? extends FileSystem>> serviceFileSystems =
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
new file mode 100644
index 000000000..8a0d1d874
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
@@ -0,0 +1,151 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.KerberosTokenProvider;
+import org.apache.hadoop.conf.Configuration;
+
+/** Utility class for Gravitino Virtual File System. */
+public class GravitinoVirtualFileSystemUtils {
+
+  /**
+   * Get Gravitino client by the configuration.
+   *
+   * @param configuration The configuration for the Gravitino client.
+   * @return The Gravitino client.
+   */
+  public static GravitinoClient createClient(Configuration configuration) {
+    // initialize the Gravitino client
+    String serverUri =
+        
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
+    String metalakeValue =
+        
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(serverUri),
+        "'%s' is not set in the configuration",
+        GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
+
+    String authType =
+        configuration.get(
+            
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
+            GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE);
+    if 
(authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE))
 {
+      return GravitinoClient.builder(serverUri)
+          .withMetalake(metalakeValue)
+          .withSimpleAuth()
+          .build();
+    } else if (authType.equalsIgnoreCase(
+        GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE)) {
+      String authServerUri =
+          configuration.get(
+              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY);
+      checkAuthConfig(
+          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY,
+          authServerUri);
+
+      String credential =
+          configuration.get(
+              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY);
+      checkAuthConfig(
+          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY,
+          credential);
+
+      String path =
+          configuration.get(
+              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY);
+      checkAuthConfig(
+          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY,
+          path);
+
+      String scope =
+          configuration.get(
+              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY);
+      checkAuthConfig(
+          GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY,
+          scope);
+
+      DefaultOAuth2TokenProvider authDataProvider =
+          DefaultOAuth2TokenProvider.builder()
+              .withUri(authServerUri)
+              .withCredential(credential)
+              .withPath(path)
+              .withScope(scope)
+              .build();
+
+      return GravitinoClient.builder(serverUri)
+          .withMetalake(metalakeValue)
+          .withOAuth(authDataProvider)
+          .build();
+    } else if (authType.equalsIgnoreCase(
+        GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE)) {
+      String principal =
+          configuration.get(
+              
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY);
+      checkAuthConfig(
+          GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE,
+          
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY,
+          principal);
+      String keytabFilePath =
+          configuration.get(
+              GravitinoVirtualFileSystemConfiguration
+                  .FS_GRAVITINO_CLIENT_KERBEROS_KEYTAB_FILE_PATH_KEY);
+      KerberosTokenProvider authDataProvider;
+      if (StringUtils.isNotBlank(keytabFilePath)) {
+        // Using principal and keytab to create auth provider
+        authDataProvider =
+            KerberosTokenProvider.builder()
+                .withClientPrincipal(principal)
+                .withKeyTabFile(new File(keytabFilePath))
+                .build();
+      } else {
+        // Using ticket cache to create auth provider
+        authDataProvider = 
KerberosTokenProvider.builder().withClientPrincipal(principal).build();
+      }
+
+      return GravitinoClient.builder(serverUri)
+          .withMetalake(metalakeValue)
+          .withKerberosAuth(authDataProvider)
+          .build();
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unsupported authentication type: %s for %s.",
+              authType, 
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY));
+    }
+  }
+
+  private static void checkAuthConfig(String authType, String configKey, 
String configValue) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(configValue),
+        "%s should not be null if %s is set to %s.",
+        configKey,
+        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
+        authType);
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index e7e3b7857..be30e42a4 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -40,7 +41,13 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.credential.CredentialDTO;
+import org.apache.gravitino.dto.file.FilesetDTO;
+import org.apache.gravitino.dto.responses.CredentialResponse;
 import org.apache.gravitino.dto.responses.FileLocationResponse;
+import org.apache.gravitino.dto.responses.FilesetResponse;
+import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.rest.RESTUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -140,6 +147,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString(""));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath.toString());
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -149,7 +157,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
           Objects.requireNonNull(
               ((GravitinoVirtualFileSystem) gravitinoFileSystem)
                   .internalFileSystemCache()
-                  .getIfPresent("file"));
+                  .getIfPresent(
+                      NameIdentifier.of(metalakeName, catalogName, schemaName, 
"testFSCache")));
 
       String anotherFilesetName = "test_new_fs";
       Path diffLocalPath =
@@ -184,6 +193,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       try {
         buildMockResource(
             Method.GET, locationPath1, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential("fileset1", localPath1.toString());
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -199,7 +209,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
                       0,
                       ((GravitinoVirtualFileSystem) 
fs).internalFileSystemCache().asMap().size()));
 
-      assertNull(((GravitinoVirtualFileSystem) 
fs).internalFileSystemCache().getIfPresent("file"));
+      assertNull(
+          ((GravitinoVirtualFileSystem) fs)
+              .internalFileSystemCache()
+              .getIfPresent(NameIdentifier.of("file")));
     }
   }
 
@@ -224,6 +237,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath + "/test.txt");
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -276,6 +290,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath + "/test.txt");
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -309,6 +324,32 @@ public class TestGvfsBase extends GravitinoMockServerBase {
     }
   }
 
+  private void buildMockResourceForCredential(String filesetName, String 
filesetLocation)
+      throws JsonProcessingException {
+    String filesetPath =
+        String.format(
+            "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s",
+            metalakeName, catalogName, schemaName, filesetName);
+    String credentialsPath =
+        String.format(
+            "/api/metalakes/%s/objects/fileset/%s.%s.%s/credentials",
+            metalakeName, catalogName, schemaName, filesetName);
+    FilesetResponse filesetResponse =
+        new FilesetResponse(
+            FilesetDTO.builder()
+                .name(filesetName)
+                .comment("comment")
+                .type(Fileset.Type.MANAGED)
+                .audit(AuditDTO.builder().build())
+                .storageLocation(filesetLocation.toString())
+                .build());
+    CredentialResponse credentialResponse = new CredentialResponse(new 
CredentialDTO[] {});
+
+    buildMockResource(Method.GET, filesetPath, ImmutableMap.of(), null, 
filesetResponse, SC_OK);
+    buildMockResource(
+        Method.GET, credentialsPath, ImmutableMap.of(), null, 
credentialResponse, SC_OK);
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testRename(boolean withScheme) throws IOException {
@@ -343,6 +384,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       try {
         buildMockResource(
             Method.GET, locationPath, queryParams1, null, 
fileLocationResponse1, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath + 
"/rename_dst2");
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -409,6 +451,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString("/test_delete"));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath + 
"/test_delete");
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -455,6 +498,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString(""));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath.toString());
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -499,6 +543,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString(""));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath.toString());
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -549,6 +594,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString("/test_mkdirs"));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath + 
"/test_mkdirs");
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -667,6 +713,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString(""));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath.toString());
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
@@ -693,6 +740,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
       queryParams.put("sub_path", RESTUtils.encodeString(""));
       try {
         buildMockResource(Method.GET, locationPath, queryParams, null, 
fileLocationResponse, SC_OK);
+        buildMockResourceForCredential(filesetName, localPath.toString());
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java
new file mode 100644
index 000000000..2f79332e8
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java
@@ -0,0 +1,180 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.abs.fs.AzureFileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.AzureProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf("absIsConfigured")
+public class GravitinoVirtualFileSystemABSCredentialIT extends 
GravitinoVirtualFileSystemIT {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GravitinoVirtualFileSystemABSCredentialIT.class);
+
+  public static final String ABS_ACCOUNT_NAME = 
System.getenv("ABS_ACCOUNT_NAME_FOR_CREDENTIAL");
+  public static final String ABS_ACCOUNT_KEY = 
System.getenv("ABS_ACCOUNT_KEY_FOR_CREDENTIAL");
+  public static final String ABS_CONTAINER_NAME =
+      System.getenv("ABS_CONTAINER_NAME_FOR_CREDENTIAL");
+  public static final String ABS_TENANT_ID = 
System.getenv("ABS_TENANT_ID_FOR_CREDENTIAL");
+  public static final String ABS_CLIENT_ID = 
System.getenv("ABS_CLIENT_ID_FOR_CREDENTIAL");
+  public static final String ABS_CLIENT_SECRET = 
System.getenv("ABS_CLIENT_SECRET_FOR_CREDENTIAL");
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    // Copy the Azure jars to the gravitino server if in deploy mode.
+    copyBundleJarsToHadoop("azure-bundle");
+    // Need to download jars to gravitino server
+    super.startIntegrationTest();
+
+    // This value can be by tune by the user, please change it accordingly.
+    defaultBlockSize = 32 * 1024 * 1024;
+
+    // This value is 1 for ABS, 3 for GCS, and 1 for S3A.
+    defaultReplication = 1;
+
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("catalog");
+    schemaName = GravitinoITUtils.genRandomName("schema");
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Map<String, String> properties = Maps.newHashMap();
+
+    properties.put(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, 
ABS_ACCOUNT_NAME);
+    properties.put(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY, 
ABS_ACCOUNT_KEY);
+    properties.put(AzureProperties.GRAVITINO_AZURE_CLIENT_ID, ABS_CLIENT_ID);
+    properties.put(AzureProperties.GRAVITINO_AZURE_CLIENT_SECRET, 
ABS_CLIENT_SECRET);
+    properties.put(AzureProperties.GRAVITINO_AZURE_TENANT_ID, ABS_TENANT_ID);
+    properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, "adls-token");
+
+    properties.put(FILESYSTEM_PROVIDERS, 
AzureFileSystemProvider.ABS_PROVIDER_NAME);
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+    conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+    conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+    conf.set("fs.gvfs.impl.disable.cache", "true");
+    conf.set("fs.gravitino.server.uri", serverUri);
+    conf.set("fs.gravitino.client.metalake", metalakeName);
+
+    // Pass this configuration to the real file system
+    conf.set(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, 
ABS_ACCOUNT_NAME);
+    conf.set(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY, 
ABS_ACCOUNT_KEY);
+    conf.set("fs.abfss.impl", 
"org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem");
+
+    conf.set("fs.gravitino.client.useCloudStoreCredential", "true");
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it 
to the real file system
+   * This method corresponds to the method 
org.apache.gravitino.filesystem.hadoop
+   * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original 
code.
+   */
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration absConf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap = FileSystemUtils.toHadoopConfigMap(map, 
ImmutableMap.of());
+
+    if (gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME) != 
null
+        && gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY) 
!= null) {
+      hadoopConfMap.put(
+          String.format(
+              "fs.azure.account.key.%s.dfs.core.windows.net",
+              
gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME)),
+          gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY));
+    }
+
+    hadoopConfMap.forEach(absConf::set);
+
+    return absConf;
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format(
+        "%s://%s@%s.dfs.core.windows.net/%s",
+        AzureFileSystemProvider.ABS_PROVIDER_SCHEME, ABS_CONTAINER_NAME, 
ABS_ACCOUNT_NAME, fileset);
+  }
+
+  @Disabled("java.lang.UnsupportedOperationException: Append Support not 
enabled")
+  public void testAppend() throws IOException {}
+
+  private static boolean absIsConfigured() {
+    return 
StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_NAME_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_KEY_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("ABS_CONTAINER_NAME_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("ABS_TENANT_ID_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("ABS_CLIENT_ID_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("ABS_CLIENT_SECRET_FOR_CREDENTIAL"));
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
new file mode 100644
index 000000000..81b352fa5
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.gcs.fs.GCSFileSystemProvider;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.GCSProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf(value = "isGCPConfigured", disabledReason = "GCP is not configured")
+public class GravitinoVirtualFileSystemGCSCredentialIT extends 
GravitinoVirtualFileSystemIT {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GravitinoVirtualFileSystemGCSCredentialIT.class);
+
+  public static final String BUCKET_NAME = 
System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL");
+  public static final String SERVICE_ACCOUNT_FILE =
+      System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL");
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    // Copy the GCP jars to the gravitino server if in deploy mode.
+    copyBundleJarsToHadoop("gcp-bundle");
+    // Need to download jars to gravitino server
+    super.startIntegrationTest();
+
+    // This value can be by tune by the user, please change it accordingly.
+    defaultBlockSize = 64 * 1024 * 1024;
+
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("catalog");
+    schemaName = GravitinoITUtils.genRandomName("schema");
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FILESYSTEM_PROVIDERS, "gcs");
+    properties.put(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, 
SERVICE_ACCOUNT_FILE);
+    properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, "gcs-token");
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+    conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+    conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+    conf.set("fs.gvfs.impl.disable.cache", "true");
+    conf.set("fs.gravitino.server.uri", serverUri);
+    conf.set("fs.gravitino.client.metalake", metalakeName);
+
+    // Pass this configuration to the real file system
+    conf.set(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, 
SERVICE_ACCOUNT_FILE);
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it 
to the real file system
+   * This method corresponds to the method 
org.apache.gravitino.filesystem.hadoop
+   * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original 
code.
+   */
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration gcsConf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(
+            map, GCSFileSystemProvider.GRAVITINO_KEY_TO_GCS_HADOOP_KEY);
+
+    hadoopConfMap.forEach(gcsConf::set);
+
+    return gcsConf;
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format("gs://%s/dir1/dir2/%s/", BUCKET_NAME, fileset);
+  }
+
+  @Disabled(
+      "GCS does not support append, java.io.IOException: The append operation 
is not supported")
+  public void testAppend() throws IOException {}
+
+  private static boolean isGCPConfigured() {
+    return 
StringUtils.isNotBlank(System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL"));
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java
new file mode 100644
index 000000000..662e8f6e4
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java
@@ -0,0 +1,168 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.oss.fs.OSSFileSystemProvider;
+import org.apache.gravitino.storage.OSSProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf(value = "ossIsConfigured", disabledReason = "OSS is not prepared")
+public class GravitinoVirtualFileSystemOSSCredentialIT extends 
GravitinoVirtualFileSystemIT {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GravitinoVirtualFileSystemOSSCredentialIT.class);
+
+  public static final String BUCKET_NAME = 
System.getenv("OSS_BUCKET_NAME_FOR_CREDENTIAL");
+  public static final String OSS_ACCESS_KEY = 
System.getenv("OSS_ACCESS_KEY_ID_FOR_CREDENTIAL");
+  public static final String OSS_SECRET_KEY = 
System.getenv("OSS_SECRET_ACCESS_KEY_FOR_CREDENTIAL");
+  public static final String OSS_ENDPOINT = 
System.getenv("OSS_ENDPOINT_FOR_CREDENTIAL");
+  public static final String OSS_REGION = 
System.getenv("OSS_REGION_FOR_CREDENTIAL");
+  public static final String OSS_ROLE_ARN = 
System.getenv("OSS_ROLE_ARN_FOR_CREDENTIAL");
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    copyBundleJarsToHadoop("aliyun-bundle");
+    // Need to download jars to gravitino server
+    super.startIntegrationTest();
+
+    // This value can be by tune by the user, please change it accordingly.
+    defaultBlockSize = 64 * 1024 * 1024;
+
+    // The default replication factor is 1.
+    defaultReplication = 1;
+
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("catalog");
+    schemaName = GravitinoITUtils.genRandomName("schema");
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FILESYSTEM_PROVIDERS, "oss");
+    properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+    properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, 
OSS_SECRET_KEY);
+    properties.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
+    properties.put(OSSProperties.GRAVITINO_OSS_REGION, OSS_REGION);
+    properties.put(OSSProperties.GRAVITINO_OSS_ROLE_ARN, OSS_ROLE_ARN);
+    properties.put(
+        CredentialConstants.CREDENTIAL_PROVIDERS, 
OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE);
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+    conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+    conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+    conf.set("fs.gvfs.impl.disable.cache", "true");
+    conf.set("fs.gravitino.server.uri", serverUri);
+    conf.set("fs.gravitino.client.metalake", metalakeName);
+
+    // Pass this configuration to the real file system
+    conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+    conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, OSS_SECRET_KEY);
+    conf.set(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
+    conf.set("fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it 
to the real file system
+   * This method corresponds to the method 
org.apache.gravitino.filesystem.hadoop
+   * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original 
code.
+   */
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration ossConf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(
+            map, OSSFileSystemProvider.GRAVITINO_KEY_TO_OSS_HADOOP_KEY);
+
+    hadoopConfMap.forEach(ossConf::set);
+
+    return ossConf;
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format("oss://%s/%s", BUCKET_NAME, fileset);
+  }
+
+  @Disabled(
+      "OSS does not support append, java.io.IOException: The append operation 
is not supported")
+  public void testAppend() throws IOException {}
+
+  protected static boolean ossIsConfigured() {
+    return 
StringUtils.isNotBlank(System.getenv("OSS_ACCESS_KEY_ID_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("OSS_SECRET_ACCESS_KEY_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("OSS_ENDPOINT_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("OSS_BUCKET_NAME_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("OSS_REGION_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("OSS_ROLE_ARN_FOR_CREDENTIAL"));
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java
new file mode 100644
index 000000000..12d530967
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.s3.fs.S3FileSystemProvider;
+import org.apache.gravitino.storage.S3Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf(value = "s3IsConfigured", disabledReason = "s3 with credential is 
not prepared")
+public class GravitinoVirtualFileSystemS3CredentialIT extends 
GravitinoVirtualFileSystemIT {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GravitinoVirtualFileSystemS3CredentialIT.class);
+
+  public static final String BUCKET_NAME = 
System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL");
+  public static final String S3_ACCESS_KEY = 
System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL");
+  public static final String S3_SECRET_KEY = 
System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL");
+  public static final String S3_ENDPOINT = 
System.getenv("S3_ENDPOINT_FOR_CREDENTIAL");
+  public static final String S3_REGION = 
System.getenv("S3_REGION_FOR_CREDENTIAL");
+  public static final String S3_ROLE_ARN = 
System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL");
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    copyBundleJarsToHadoop("aws-bundle");
+
+    // Need to download jars to gravitino server
+    super.startIntegrationTest();
+
+    // This value can be by tune by the user, please change it accordingly.
+    defaultBlockSize = 32 * 1024 * 1024;
+
+    // The value is 1 for S3
+    defaultReplication = 1;
+
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("catalog");
+    schemaName = GravitinoITUtils.genRandomName("schema");
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+    properties.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+    properties.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
+    properties.put(
+        "gravitino.bypass.fs.s3a.aws.credentials.provider",
+        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+    properties.put(FILESYSTEM_PROVIDERS, "s3");
+
+    properties.put(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
+    properties.put(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
+    properties.put(
+        CredentialConstants.CREDENTIAL_PROVIDERS, 
S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE);
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+    conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+    conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+    conf.set("fs.gvfs.impl.disable.cache", "true");
+    conf.set("fs.gravitino.server.uri", serverUri);
+    conf.set("fs.gravitino.client.metalake", metalakeName);
+
+    // Pass this configuration to the real file system
+    conf.set(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+    conf.set(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+    conf.set(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
+    conf.set(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
+    conf.set(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it 
to the real file system
+   * This method corresponds to the method 
org.apache.gravitino.filesystem.hadoop
+   * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original 
code.
+   */
+  protected Configuration 
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration s3Conf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap =
+        FileSystemUtils.toHadoopConfigMap(map, 
S3FileSystemProvider.GRAVITINO_KEY_TO_S3_HADOOP_KEY);
+
+    hadoopConfMap.forEach(s3Conf::set);
+
+    return s3Conf;
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format("s3a://%s/%s", BUCKET_NAME, fileset);
+  }
+
+  @Disabled(
+      "GCS does not support append, java.io.IOException: The append operation 
is not supported")
+  public void testAppend() throws IOException {}
+
+  protected static boolean s3IsConfigured() {
+    return 
StringUtils.isNotBlank(System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("S3_ENDPOINT_FOR_CREDENTIAL"))
+        && 
StringUtils.isNotBlank(System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("S3_REGION_FOR_CREDENTIAL"))
+        && StringUtils.isNotBlank(System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL"));
+  }
+}

Reply via email to