This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 4156d9e78f [#6981] feat(credential): support specifying location when
get fileset credential (#7028)
4156d9e78f is described below
commit 4156d9e78f1bcafbaea8794fb30f5bdfbf2512a9
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 22 11:51:36 2025 +0800
[#6981] feat(credential): support specifying location when get fileset
credential (#7028)
### What changes were proposed in this pull request?
- using the HTTP header to specify the location name when getting the
credential of the fileset
- gvfs automatically retrieves the corresponding credentials based on
the current location name.
### Why are the changes needed?
Fix: #6981
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
local run `FilesetCatalogCredentialIT` and
`GravitinoVirtualFileSystemS3CredentialIT`
Co-authored-by: mchades <[email protected]>
---
.../gravitino/credential/CredentialConstants.java | 3 ++
.../catalog/hadoop/HadoopCatalogOperations.java | 26 ++++++----
.../hadoop/SecureHadoopCatalogOperations.java | 59 +++++++++++++++++++---
.../client/MetadataObjectCredentialOperations.java | 24 +++++----
.../gravitino/api/credential/credential.py | 3 ++
.../metadata_object_credential_operations.py | 16 ++++--
.../gravitino/filesystem/gvfs_base_operations.py | 29 +++++++++--
.../gravitino/filesystem/gvfs_config.py | 3 ++
.../integration/test_gvfs_with_abs_credential.py | 1 +
.../integration/test_gvfs_with_gcs_credential.py | 7 +++
.../integration/test_gvfs_with_oss_credential.py | 1 +
.../integration/test_gvfs_with_s3_credential.py | 1 +
.../filesystem/hadoop/BaseGVFSOperations.java | 57 ++++++++++++++++-----
.../GravitinoVirtualFileSystemConfiguration.java | 7 +++
.../test/FilesetCatalogCredentialIT.java | 2 +-
docs/hadoop-catalog-with-adls.md | 5 +-
docs/hadoop-catalog-with-gcs.md | 5 +-
docs/hadoop-catalog-with-oss.md | 5 +-
docs/hadoop-catalog-with-s3.md | 5 +-
docs/how-to-use-gvfs.md | 2 +
server-common/build.gradle.kts | 1 +
.../org/apache/gravitino/server/web/Utils.java | 13 +++++
.../rest/MetadataObjectCredentialOperations.java | 16 ++++++
23 files changed, 240 insertions(+), 51 deletions(-)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
index a6e0d54bfa..f0a4a967c1 100644
---
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
@@ -28,5 +28,8 @@ public class CredentialConstants {
public static final String OSS_TOKEN_EXPIRE_IN_SECS =
"oss-token-expire-in-secs";
public static final String ADLS_TOKEN_EXPIRE_IN_SECS =
"adls-token-expire-in-secs";
+ /** The HTTP header used to get the credential from fileset location */
+ public static final String HTTP_HEADER_CURRENT_LOCATION_NAME =
"Current-Location-Name";
+
private CredentialConstants() {}
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 0b94286690..0aceee05e5 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -563,13 +563,20 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
}
Fileset fileset = loadFileset(ident);
- locationName =
- locationName == null
- ? fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME)
- : locationName;
- if (!fileset.storageLocations().containsKey(locationName)) {
+ String targetLocationName;
+ if (locationName == null) {
+ targetLocationName =
+ fileset.storageLocations().size() == 1
+ // to be compatible with the old version, the fileset in old
version only has one
+ // location and does not have the default-location-name property
+ ? fileset.storageLocations().keySet().iterator().next()
+ : fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME);
+ } else {
+ targetLocationName = locationName;
+ }
+ if (!fileset.storageLocations().containsKey(targetLocationName)) {
throw new NoSuchLocationNameException(
- "Location name %s does not exist in fileset %s", locationName,
ident);
+ "Location name %s does not exist in fileset %s", targetLocationName,
ident);
}
boolean isSingleFile = false;
@@ -581,7 +588,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
+ "file location may be a wrong path. Please avoid using Fileset
to manage a single"
+ " file path.");
} else {
- isSingleFile = checkSingleFile(fileset, locationName);
+ isSingleFile = checkSingleFile(fileset, targetLocationName);
}
// if the storage location is a single file, it cannot have sub path to
access.
@@ -629,11 +636,12 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
// 1. if the storage location is a single file, we pass the storage
location directly
// 2. if the processed sub path is blank, we pass the storage location
directly
if (isSingleFile || StringUtils.isBlank(processedSubPath)) {
- fileLocation = fileset.storageLocations().get(locationName);
+ fileLocation = fileset.storageLocations().get(targetLocationName);
} else {
// the processed sub path always starts with "/" if it is not blank,
// so we can safely remove the tailing slash if storage location ends
with "/".
- String storageLocation =
removeTrailingSlash(fileset.storageLocations().get(locationName));
+ String storageLocation =
+
removeTrailingSlash(fileset.storageLocations().get(targetLocationName));
fileLocation = String.format("%s%s", storageLocation, processedSubPath);
}
return fileLocation;
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
index b57d71d226..4427ff2483 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -31,6 +31,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
@@ -39,6 +40,7 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.catalog.hadoop.authentication.UserContext;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
@@ -46,6 +48,7 @@ import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.connector.credential.PathContext;
import org.apache.gravitino.connector.credential.SupportsPathBasedCredentials;
+import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.credential.CredentialUtils;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -262,17 +265,11 @@ public class SecureHadoopCatalogOperations
@Override
public List<PathContext> getPathContext(NameIdentifier filesetIdentifier) {
Fileset fileset = loadFileset(filesetIdentifier);
- Map<String, String> locations = fileset.storageLocations();
- Preconditions.checkArgument(
- locations != null && !locations.isEmpty(),
- "No storage locations found for fileset: " + filesetIdentifier);
-
- // todo: support multiple storage locations
- String path =
locations.get(fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+ String path = getTargetLocation(fileset);
Set<String> providers =
CredentialUtils.getCredentialProvidersByOrder(
- () -> fileset.properties(),
+ fileset::properties,
() -> {
Namespace namespace = filesetIdentifier.namespace();
NameIdentifier schemaIdentifier =
@@ -286,6 +283,52 @@ public class SecureHadoopCatalogOperations
.collect(Collectors.toList());
}
+ private String getTargetLocation(Fileset fileset) {
+ CallerContext callerContext = CallerContext.CallerContextHolder.get();
+ String targetLocationName;
+ String targetLocation;
+ if (callerContext != null
+ && callerContext
+ .context()
+
.containsKey(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME)) {
+ // case 1: target location name is passed in the header
+ targetLocationName =
+
callerContext.context().get(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME);
+ Preconditions.checkArgument(
+ fileset.storageLocations().containsKey(targetLocationName),
+ "The location name %s is not in the fileset %s, expected location
names are %s",
+ targetLocationName,
+ fileset.name(),
+ fileset.storageLocations().keySet());
+ targetLocation = fileset.storageLocations().get(targetLocationName);
+
+ } else if (fileset.storageLocations().size() == 1) {
+ // case 2: target location name is not passed in the header, but there
is only one location.
+ // note: mainly used for backward compatibility since the old code does
not pass the header
+ // and only supports one location
+ targetLocation = fileset.storageLocations().values().iterator().next();
+ targetLocationName =
fileset.storageLocations().keySet().iterator().next();
+
+ } else {
+ // case 3: target location name is not passed in the header, and there
are multiple locations.
+ // use the default location name
+ targetLocationName =
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME);
+ // this should never happen, but just in case
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(targetLocationName),
+ "The default location name of the fileset %s should not be empty.",
+ fileset.name());
+ targetLocation =
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME);
+ }
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(targetLocation),
+ "The location with the location name %s of the fileset %s should not
be empty.",
+ targetLocationName,
+ fileset.name());
+ return targetLocation;
+ }
+
/**
* Add the user to the subject so that we can get the last user in the
subject. Hadoop catalog
* uses this method to pass api user from the client side, so that we can
get the user in the
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
index b11fd9caf2..58819dd94a 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectCredentialOperations.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.client;
import java.util.Collections;
import java.util.Locale;
import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.SupportsCredentials;
import org.apache.gravitino.dto.responses.CredentialResponse;
@@ -49,14 +50,19 @@ class MetadataObjectCredentialOperations implements
SupportsCredentials {
@Override
public Credential[] getCredentials() {
- CredentialResponse resp =
- restClient.get(
- credentialRequestPath,
- CredentialResponse.class,
- Collections.emptyMap(),
- ErrorHandlers.credentialErrorHandler());
-
- resp.validate();
- return DTOConverters.fromDTO(resp.getCredentials());
+ try {
+ CallerContext callerContext = CallerContext.CallerContextHolder.get();
+ CredentialResponse resp =
+ restClient.get(
+ credentialRequestPath,
+ CredentialResponse.class,
+ callerContext != null ? callerContext.context() :
Collections.emptyMap(),
+ ErrorHandlers.credentialErrorHandler());
+ resp.validate();
+ return DTOConverters.fromDTO(resp.getCredentials());
+ } finally {
+ // Clear the caller context
+ CallerContext.CallerContextHolder.remove();
+ }
}
}
diff --git a/clients/client-python/gravitino/api/credential/credential.py
b/clients/client-python/gravitino/api/credential/credential.py
index 37b97694a2..2a74f0fd52 100644
--- a/clients/client-python/gravitino/api/credential/credential.py
+++ b/clients/client-python/gravitino/api/credential/credential.py
@@ -22,6 +22,9 @@ from typing import Dict
class Credential(ABC):
"""Represents the credential in Gravitino."""
+ # The HTTP header used to get the credential from fileset location
+ HTTP_HEADER_CURRENT_LOCATION_NAME = "Current-Location-Name"
+
@abstractmethod
def credential_type(self) -> str:
"""The type of the credential.
diff --git
a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
index 7184cd797c..8ae83b0ec1 100644
---
a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
+++
b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
@@ -20,6 +20,7 @@ from typing import List
from gravitino.api.credential.supports_credentials import SupportsCredentials
from gravitino.api.credential.credential import Credential
from gravitino.api.metadata_object import MetadataObject
+from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.dto.credential_dto import CredentialDTO
from gravitino.dto.responses.credential_response import CredentialResponse
from gravitino.exceptions.handlers.credential_error_handler import (
@@ -53,10 +54,17 @@ class
MetadataObjectCredentialOperations(SupportsCredentials):
)
def get_credentials(self) -> List[Credential]:
- resp = self._rest_client.get(
- self._request_path,
- error_handler=CREDENTIAL_ERROR_HANDLER,
- )
+ try:
+ caller_context: CallerContext = CallerContextHolder.get()
+ resp = self._rest_client.get(
+ self._request_path,
+ headers=(
+ caller_context.context() if caller_context is not None
else None
+ ),
+ error_handler=CREDENTIAL_ERROR_HANDLER,
+ )
+ finally:
+ CallerContextHolder.remove()
credential_resp = CredentialResponse.from_json(resp.body,
infer_missing=True)
credential_resp.validate()
diff --git a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
index 82280a87ac..dc835efbca 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_base_operations.py
@@ -27,6 +27,7 @@ from fsspec import AbstractFileSystem
from readerwriterlock import rwlock
from gravitino.api.catalog import Catalog
+from gravitino.api.credential.credential import Credential
from gravitino.audit.caller_context import CallerContextHolder, CallerContext
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
@@ -75,6 +76,7 @@ class BaseGVFSOperations(ABC):
SLASH = "/"
ENV_CURRENT_LOCATION_NAME_ENV_VAR_DEFAULT = "CURRENT_LOCATION_NAME"
+ ENABLE_CREDENTIAL_VENDING_DEFAULT = False
def __init__(
self,
@@ -121,6 +123,14 @@ class BaseGVFSOperations(ABC):
self._catalog_cache = LRUCache(maxsize=100)
self._catalog_cache_lock = rwlock.RWLockFair()
+ self._enable_credential_vending = (
+ False
+ if options is None
+ else options.get(
+ GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING,
+ self.ENABLE_CREDENTIAL_VENDING_DEFAULT,
+ )
+ )
self._current_location_name = self._init_current_location_name()
@property
@@ -369,7 +379,7 @@ class BaseGVFSOperations(ABC):
)
target_location_name = location_name or fileset.properties().get(
fileset.PROPERTY_DEFAULT_LOCATION_NAME
- )
+ ) or fileset.LOCATION_NAME_UNKNOWN
actual_location = fileset.storage_locations().get(target_location_name)
if actual_location is None:
raise NoSuchLocationNameException(
@@ -418,7 +428,7 @@ class BaseGVFSOperations(ABC):
catalog_ident: NameIdentifier = NameIdentifier.of(
self._metalake, identifier.namespace().level(1)
)
- fileset_catalog = self._get_fileset_catalog(catalog_ident)
+ fileset_catalog =
self._get_fileset_catalog(catalog_ident).as_fileset_catalog()
sub_path: str = get_sub_path_from_virtual_path(
identifier, processed_virtual_path
@@ -430,7 +440,7 @@ class BaseGVFSOperations(ABC):
caller_context: CallerContext = CallerContext(context)
CallerContextHolder.set(caller_context)
- return fileset_catalog.as_fileset_catalog().get_file_location(
+ return fileset_catalog.get_file_location(
NameIdentifier.of(identifier.namespace().level(2),
identifier.name()),
sub_path,
location_name,
@@ -474,7 +484,17 @@ class BaseGVFSOperations(ABC):
name_identifier.namespace().level(2),
name_identifier.name()
)
)
- credentials = fileset.support_credentials().get_credentials()
+ if location_name:
+ context = {
+ Credential.HTTP_HEADER_CURRENT_LOCATION_NAME:
location_name,
+ }
+ caller_context: CallerContext = CallerContext(context)
+ CallerContextHolder.set(caller_context)
+ credentials = (
+ fileset.support_credentials().get_credentials()
+ if self._enable_credential_vending
+ else None
+ )
new_cache_value = get_storage_handler_by_path(
actual_file_location
).get_filesystem_with_expiration(
@@ -487,6 +507,7 @@ class BaseGVFSOperations(ABC):
return new_cache_value[1]
finally:
write_lock.release()
+ CallerContextHolder.remove()
def _get_fileset_catalog(self, catalog_ident: NameIdentifier):
read_lock = self._catalog_cache_lock.gen_rlock()
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 159b5ab986..9dd5aab451 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -70,3 +70,6 @@ class GVFSConfig:
# The configuration key prefix for the client request headers.
GVFS_FILESYSTEM_CLIENT_REQUEST_HEADER_PREFIX = "client_request_header_"
+
+ # The configuration key for whether to enable credential vending. The
default is false.
+ GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING = "enable_credential_vending"
diff --git
a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
index 9071679fb7..a2fc0d2868 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
@@ -67,6 +67,7 @@ class TestGvfsWithCredentialABS(TestGvfsWithABS):
self.options = {
GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME:
self.azure_abs_account_name,
GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY:
self.azure_abs_account_key,
+ GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
}
@classmethod
diff --git
a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
index eec502a13b..00c0237ead 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
@@ -24,6 +24,7 @@ from gcsfs import GCSFileSystem
from gravitino import Catalog, Fileset, GravitinoClient
from gravitino.filesystem import gvfs
+from gravitino.filesystem.gvfs_config import GVFSConfig
from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS
logger = logging.getLogger(__name__)
@@ -46,6 +47,12 @@ class TestGvfsWithGCSCredential(TestGvfsWithGCS):
bucket_name = os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL")
metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1,
10000))
+ def setUp(self):
+ self.options = {
+ f"{GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE}":
self.key_file,
+ GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
+ }
+
@classmethod
def _init_test_entities(cls):
cls.gravitino_admin_client.create_metalake(
diff --git
a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
index 14b8b52312..18a3eec152 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
@@ -68,6 +68,7 @@ class TestGvfsWithOSSCredential(TestGvfsWithOSS):
f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}":
self.oss_access_key,
f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}":
self.oss_secret_key,
f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint,
+ GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
}
@classmethod
diff --git
a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
index 35d88c2c82..5ee41fb5c0 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
@@ -65,6 +65,7 @@ class TestGvfsWithS3Credential(TestGvfsWithS3):
f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+ GVFSConfig.GVFS_FILESYSTEM_ENABLE_CREDENTIAL_VENDING: True,
}
@classmethod
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
index a79af67a23..66f1db9e73 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
@@ -63,6 +63,7 @@ import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProv
import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.exceptions.CatalogNotInUseException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -130,6 +131,8 @@ public abstract class BaseGVFSOperations implements
Closeable {
private final long defaultBlockSize;
+ private final boolean enableCredentialVending;
+
/**
* Constructs a new {@link BaseGVFSOperations} with the given {@link
Configuration}.
*
@@ -165,6 +168,10 @@ public abstract class BaseGVFSOperations implements
Closeable {
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE,
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_BLOCK_SIZE_DEFAULT);
+ this.enableCredentialVending =
+ configuration.getBoolean(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING_DEFAULT);
this.conf = configuration;
}
@@ -369,6 +376,15 @@ public abstract class BaseGVFSOperations implements
Closeable {
return defaultBlockSize;
}
+ /**
+ * Whether to enable credential vending.
+ *
+ * @return true if credential vending is enabled, false otherwise.
+ */
+ protected boolean enableCredentialVending() {
+ return enableCredentialVending;
+ }
+
/**
* Get the actual file path by the given virtual path and location name.
*
@@ -385,15 +401,15 @@ public abstract class BaseGVFSOperations implements
Closeable {
String subPath = getSubPathFromGvfsPath(filesetIdent, gvfsPath.toString());
NameIdentifier catalogIdent =
NameIdentifier.of(filesetIdent.namespace().level(0),
filesetIdent.namespace().level(1));
- setCallerContext(operation);
String fileLocation;
try {
+ FilesetCatalog filesetCatalog = getFilesetCatalog(catalogIdent);
+ setCallerContextForGetFileLocation(operation);
fileLocation =
- getFilesetCatalog(catalogIdent)
- .getFileLocation(
- NameIdentifier.of(filesetIdent.namespace().level(2),
filesetIdent.name()),
- subPath,
- locationName);
+ filesetCatalog.getFileLocation(
+ NameIdentifier.of(filesetIdent.namespace().level(2),
filesetIdent.name()),
+ subPath,
+ locationName);
} catch (NoSuchCatalogException | CatalogNotInUseException e) {
String message = String.format("Cannot get fileset catalog by
identifier: %s", catalogIdent);
LOG.warn(message, e);
@@ -527,7 +543,7 @@ public abstract class BaseGVFSOperations implements
Closeable {
return internalFileSystemCache;
}
- private void setCallerContext(FilesetDataOperation operation) {
+ private void setCallerContextForGetFileLocation(FilesetDataOperation
operation) {
Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
@@ -537,6 +553,13 @@ public abstract class BaseGVFSOperations implements
Closeable {
CallerContext.CallerContextHolder.set(callerContext);
}
+ private void setCallerContextForGetCredentials(String locationName) {
+ Map<String, String> contextMap = Maps.newHashMap();
+ contextMap.put(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME,
locationName);
+ CallerContext callerContext =
CallerContext.builder().withContext(contextMap).build();
+ CallerContext.CallerContextHolder.set(callerContext);
+ }
+
private FileSystem getActualFileSystemByLocationName(
NameIdentifier filesetIdent, String locationName) throws
FilesetPathNotFoundException {
NameIdentifier catalogIdent =
@@ -560,7 +583,8 @@ public abstract class BaseGVFSOperations implements
Closeable {
Path targetLocation = new
Path(fileset.storageLocations().get(targetLocationName));
Map<String, String> allProperties =
- getAllProperties(cacheKey.getLeft(),
targetLocation.toUri().getScheme());
+ getAllProperties(
+ cacheKey.getLeft(), targetLocation.toUri().getScheme(),
targetLocationName);
FileSystem actualFileSystem =
getActualFileSystemByPath(targetLocation, allProperties);
@@ -689,7 +713,8 @@ public abstract class BaseGVFSOperations implements
Closeable {
return cacheBuilder.build();
}
- private Map<String, String> getAllProperties(NameIdentifier filesetIdent,
String scheme) {
+ private Map<String, String> getAllProperties(
+ NameIdentifier filesetIdent, String scheme, String locationName) {
Catalog catalog =
(Catalog)
getFilesetCatalog(
@@ -698,8 +723,11 @@ public abstract class BaseGVFSOperations implements
Closeable {
Map<String, String> allProperties =
getNecessaryProperties(catalog.properties());
allProperties.putAll(getConfigMap(conf));
- allProperties.putAll(
- getCredentialProperties(getFileSystemProviderByScheme(scheme),
filesetIdent));
+ if (enableCredentialVending()) {
+ allProperties.putAll(
+ getCredentialProperties(
+ getFileSystemProviderByScheme(scheme), filesetIdent,
locationName));
+ }
return allProperties;
}
@@ -710,7 +738,9 @@ public abstract class BaseGVFSOperations implements
Closeable {
}
private Map<String, String> getCredentialProperties(
- FileSystemProvider fileSystemProvider, NameIdentifier filesetIdentifier)
{
+ FileSystemProvider fileSystemProvider,
+ NameIdentifier filesetIdentifier,
+ String locationName) {
// Do not support credential vending, we do not need to add any credential
properties.
if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
return ImmutableMap.of();
@@ -719,6 +749,7 @@ public abstract class BaseGVFSOperations implements
Closeable {
ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
try {
Fileset fileset = getFileset(filesetIdentifier);
+ setCallerContextForGetCredentials(locationName);
Credential[] credentials =
fileset.supportsCredentials().getCredentials();
if (credentials.length > 0) {
mapBuilder.put(
@@ -734,6 +765,8 @@ public abstract class BaseGVFSOperations implements
Closeable {
}
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ CallerContext.CallerContextHolder.remove();
}
return mapBuilder.build();
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index f541a3492c..ef84b39014 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -139,5 +139,12 @@ public class GravitinoVirtualFileSystemConfiguration {
public static final String FS_GRAVITINO_CLIENT_REQUEST_HEADER_PREFIX =
"fs.gravitino.client.request.header.";
+ /** The configuration key for whether to enable credential vending. The
default is false. */
+ public static final String FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING =
+ "fs.gravitino.enableCredentialVending";
+
+ /** The default value for whether to enable credential vending. */
+ public static final boolean FS_GRAVITINO_ENABLE_CREDENTIAL_VENDING_DEFAULT =
false;
+
private GravitinoVirtualFileSystemConfiguration() {}
}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
index 3dc3ad82ae..5fc8cb9bd4 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
@@ -152,6 +152,6 @@ public class FilesetCatalogCredentialIT extends BaseIT {
Fileset fileset = catalog.asFilesetCatalog().loadFileset(filesetIdent);
Credential[] credentials = fileset.supportsCredentials().getCredentials();
Assertions.assertEquals(1, credentials.length);
- Assertions.assertTrue(credentials[0] instanceof S3TokenCredential);
+ Assertions.assertInstanceOf(S3TokenCredential.class, credentials[0]);
}
}
diff --git a/docs/hadoop-catalog-with-adls.md b/docs/hadoop-catalog-with-adls.md
index 6e6b682ece..21d64af84d 100644
--- a/docs/hadoop-catalog-with-adls.md
+++ b/docs/hadoop-catalog-with-adls.md
@@ -506,12 +506,14 @@ curl -X POST -H "Accept:
application/vnd.gravitino.v1+json" \
### How to access ADLS fileset with credential vending
-If the catalog has been configured with credential, you can access ADLS
fileset without providing authentication information via GVFS Java/Python
client and Spark. Let's see how to access ADLS fileset with credential:
+When the catalog is configured with credentials and client-side credential
vending is enabled,
+you can access ADLS filesets directly using the GVFS Java/Python client or
Spark without providing authentication details.
GVFS Java client:
```java
Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -528,6 +530,7 @@ Spark:
```python
spark = SparkSession.builder
.appName("adls_fielset_test")
+ .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
.config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs")
.config("spark.hadoop.fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
.config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/hadoop-catalog-with-gcs.md b/docs/hadoop-catalog-with-gcs.md
index cd9c34aa94..c89c380218 100644
--- a/docs/hadoop-catalog-with-gcs.md
+++ b/docs/hadoop-catalog-with-gcs.md
@@ -481,12 +481,14 @@ curl -X POST -H "Accept:
application/vnd.gravitino.v1+json" \
### How to access GCS fileset with credential vending
-If the catalog has been configured with credential, you can access GCS fileset
without providing authentication information via GVFS Java/Python client and
Spark. Let's see how to access GCS fileset with credential:
+When the catalog is configured with credentials and client-side credential
vending is enabled,
+you can access GCS filesets directly using the GVFS Java/Python client or
Spark without providing authentication details.
GVFS Java client:
```java
Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -503,6 +505,7 @@ Spark:
```python
spark = SparkSession.builder
.appName("gcs_fileset_test")
+ .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
.config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs")
.config("spark.hadoop.fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
.config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/hadoop-catalog-with-oss.md b/docs/hadoop-catalog-with-oss.md
index b72793a2a0..7d2f05caf9 100644
--- a/docs/hadoop-catalog-with-oss.md
+++ b/docs/hadoop-catalog-with-oss.md
@@ -520,12 +520,14 @@ curl -X POST -H "Accept:
application/vnd.gravitino.v1+json" \
### How to access OSS fileset with credential vending
-If the catalog has been configured with credential, you can access OSS fileset
without providing authentication information via GVFS Java/Python client and
Spark. Let's see how to access OSS fileset with credential:
+When the catalog is configured with credentials and client-side credential
vending is enabled,
+you can access OSS filesets directly using the GVFS Java/Python client or
Spark without providing authentication details.
GVFS Java client:
```java
Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -542,6 +544,7 @@ Spark:
```python
spark = SparkSession.builder
.appName("oss_fileset_test")
+ .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
.config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs")
.config("spark.hadoop.fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
.config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/hadoop-catalog-with-s3.md b/docs/hadoop-catalog-with-s3.md
index caf48dc549..7d5a7bcadd 100644
--- a/docs/hadoop-catalog-with-s3.md
+++ b/docs/hadoop-catalog-with-s3.md
@@ -523,12 +523,14 @@ curl -X POST -H "Accept:
application/vnd.gravitino.v1+json" \
### How to access S3 fileset with credential vending
-If the catalog has been configured with credential, you can access S3 fileset
without providing authentication information via GVFS Java/Python client and
Spark. Let's see how to access S3 fileset with credential:
+When the catalog is configured with credentials and client-side credential
vending is enabled,
+you can access S3 filesets directly using the GVFS Java/Python client or Spark
without providing authentication details.
GVFS Java client:
```java
Configuration conf = new Configuration();
+conf.setBoolean("fs.gravitino.enableCredentialVending", true);
conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
conf.set("fs.gravitino.server.uri", "http://localhost:8090");
@@ -545,6 +547,7 @@ Spark:
```python
spark = SparkSession.builder
.appName("s3_fileset_test")
+ .config("spark.hadoop.fs.gravitino.enableCredentialVending", "true")
.config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs")
.config("spark.hadoop.fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem")
.config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090")
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 996c61f947..6b3e4a0a04 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -70,6 +70,7 @@ the path mapping and convert automatically.
| `fs.gravitino.operations.class` | The operations class
to provide the FS operations for the Gravitino Virtual File System. Users can
extend `BaseGVFSOperations` to implement their own operations and configure
the class name in this conf to use custom FS operations.
|
`org.apache.gravitino.filesystem.hadoop.DefaultGVFSOperations` | No
| 0.9.0-incubating |
| `fs.gravitino.hook.class` | The hook class to
inject into the <br/>Gravitino Virtual File System. Users can implement their
own `GravitinoVirtualFileSystemHook` and configure the class name in this conf
to inject custom code.
|
`org.apache.gravitino.filesystem.hadoop.NoOpHook` | No
| 0.9.0-incubating |
| `fs.gravitino.client.request.header.` | The configuration
key prefix for the Gravitino client request header. You can set the request
header for the Gravitino client.
| (none)
| No |
0.9.0-incubating |
+| `fs.gravitino.enableCredentialVending` | Whether to enable
credential vending for the Gravitino Virtual File System.
| `false`
| No |
0.9.0-incubating |
Apart from the above properties, to access fileset like S3, GCS, OSS and
custom fileset, extra properties are needed, please see
[S3 GVFS Java client
configurations](./hadoop-catalog-with-s3.md#using-the-gvfs-java-client-to-access-the-fileset),
[GCS GVFS Java client
configurations](./hadoop-catalog-with-gcs.md#using-the-gvfs-java-client-to-access-the-fileset),
[OSS GVFS Java client
configurations](./hadoop-catalog-with-oss.md#using-the-gvfs-java-client-to-access-the-fileset)
and [Azure Blob Storage GVFS Java client
configurations](./hadoop-catalog-with-adls.md#using-the-gvfs-java-client-to-access-the-fileset)
for [...]
@@ -365,6 +366,7 @@ to recompile the native libraries like `libhdfs` and
others, and completely repl
| `operations_class` | The operations class to provide the FS
operations for the Gravitino Virtual File System. Users can extend
`BaseGVFSOperations` to implement their own operations and configure the class
name in this conf to use custom FS operations.
|
`gravitino.filesystem.gvfs_default_operations.DefaultGVFSOperations` | No
| 0.9.0-incubating |
| `hook_class` | The hook class to inject into the
Gravitino Virtual File System. Users can implement their own
`GravitinoVirtualFileSystemHook` and configure the class name in this conf to
inject custom code.
|
`gravitino.filesystem.gvfs_hook.NoOpHook` | No
| 0.9.0-incubating |
| `client_request_header_` | The configuration key prefix for the
Gravitino client request header. You can set the request header for the
Gravitino client.
| (none)
| No | 0.9.0-incubating |
+| `enable_credential_vending` | Whether to enable credential vending for
the Gravitino Virtual File System.
| `false`
| No | 0.9.0-incubating |
#### Configurations for S3, GCS, OSS and Azure Blob storage fileset
diff --git a/server-common/build.gradle.kts b/server-common/build.gradle.kts
index ab4908ff31..5403d873c1 100644
--- a/server-common/build.gradle.kts
+++ b/server-common/build.gradle.kts
@@ -26,6 +26,7 @@ plugins {
dependencies {
implementation(project(":api"))
+ implementation(project(":catalogs:catalog-common"))
implementation(project(":common")) {
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.datatype")
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
index 80c2cd537d..2e50150bd4 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/web/Utils.java
@@ -32,6 +32,7 @@ import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.utils.PrincipalUtils;
@@ -222,4 +223,16 @@ public class Utils {
}
return filteredHeaders;
}
+
+ public static Map<String, String>
filterFilesetCredentialHeaders(HttpServletRequest httpRequest) {
+ Map<String, String> filteredHeaders = Maps.newHashMap();
+
+ String currentLocationName =
+
httpRequest.getHeader(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME);
+ if (StringUtils.isNotBlank(currentLocationName)) {
+ filteredHeaders.put(
+ CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME,
currentLocationName);
+ }
+ return filteredHeaders;
+ }
}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
index 1046bbba1a..438ad0b645 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
@@ -24,6 +24,7 @@ import com.codahale.metrics.annotation.Timed;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
@@ -37,6 +38,7 @@ import javax.ws.rs.core.Response;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.CredentialOperationDispatcher;
import org.apache.gravitino.dto.credential.CredentialDTO;
@@ -95,6 +97,17 @@ public class MetadataObjectCredentialOperations {
}
NameIdentifier identifier =
MetadataObjectUtil.toEntityIdent(metalake, object);
+ Map<String, String> filteredFilesetHeaders =
+ Utils.filterFilesetCredentialHeaders(httpRequest);
+ // set the fileset info into the thread local context
+ if (!filteredFilesetHeaders.isEmpty()) {
+ CallerContext context =
+
CallerContext.builder().withContext(filteredFilesetHeaders).build();
+ CallerContext.CallerContextHolder.set(context);
+ LOG.info(
+ "Set the caller context for getting credential: {}",
+ context.context().toString());
+ }
List<Credential> credentials =
credentialOperationDispatcher.getCredentials(identifier);
if (credentials == null) {
return Utils.ok(new CredentialResponse(new CredentialDTO[0]));
@@ -105,6 +118,9 @@ public class MetadataObjectCredentialOperations {
});
} catch (Exception e) {
return ExceptionHandlers.handleCredentialException(OperationType.GET,
fullName, e);
+ } finally {
+ // Clear the caller context
+ CallerContext.CallerContextHolder.remove();
}
}