This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 4156d9e78f [#6981] feat(credential): support specifying location when 
get fileset credential (#7028)
4156d9e78f is described below

commit 4156d9e78f1bcafbaea8794fb30f5bdfbf2512a9
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 22 11:51:36 2025 +0800

    [#6981] feat(credential): support specifying location when get fileset 
credential (#7028)
    
    ### What changes were proposed in this pull request?
    
    - using the HTTP header to specify the location name when getting
    credential of the fileset
    - gvfs automatically retrieves the corresponding credentials based on
    the current location name.
    
    ### Why are the changes needed?
    
    Fix: #6981
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    local run `FilesetCatalogCredentialIT` and
    `GravitinoVirtualFileSystemS3CredentialIT`
    
    Co-authored-by: mchades <[email protected]>
---
 .../gravitino/credential/CredentialConstants.java  |  3 ++
 .../catalog/hadoop/HadoopCatalogOperations.java    | 26 ++++++----
 .../hadoop/SecureHadoopCatalogOperations.java      | 59 +++++++++++++++++++---
 .../client/MetadataObjectCredentialOperations.java | 24 +++++----
 .../gravitino/api/credential/credential.py         |  3 ++
 .../metadata_object_credential_operations.py       | 16 ++++--
 .../gravitino/filesystem/gvfs_base_operations.py   | 29 +++++++++--
 .../gravitino/filesystem/gvfs_config.py            |  3 ++
 .../integration/test_gvfs_with_abs_credential.py   |  1 +
 .../integration/test_gvfs_with_gcs_credential.py   |  7 +++
 .../integration/test_gvfs_with_oss_credential.py   |  1 +
 .../integration/test_gvfs_with_s3_credential.py    |  1 +
 .../filesystem/hadoop/BaseGVFSOperations.java      | 57 ++++++++++++++++-----
 .../GravitinoVirtualFileSystemConfiguration.java   |  7 +++
 .../test/FilesetCatalogCredentialIT.java           |  2 +-
 docs/hadoop-catalog-with-adls.md                   |  5 +-
 docs/hadoop-catalog-with-gcs.md                    |  5 +-
 docs/hadoop-catalog-with-oss.md                    |  5 +-
 docs/hadoop-catalog-with-s3.md                     |  5 +-
 docs/how-to-use-gvfs.md                            |  2 +
 server-common/build.gradle.kts                     |  1 +
 .../org/apache/gravitino/server/web/Utils.java     | 13 +++++
 .../rest/MetadataObjectCredentialOperations.java   | 16 ++++++
 23 files changed, 240 insertions(+), 51 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
index a6e0d54bfa..f0a4a967c1 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
@@ -28,5 +28,8 @@ public class CredentialConstants {
   public static final String OSS_TOKEN_EXPIRE_IN_SECS = 
"oss-token-expire-in-secs";
   public static final String ADLS_TOKEN_EXPIRE_IN_SECS = 
"adls-token-expire-in-secs";
 
+  /** The HTTP header used to get the credential from fileset location */
+  public static final String HTTP_HEADER_CURRENT_LOCATION_NAME = 
"Current-Location-Name";
+
   private CredentialConstants() {}
 }
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 0b94286690..0aceee05e5 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -563,13 +563,20 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     }
 
     Fileset fileset = loadFileset(ident);
-    locationName =
-        locationName == null
-            ? fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME)
-            : locationName;
-    if (!fileset.storageLocations().containsKey(locationName)) {
+    String targetLocationName;
+    if (locationName == null) {
+      targetLocationName =
+          fileset.storageLocations().size() == 1
+              // to be compatible with the old version, the fileset in old 
version only has one
+              // location and does not have the default-location-name property
+              ? fileset.storageLocations().keySet().iterator().next()
+              : fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME);
+    } else {
+      targetLocationName = locationName;
+    }
+    if (!fileset.storageLocations().containsKey(targetLocationName)) {
       throw new NoSuchLocationNameException(
-          "Location name %s does not exist in fileset %s", locationName, 
ident);
+          "Location name %s does not exist in fileset %s", targetLocationName, 
ident);
     }
 
     boolean isSingleFile = false;
@@ -581,7 +588,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
               + "file location may be a wrong path. Please avoid using Fileset 
to manage a single"
               + " file path.");
     } else {
-      isSingleFile = checkSingleFile(fileset, locationName);
+      isSingleFile = checkSingleFile(fileset, targetLocationName);
     }
 
     // if the storage location is a single file, it cannot have sub path to 
access.
@@ -629,11 +636,12 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     // 1. if the storage location is a single file, we pass the storage 
location directly
     // 2. if the processed sub path is blank, we pass the storage location 
directly
     if (isSingleFile || StringUtils.isBlank(processedSubPath)) {
-      fileLocation = fileset.storageLocations().get(locationName);
+      fileLocation = fileset.storageLocations().get(targetLocationName);
     } else {
       // the processed sub path always starts with "/" if it is not blank,
       // so we can safely remove the tailing slash if storage location ends 
with "/".
-      String storageLocation = 
removeTrailingSlash(fileset.storageLocations().get(locationName));
+      String storageLocation =
+          
removeTrailingSlash(fileset.storageLocations().get(targetLocationName));
       fileLocation = String.format("%s%s", storageLocation, processedSubPath);
     }
     return fileLocation;
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
index b57d71d226..4427ff2483 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -31,6 +31,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
@@ -39,6 +40,7 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.catalog.hadoop.authentication.UserContext;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
@@ -46,6 +48,7 @@ import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.SupportsSchemas;
 import org.apache.gravitino.connector.credential.PathContext;
 import org.apache.gravitino.connector.credential.SupportsPathBasedCredentials;
+import org.apache.gravitino.credential.CredentialConstants;
 import org.apache.gravitino.credential.CredentialUtils;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -262,17 +265,11 @@ public class SecureHadoopCatalogOperations
   @Override
   public List<PathContext> getPathContext(NameIdentifier filesetIdentifier) {
     Fileset fileset = loadFileset(filesetIdentifier);
-    Map<String, String> locations = fileset.storageLocations();
-    Preconditions.checkArgument(
-        locations != null && !locations.isEmpty(),
-        "No storage locations found for fileset: " + filesetIdentifier);
-
-    // todo: support multiple storage locations
-    String path = 
locations.get(fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+    String path = getTargetLocation(fileset);
 
     Set<String> providers =
         CredentialUtils.getCredentialProvidersByOrder(
-            () -> fileset.properties(),
+            fileset::properties,
             () -> {
               Namespace namespace = filesetIdentifier.namespace();
               NameIdentifier schemaIdentifier =
@@ -286,6 +283,52 @@ public class SecureHadoopCatalogOperations
         .collect(Collectors.toList());
   }
 
+  private String getTargetLocation(Fileset fileset) {
+    CallerContext callerContext = CallerContext.CallerContextHolder.get();
+    String targetLocationName;
+    String targetLocation;
+    if (callerContext != null
+        && callerContext
+            .context()
+            
.containsKey(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME)) {
+      // case 1: target location name is passed in the header
+      targetLocationName =
+          
callerContext.context().get(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME);
+      Preconditions.checkArgument(
+          fileset.storageLocations().containsKey(targetLocationName),
+          "The location name %s is not in the fileset %s, expected location 
names are %s",
+          targetLocationName,
+          fileset.name(),
+          fileset.storageLocations().keySet());
+      targetLocation = fileset.storageLocations().get(targetLocationName);
+
+    } else if (fileset.storageLocations().size() == 1) {
+      // case 2: target location name is not passed in the header, but there 
is only one location.
+      // note: mainly used for backward compatibility since the old code does 
not pass the header
+      // and only supports one location
+      targetLocation = fileset.storageLocations().values().iterator().next();
+      targetLocationName = 
fileset.storageLocations().keySet().iterator().next();
+
+    } else {
+      // case 3: target location name is not passed in the header, and there 
are multiple locations.
+      // use the default location name
+      targetLocationName = 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME);
+      // this should never happen, but just in case
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(targetLocationName),
+          "The default location name of the fileset %s should not be empty.",
+          fileset.name());
+      targetLocation = 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME);
+    }
+
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(targetLocation),
+        "The location with the location name %s of the fileset %s should not 
be empty.",
+        targetLocationName,
+        fileset.name());
+    return targetLocation;
+  }
+
   /**
    * Add the user to the subject so that we can get the last user in the 
subject. Hadoop catalog
    * uses this method to pass api user from the client side, so that we can 
get the user in the
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
index b11fd9caf2..58819dd94a 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.client;
 import java.util.Collections;
 import java.util.Locale;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.SupportsCredentials;
 import org.apache.gravitino.dto.responses.CredentialResponse;
@@ -49,14 +50,19 @@ class MetadataObjectCredentialOperations implements 
SupportsCredentials {
 
   @Override
   public Credential[] getCredentials() {
-    CredentialResponse resp =
-        restClient.get(
-            credentialRequestPath,
-            CredentialResponse.class,
-            Collections.emptyMap(),
-            ErrorHandlers.credentialErrorHandler());
-
-    resp.validate();
-    return DTOConverters.fromDTO(resp.getCredentials());
+    try {
+      CallerContext callerContext = CallerContext.CallerContextHolder.get();
+      CredentialResponse resp =
+          restClient.get(
+              credentialRequestPath,
+              CredentialResponse.class,
+              callerContext != null ? callerContext.context() : 
Collections.emptyMap(),
+              ErrorHandlers.credentialErrorHandler());
+      resp.validate();
+      return DTOConverters.fromDTO(resp.getCredentials());
+    } finally {
+      // Clear the caller context
+      CallerContext.CallerContextHolder.remove();
+    }
   }
 }
diff --git a/clients/client-python/gravitino/api/credential/credential.py 
b/clients/client-python/gravitino/api/credential/credential.py
index 37b97694a2..2a74f0fd52 100644
--- a/clients/client-python/gravitino/api/credential/credential.py
+++ b/clients/client-python/gravitino/api/credential/credential.py
@@ -22,6 +22,9 @@ from typing import Dict
 class Credential(ABC):
     """Represents the credential in Gravitino."""
 
+    # The HTTP header used to get the credential from fileset location
+    HTTP_HEADER_CURRENT_LOCATION_NAME = "Current-Location-Name"
+
     @abstractmethod
     def credential_type(self) -> str:
         """The type of the credential.
diff --git 
a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
 
b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
index 7184cd797c..8ae83b0ec1 100644
--- 
a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
+++ 
b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
@@ -20,6 +20,7 @@ from typing import List
 from gravitino.api.credential.supports_credentials import SupportsCredentials
 from gravitino.api.credential.credential import Credential
 from gravitino.api.metadata_object import MetadataObject
+from gravitino.audit.caller_context import CallerContext, CallerContextHolder
 from gravitino.dto.credential_dto import CredentialDTO
 from gravitino.dto.responses.credential_response import CredentialResponse
 from gravitino.exceptions.handlers.credential_error_handler import (
@@ -53,10 +54,17 @@ class 
MetadataObjectCredentialOperations(SupportsCredentials):
         )
 
     def get_credentials(self) -> List[Credential]:
-        resp = self._rest_client.get(
-            self._request_path,
-            error_handler=CREDENTIAL_ERROR_HANDLER,
-        )
+        try:
+            caller_context: CallerContext = CallerContextHolder.get()
+            resp = self._rest_client.get(
+                self._request_path,
+                headers=(
+                    caller_context.context() if caller_context is not None 
else None
+                ),
+                error_handler=CREDENTIAL_ERROR_HANDLER,
+            )
+        finally:
+            CallerContextHolder.remove()
 
         credential_resp = CredentialResponse.from_json(resp.body, 
infer_missing=True)
         credential_resp.validate()
diff --git a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py 
b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
index 82280a87ac..dc835efbca 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
@@ -27,6 +27,7 @@ from fsspec import AbstractFileSystem
 from readerwriterlock import rwlock
 
 from gravitino.api.catalog import Catalog
+from gravitino.api.credential.credential import Credential
 from gravitino.audit.caller_context import CallerContextHolder, CallerContext
 from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
 from gravitino.audit.fileset_data_operation import FilesetDataOperation
@@ -75,6 +76,7 @@ class BaseGVFSOperations(ABC):
     SLASH = "/"
 
     ENV_CURRENT_LOCATION_NAME_ENV_VAR_DEFAULT = "CURRENT_LOCATION_NAME"
+    ENABLE_CREDENTIAL_VENDING_DEFAULT = False
 
     def __init__(
         self,
@@ -121,6 +123,14 @@ class BaseGVFSOperations(ABC):
         self._catalog_cache = LRUCache(maxsize=100)
         self._catalog_cache_lock = rwlock.RWLockFair()
 
+        self._enable_credential_vending = (
+            False
+            if options is None
+            else options.get(
+                GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING,
+                self.ENABLE_CREDENTIAL_VENDING_DEFAULT,
+            )
+        )
         self._current_location_name = self._init_current_location_name()
 
     @property
@@ -369,7 +379,7 @@ class BaseGVFSOperations(ABC):
         )
         target_location_name = location_name or fileset.properties().get(
             fileset.PROPERTY_DEFAULT_LOCATION_NAME
-        )
+        ) or fileset.LOCATION_NAME_UNKNOWN
         actual_location = fileset.storage_locations().get(target_location_name)
         if actual_location is None:
             raise NoSuchLocationNameException(
@@ -418,7 +428,7 @@ class BaseGVFSOperations(ABC):
         catalog_ident: NameIdentifier = NameIdentifier.of(
             self._metalake, identifier.namespace().level(1)
         )
-        fileset_catalog = self._get_fileset_catalog(catalog_ident)
+        fileset_catalog = 
self._get_fileset_catalog(catalog_ident).as_fileset_catalog()
 
         sub_path: str = get_sub_path_from_virtual_path(
             identifier, processed_virtual_path
@@ -430,7 +440,7 @@ class BaseGVFSOperations(ABC):
         caller_context: CallerContext = CallerContext(context)
         CallerContextHolder.set(caller_context)
 
-        return fileset_catalog.as_fileset_catalog().get_file_location(
+        return fileset_catalog.get_file_location(
             NameIdentifier.of(identifier.namespace().level(2), 
identifier.name()),
             sub_path,
             location_name,
@@ -474,7 +484,17 @@ class BaseGVFSOperations(ABC):
                     name_identifier.namespace().level(2), 
name_identifier.name()
                 )
             )
-            credentials = fileset.support_credentials().get_credentials()
+            if location_name:
+                context = {
+                    Credential.HTTP_HEADER_CURRENT_LOCATION_NAME: 
location_name,
+                }
+                caller_context: CallerContext = CallerContext(context)
+                CallerContextHolder.set(caller_context)
+            credentials = (
+                fileset.support_credentials().get_credentials()
+                if self._enable_credential_vending
+                else None
+            )
             new_cache_value = get_storage_handler_by_path(
                 actual_file_location
             ).get_filesystem_with_expiration(
@@ -487,6 +507,7 @@ class BaseGVFSOperations(ABC):
             return new_cache_value[1]
         finally:
             write_lock.release()
+            CallerContextHolder.remove()
 
     def _get_fileset_catalog(self, catalog_ident: NameIdentifier):
         read_lock = self._catalog_cache_lock.gen_rlock()
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py 
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 159b5ab986..9dd5aab451 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -70,3 +70,6 @@ class GVFSConfig:
 
     # The configuration key prefix for the client request headers.
     GVFS_FILESYSTEM_CLIENT_REQUEST_HEADER_PREFIX = "client_request_header_"
+
+    # The configuration key for whether to enable credential vending. The 
default is false.
+    GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING = "enable_credential_vending"
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
index 9071679fb7..a2fc0d2868 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
@@ -67,6 +67,7 @@ class TestGvfsWithCredentialABS(TestGvfsWithABS):
         self.options = {
             GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: 
self.azure_abs_account_name,
             GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: 
self.azure_abs_account_key,
+            GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
         }
 
     @classmethod
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
index eec502a13b..00c0237ead 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
@@ -24,6 +24,7 @@ from gcsfs import GCSFileSystem
 
 from gravitino import Catalog, Fileset, GravitinoClient
 from gravitino.filesystem import gvfs
+from gravitino.filesystem.gvfs_config import GVFSConfig
 from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS
 
 logger = logging.getLogger(__name__)
@@ -46,6 +47,12 @@ class TestGvfsWithGCSCredential(TestGvfsWithGCS):
     bucket_name = os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL")
     metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1, 
10000))
 
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE}": 
self.key_file,
+            GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
+        }
+
     @classmethod
     def _init_test_entities(cls):
         cls.gravitino_admin_client.create_metalake(
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
index 14b8b52312..18a3eec152 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
@@ -68,6 +68,7 @@ class TestGvfsWithOSSCredential(TestGvfsWithOSS):
             f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}": 
self.oss_access_key,
             f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}": 
self.oss_secret_key,
             f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint,
+            GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
         }
 
     @classmethod
diff --git 
a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py 
b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
index 35d88c2c82..5ee41fb5c0 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
@@ -65,6 +65,7 @@ class TestGvfsWithS3Credential(TestGvfsWithS3):
             f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
             f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
             f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+            GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
         }
 
     @classmethod
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
index a79af67a23..66f1db9e73 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
@@ -63,6 +63,7 @@ import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProv
 import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialConstants;
 import org.apache.gravitino.exceptions.CatalogNotInUseException;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -130,6 +131,8 @@ public abstract class BaseGVFSOperations implements 
Closeable {
 
   private final long defaultBlockSize;
 
+  private final boolean enableCredentialVending;
+
   /**
    * Constructs a new {@link BaseGVFSOperations} with the given {@link 
Configuration}.
    *
@@ -165,6 +168,10 @@ public abstract class BaseGVFSOperations implements 
Closeable {
             GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE,
             
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE_DEFAULT);
 
+    this.enableCredentialVending =
+        configuration.getBoolean(
+            
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING,
+            
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING_DEFAULT);
     this.conf = configuration;
   }
 
@@ -369,6 +376,15 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     return defaultBlockSize;
   }
 
+  /**
+   * Whether to enable credential vending.
+   *
+   * @return true if credential vending is enabled, false otherwise.
+   */
+  protected boolean enableCredentialVending() {
+    return enableCredentialVending;
+  }
+
   /**
    * Get the actual file path by the given virtual path and location name.
    *
@@ -385,15 +401,15 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     String subPath = getSubPathFromGvfsPath(filesetIdent, gvfsPath.toString());
     NameIdentifier catalogIdent =
         NameIdentifier.of(filesetIdent.namespace().level(0), 
filesetIdent.namespace().level(1));
-    setCallerContext(operation);
     String fileLocation;
     try {
+      FilesetCatalog filesetCatalog = getFilesetCatalog(catalogIdent);
+      setCallerContextForGetFileLocation(operation);
       fileLocation =
-          getFilesetCatalog(catalogIdent)
-              .getFileLocation(
-                  NameIdentifier.of(filesetIdent.namespace().level(2), 
filesetIdent.name()),
-                  subPath,
-                  locationName);
+          filesetCatalog.getFileLocation(
+              NameIdentifier.of(filesetIdent.namespace().level(2), 
filesetIdent.name()),
+              subPath,
+              locationName);
     } catch (NoSuchCatalogException | CatalogNotInUseException e) {
       String message = String.format("Cannot get fileset catalog by 
identifier: %s", catalogIdent);
       LOG.warn(message, e);
@@ -527,7 +543,7 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     return internalFileSystemCache;
   }
 
-  private void setCallerContext(FilesetDataOperation operation) {
+  private void setCallerContextForGetFileLocation(FilesetDataOperation 
operation) {
     Map<String, String> contextMap = Maps.newHashMap();
     contextMap.put(
         FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
@@ -537,6 +553,13 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     CallerContext.CallerContextHolder.set(callerContext);
   }
 
+  private void setCallerContextForGetCredentials(String locationName) {
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME, 
locationName);
+    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+  }
+
   private FileSystem getActualFileSystemByLocationName(
       NameIdentifier filesetIdent, String locationName) throws 
FilesetPathNotFoundException {
     NameIdentifier catalogIdent =
@@ -560,7 +583,8 @@ public abstract class BaseGVFSOperations implements 
Closeable {
 
               Path targetLocation = new 
Path(fileset.storageLocations().get(targetLocationName));
               Map<String, String> allProperties =
-                  getAllProperties(cacheKey.getLeft(), 
targetLocation.toUri().getScheme());
+                  getAllProperties(
+                      cacheKey.getLeft(), targetLocation.toUri().getScheme(), 
targetLocationName);
 
               FileSystem actualFileSystem =
                   getActualFileSystemByPath(targetLocation, allProperties);
@@ -689,7 +713,8 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     return cacheBuilder.build();
   }
 
-  private Map<String, String> getAllProperties(NameIdentifier filesetIdent, 
String scheme) {
+  private Map<String, String> getAllProperties(
+      NameIdentifier filesetIdent, String scheme, String locationName) {
     Catalog catalog =
         (Catalog)
             getFilesetCatalog(
@@ -698,8 +723,11 @@ public abstract class BaseGVFSOperations implements 
Closeable {
 
     Map<String, String> allProperties = 
getNecessaryProperties(catalog.properties());
     allProperties.putAll(getConfigMap(conf));
-    allProperties.putAll(
-        getCredentialProperties(getFileSystemProviderByScheme(scheme), 
filesetIdent));
+    if (enableCredentialVending()) {
+      allProperties.putAll(
+          getCredentialProperties(
+              getFileSystemProviderByScheme(scheme), filesetIdent, 
locationName));
+    }
     return allProperties;
   }
 
@@ -710,7 +738,9 @@ public abstract class BaseGVFSOperations implements 
Closeable {
   }
 
   private Map<String, String> getCredentialProperties(
-      FileSystemProvider fileSystemProvider, NameIdentifier filesetIdentifier) 
{
+      FileSystemProvider fileSystemProvider,
+      NameIdentifier filesetIdentifier,
+      String locationName) {
     // Do not support credential vending, we do not need to add any credential 
properties.
     if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
       return ImmutableMap.of();
@@ -719,6 +749,7 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
     try {
       Fileset fileset = getFileset(filesetIdentifier);
+      setCallerContextForGetCredentials(locationName);
       Credential[] credentials = 
fileset.supportsCredentials().getCredentials();
       if (credentials.length > 0) {
         mapBuilder.put(
@@ -734,6 +765,8 @@ public abstract class BaseGVFSOperations implements 
Closeable {
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
+    } finally {
+      CallerContext.CallerContextHolder.remove();
     }
 
     return mapBuilder.build();
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index f541a3492c..ef84b39014 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -139,5 +139,12 @@ public class GravitinoVirtualFileSystemConfiguration {
   public static final String FS_GRAVITINO_CLIENT_REQUEST_HEADER_PREFIX =
       "fs.gravitino.client.request.header.";
 
+  /** The configuration key for whether to enable credential vending. The 
default is false. */
+  public static final String FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING =
+      "fs.gravitino.enableCredentialVending";
+
+  /** The default value for whether to enable credential vending. */
+  public static final boolean FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING_DEFAULT = 
false;
+
   private GravitinoVirtualFileSystemConfiguration() {}
 }
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
index 3dc3ad82ae..5fc8cb9bd4 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
@@ -152,6 +152,6 @@ public class FilesetCatalogCredentialIT extends BaseIT {
     Fileset fileset = catalog.asFilesetCatalog().loadFileset(filesetIdent);
     Credential[] credentials = fileset.supportsCredentials().getCredentials();
     Assertions.assertEquals(1, credentials.length);
-    Assertions.assertTrue(credentials[0] instanceof S3TokenCredential);
+    Assertions.assertInstanceOf(S3TokenCredential.class, credentials[0]);
   }
 }
diff --git a/docs/hadoop-catalog-with-adls.md b/docs/hadoop-catalog-with-adls.md
index 6e6b682ece..21d64af84d 100644
--- a/docs/hadoop-catalog-with-adls.md
+++ b/docs/hadoop-catalog-with-adls.md
@@ -506,12 +506,14 @@ curl -X POST -H "Accept: 
application/vnd.gravitino.v1+json" \
 
 ### How to access ADLS fileset with credential vending
 
-If the catalog has been configured with credential, you can access ADLS 
fileset without providing authentication information via GVFS Java/Python 
client and Spark. Let's see how to access ADLS fileset with credential:
+When the catalog is configured with credentials and client-side credential 
vending is enabled, 
+you can access ADLS filesets directly using the GVFS Java/Python client or 
Spark without providing authentication details.
 
 GVFS Java client:
 
 ```java
 Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
 conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
 conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
 conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -528,6 +530,7 @@ Spark:
 ```python
 spark = SparkSession.builder
     .appName("adls_fielset_test")
+    .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
     .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs")
     .config("spark.hadoop.fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
     .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/hadoop-catalog-with-gcs.md b/docs/hadoop-catalog-with-gcs.md
index cd9c34aa94..c89c380218 100644
--- a/docs/hadoop-catalog-with-gcs.md
+++ b/docs/hadoop-catalog-with-gcs.md
@@ -481,12 +481,14 @@ curl -X POST -H "Accept: 
application/vnd.gravitino.v1+json" \
 
 ### How to access GCS fileset with credential vending
 
-If the catalog has been configured with credential, you can access GCS fileset 
without providing authentication information via GVFS Java/Python client and 
Spark. Let's see how to access GCS fileset with credential:
+When the catalog is configured with credentials and client-side credential 
vending is enabled,
+you can access GCS filesets directly using the GVFS Java/Python client or 
Spark without providing authentication details.
 
 GVFS Java client:
 
 ```java
 Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
 conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
 conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
 conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -503,6 +505,7 @@ Spark:
 ```python
 spark = SparkSession.builder
     .appName("gcs_fileset_test")
+    .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
     .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs")
     .config("spark.hadoop.fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
     .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/hadoop-catalog-with-oss.md b/docs/hadoop-catalog-with-oss.md
index b72793a2a0..7d2f05caf9 100644
--- a/docs/hadoop-catalog-with-oss.md
+++ b/docs/hadoop-catalog-with-oss.md
@@ -520,12 +520,14 @@ curl -X POST -H "Accept: 
application/vnd.gravitino.v1+json" \
 
 ### How to access OSS fileset with credential vending
 
-If the catalog has been configured with credential, you can access OSS fileset 
without providing authentication information via GVFS Java/Python client and 
Spark. Let's see how to access OSS fileset with credential:
+When the catalog is configured with credentials and client-side credential 
vending is enabled,
+you can access OSS filesets directly using the GVFS Java/Python client or 
Spark without providing authentication details.
 
 GVFS Java client:
 
 ```java
 Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
 conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
 conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
 conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -542,6 +544,7 @@ Spark:
 ```python
 spark = SparkSession.builder
     .appName("oss_fileset_test")
+    .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
     .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs")
     .config("spark.hadoop.fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
     .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/hadoop-catalog-with-s3.md b/docs/hadoop-catalog-with-s3.md
index caf48dc549..7d5a7bcadd 100644
--- a/docs/hadoop-catalog-with-s3.md
+++ b/docs/hadoop-catalog-with-s3.md
@@ -523,12 +523,14 @@ curl -X POST -H "Accept: 
application/vnd.gravitino.v1+json" \
 
 ### How to access S3 fileset with credential vending
 
-If the catalog has been configured with credential, you can access S3 fileset 
without providing authentication information via GVFS Java/Python client and 
Spark. Let's see how to access S3 fileset with credential:
+When the catalog is configured with credentials and client-side credential 
vending is enabled,
+you can access S3 filesets directly using the GVFS Java/Python client or Spark 
without providing authentication details.
 
 GVFS Java client:
 
 ```java
 Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
 conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
 conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
 conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -545,6 +547,7 @@ Spark:
 ```python
 spark = SparkSession.builder
     .appName("s3_fileset_test")
+    .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
     .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs")
     .config("spark.hadoop.fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
     .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 996c61f947..6b3e4a0a04 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -70,6 +70,7 @@ the path mapping and convert automatically.
 | `fs.gravitino.operations.class`                       | The operations class 
to provide the FS operations for the Gravitino Virtual File System. Users can 
extends `BaseGVFSOperations` to implement their own operations and configure 
the class name in this conf to use custom FS operations.                        
                                                     | 
`org.apache.gravitino.filesystem.hadoop.DefaultGVFSOperations` | No             
                     | 0.9.0-incubating |
 | `fs.gravitino.hook.class`                             | The hook class to 
inject into the <br/>Gravitino Virtual File System. Users can implement their 
own `GravitinoVirtualFileSystemHook` and configure the class name in this conf 
to inject custom code.                                                          
                                                      | 
`org.apache.gravitino.filesystem.hadoop.NoOpHook`              | No             
                     | 0.9.0-incubating |
 | `fs.gravitino.client.request.header.`                 | The configuration 
key prefix for the Gravitino client request header. You can set the request 
header for the Gravitino client.                                                
                                                                                
                                                       | (none)                 
                                        | No                                  | 
0.9.0-incubating |
+| `fs.gravitino.enableCredentialVending`                | Whether to enable 
credential vending for the Gravitino Virtual File System.                       
                                                                                
                                                                                
                                                   | `false`                    
                                    | No                                  | 
0.9.0-incubating |
 
 Apart from the above properties, to access fileset like S3, GCS, OSS and 
custom fileset, extra properties are needed, please see 
 [S3 GVFS Java client 
configurations](./hadoop-catalog-with-s3.md#using-the-gvfs-java-client-to-access-the-fileset),
 [GCS GVFS Java client 
configurations](./hadoop-catalog-with-gcs.md#using-the-gvfs-java-client-to-access-the-fileset),
 [OSS GVFS Java client 
configurations](./hadoop-catalog-with-oss.md#using-the-gvfs-java-client-to-access-the-fileset)
 and [Azure Blob Storage GVFS Java client 
configurations](./hadoop-catalog-with-adls.md#using-the-gvfs-java-client-to-access-the-fileset)
 for  [...]
@@ -365,6 +366,7 @@ to recompile the native libraries like `libhdfs` and 
others, and completely repl
 | `operations_class`              | The operations class to provide the FS 
operations for the Gravitino Virtual File System. Users can extends 
`BaseGVFSOperations` to implement their own operations and configure the class 
name in this conf to use custom FS operations.                                  
                                   | 
`gravitino.filesystem.gvfs_default_operations.DefaultGVFSOperations` | No       
                         | 0.9.0-incubating |
 | `hook_class`                    | The hook class to inject into the 
Gravitino Virtual File System. Users can implement their own 
`GravitinoVirtualFileSystemHook` and configure the class name in this conf to 
inject custom code.                                                             
                                                | 
`gravitino.filesystem.gvfs_hook.NoOpHook`                            | No       
                         | 0.9.0-incubating |
 | `client_request_header_`        | The configuration key prefix for the 
Gravitino client request header. You can set the request header for the 
Gravitino client.                                                               
                                                                                
                                | (none)                                        
                       | No                                | 0.9.0-incubating |
+| `enable_credential_vending`     | Whether to enable credential vending for 
the Gravitino Virtual File System.                                              
                                                                                
                                                                                
                    | `false`                                                   
           | No                                | 0.9.0-incubating |
 
 #### Configurations for S3, GCS, OSS and Azure Blob storage fileset
 
diff --git a/server-common/build.gradle.kts b/server-common/build.gradle.kts
index ab4908ff31..5403d873c1 100644
--- a/server-common/build.gradle.kts
+++ b/server-common/build.gradle.kts
@@ -26,6 +26,7 @@ plugins {
 
 dependencies {
   implementation(project(":api"))
+  implementation(project(":catalogs:catalog-common"))
   implementation(project(":common")) {
     exclude("com.fasterxml.jackson.core")
     exclude("com.fasterxml.jackson.datatype")
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java 
b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
index 80c2cd537d..2e50150bd4 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
@@ -32,6 +32,7 @@ import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.audit.InternalClientType;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.credential.CredentialConstants;
 import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.utils.PrincipalUtils;
 
@@ -222,4 +223,16 @@ public class Utils {
     }
     return filteredHeaders;
   }
+
+  public static Map<String, String> 
filterFilesetCredentialHeaders(HttpServletRequest httpRequest) {
+    Map<String, String> filteredHeaders = Maps.newHashMap();
+
+    String currentLocationName =
+        
httpRequest.getHeader(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME);
+    if (StringUtils.isNotBlank(currentLocationName)) {
+      filteredHeaders.put(
+          CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME, 
currentLocationName);
+    }
+    return filteredHeaders;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
index 1046bbba1a..438ad0b645 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
@@ -24,6 +24,7 @@ import com.codahale.metrics.annotation.Timed;
 import com.google.common.collect.ImmutableSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
@@ -37,6 +38,7 @@ import javax.ws.rs.core.Response;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialOperationDispatcher;
 import org.apache.gravitino.dto.credential.CredentialDTO;
@@ -95,6 +97,17 @@ public class MetadataObjectCredentialOperations {
             }
 
             NameIdentifier identifier = 
MetadataObjectUtil.toEntityIdent(metalake, object);
+            Map<String, String> filteredFilesetHeaders =
+                Utils.filterFilesetCredentialHeaders(httpRequest);
+            // set the fileset info into the thread local context
+            if (!filteredFilesetHeaders.isEmpty()) {
+              CallerContext context =
+                  
CallerContext.builder().withContext(filteredFilesetHeaders).build();
+              CallerContext.CallerContextHolder.set(context);
+              LOG.info(
+                  "Set the caller context for getting credential: {}",
+                  context.context().toString());
+            }
             List<Credential> credentials = 
credentialOperationDispatcher.getCredentials(identifier);
             if (credentials == null) {
               return Utils.ok(new CredentialResponse(new CredentialDTO[0]));
@@ -105,6 +118,9 @@ public class MetadataObjectCredentialOperations {
           });
     } catch (Exception e) {
       return ExceptionHandlers.handleCredentialException(OperationType.GET, 
fullName, e);
+    } finally {
+      // Clear the caller context
+      CallerContext.CallerContextHolder.remove();
     }
   }
 

Reply via email to