This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 598bc0510 [#5996] feat(python-client): Using credential in python GVFS client. (#5997)
598bc0510 is described below
commit 598bc051010490fddd47883ab64e450d36aa3d86
Author: Qi Yu <[email protected]>
AuthorDate: Wed Jan 8 17:04:36 2025 +0800
[#5996] feat(python-client): Using credential in python GVFS client. (#5997)
### What changes were proposed in this pull request?
Support using credentials in the GVFS Python client for cloud storage.
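For example (a minimal sketch; the metalake, catalog, and fileset names below are placeholders), a client can read a fileset whose catalog has credential vending enabled without configuring any static cloud-storage keys:

```python
from gravitino.filesystem import gvfs

# With credential vending enabled on the catalog, the GVFS client fetches
# short-lived credentials from the Gravitino server per fileset, so no static
# cloud-storage keys are configured here.
fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
)
fs.ls("gvfs://fileset/my_catalog/my_schema/my_fileset")
```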
### Why are the changes needed?
It's needed: it lets the GVFS Python client access cloud storage with credentials vended by the Gravitino server instead of statically configured keys.
Fix: #5996
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
New integration tests added and tested locally.
---
.../gravitino/credential/ADLSTokenCredential.java | 1 +
.../metadata_object_credential_operations.py | 2 +-
clients/client-python/gravitino/filesystem/gvfs.py | 302 +++++++++++++++++----
.../gravitino/filesystem/gvfs_config.py | 8 +
.../tests/integration/test_gvfs_with_abs.py | 6 +-
.../integration/test_gvfs_with_abs_credential.py | 171 ++++++++++++
.../tests/integration/test_gvfs_with_gcs.py | 9 +-
.../integration/test_gvfs_with_gcs_credential.py | 112 ++++++++
.../integration/test_gvfs_with_oss_credential.py | 225 +++++++++++++++
.../integration/test_gvfs_with_s3_credential.py | 151 +++++++++++
docs/how-to-use-gvfs.md | 23 +-
11 files changed, 942 insertions(+), 68 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java b/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
index 249b0ac0b..6f1c46303 100644
--- a/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
+++ b/api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java
@@ -74,6 +74,7 @@ public class ADLSTokenCredential implements Credential {
public Map<String, String> credentialInfo() {
return (new ImmutableMap.Builder<String, String>())
.put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
+ .put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
.build();
}
diff --git a/clients/client-python/gravitino/client/metadata_object_credential_operations.py b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
index 93d538cfa..7184cd797 100644
--- a/clients/client-python/gravitino/client/metadata_object_credential_operations.py
+++ b/clients/client-python/gravitino/client/metadata_object_credential_operations.py
@@ -48,7 +48,7 @@ class MetadataObjectCredentialOperations(SupportsCredentials):
metadata_object_type = metadata_object.type().value
metadata_object_name = metadata_object.name()
self._request_path = (
- f"api/metalakes/{metalake_name}objects/{metadata_object_type}/"
+ f"api/metalakes/{metalake_name}/objects/{metadata_object_type}/"
f"{metadata_object_name}/credentials"
)
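The one-character fix above restores the missing `/` between the metalake name and `objects`. For illustration, with hypothetical names:

```python
# Hypothetical example: metalake "demo", fileset object "catalog.schema.fileset"
# before: "api/metalakes/demoobjects/fileset/catalog.schema.fileset/credentials"
# after:  "api/metalakes/demo/objects/fileset/catalog.schema.fileset/credentials"
```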
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py
index cd9521dc7..0dc020ee9 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,9 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import logging
+import sys
+
+# Disable C0302: Too many lines in module
+# pylint: disable=C0302
+import time
from enum import Enum
from pathlib import PurePosixPath
-from typing import Dict, Tuple
+from typing import Dict, Tuple, List
import re
import importlib
import fsspec
@@ -28,6 +34,9 @@ from fsspec.implementations.arrow import ArrowFSWrapper
from fsspec.utils import infer_storage_options
from readerwriterlock import rwlock
+
+from gravitino.api.catalog import Catalog
+from gravitino.api.credential.credential import Credential
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
@@ -35,14 +44,31 @@ from gravitino.audit.internal_client_type import InternalClientType
from gravitino.auth.default_oauth2_token_provider import DefaultOAuth2TokenProvider
from gravitino.auth.oauth2_token_provider import OAuth2TokenProvider
from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from gravitino.client.generic_fileset import GenericFileset
from gravitino.client.fileset_catalog import FilesetCatalog
from gravitino.client.gravitino_client import GravitinoClient
-from gravitino.exceptions.base import GravitinoRuntimeException
+from gravitino.exceptions.base import (
+ GravitinoRuntimeException,
+)
from gravitino.filesystem.gvfs_config import GVFSConfig
from gravitino.name_identifier import NameIdentifier
+from gravitino.api.credential.adls_token_credential import ADLSTokenCredential
+from gravitino.api.credential.azure_account_key_credential import (
+ AzureAccountKeyCredential,
+)
+from gravitino.api.credential.gcs_token_credential import GCSTokenCredential
+from gravitino.api.credential.oss_secret_key_credential import OSSSecretKeyCredential
+from gravitino.api.credential.oss_token_credential import OSSTokenCredential
+from gravitino.api.credential.s3_secret_key_credential import S3SecretKeyCredential
+from gravitino.api.credential.s3_token_credential import S3TokenCredential
+
+logger = logging.getLogger(__name__)
+
PROTOCOL_NAME = "gvfs"
+TIME_WITHOUT_EXPIRATION = sys.maxsize
+
class StorageType(Enum):
HDFS = "hdfs"
@@ -677,8 +703,10 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
NameIdentifier.of(identifier.namespace().level(2), identifier.name()),
sub_path,
)
+
return FilesetContextPair(
- actual_file_location, self._get_filesystem(actual_file_location)
+ actual_file_location,
+ self._get_filesystem(actual_file_location, fileset_catalog, identifier),
)
def _extract_identifier(self, path):
@@ -866,50 +894,90 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
finally:
write_lock.release()
- def _get_filesystem(self, actual_file_location: str):
+ def _file_system_expired(self, expire_time: int):
+ return expire_time <= time.time() * 1000
+
+ # Disable Too many branches (13/12) (too-many-branches)
+ # pylint: disable=R0912
+ def _get_filesystem(
+ self,
+ actual_file_location: str,
+ fileset_catalog: Catalog,
+ name_identifier: NameIdentifier,
+ ):
storage_type = self._recognize_storage_type(actual_file_location)
read_lock = self._cache_lock.gen_rlock()
try:
read_lock.acquire()
- cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get(
- storage_type
+ cache_value: Tuple[int, AbstractFileSystem] = self._cache.get(
+ name_identifier
)
if cache_value is not None:
- return cache_value
+ if not self._file_system_expired(cache_value[0]):
+ return cache_value[1]
finally:
read_lock.release()
write_lock = self._cache_lock.gen_wlock()
try:
write_lock.acquire()
- cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get(
- storage_type
+ cache_value: Tuple[int, AbstractFileSystem] = self._cache.get(
+ name_identifier
)
+
if cache_value is not None:
- return cache_value
+ if not self._file_system_expired(cache_value[0]):
+ return cache_value[1]
+
+ new_cache_value: Tuple[int, AbstractFileSystem]
if storage_type == StorageType.HDFS:
fs_class = importlib.import_module("pyarrow.fs").HadoopFileSystem
- fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location))
+ new_cache_value = (
+ TIME_WITHOUT_EXPIRATION,
+ ArrowFSWrapper(fs_class.from_uri(actual_file_location)),
+ )
elif storage_type == StorageType.LOCAL:
- fs = LocalFileSystem()
+ new_cache_value = (TIME_WITHOUT_EXPIRATION, LocalFileSystem())
elif storage_type == StorageType.GCS:
- fs = self._get_gcs_filesystem()
+ new_cache_value = self._get_gcs_filesystem(
+ fileset_catalog, name_identifier
+ )
elif storage_type == StorageType.S3A:
- fs = self._get_s3_filesystem()
+ new_cache_value = self._get_s3_filesystem(
+ fileset_catalog, name_identifier
+ )
elif storage_type == StorageType.OSS:
- fs = self._get_oss_filesystem()
+ new_cache_value = self._get_oss_filesystem(
+ fileset_catalog, name_identifier
+ )
elif storage_type == StorageType.ABS:
- fs = self._get_abs_filesystem()
+ new_cache_value = self._get_abs_filesystem(
+ fileset_catalog, name_identifier
+ )
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
)
- self._cache[storage_type] = fs
- return fs
+ self._cache[name_identifier] = new_cache_value
+ return new_cache_value[1]
finally:
write_lock.release()
- def _get_gcs_filesystem(self):
+ def _get_gcs_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier):
+ fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset(
+ NameIdentifier.of(identifier.namespace().level(2), identifier.name())
+ )
+ credentials = fileset.support_credentials().get_credentials()
+
+ credential = self._get_most_suitable_gcs_credential(credentials)
+ if credential is not None:
+ expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+ if isinstance(credential, GCSTokenCredential):
+ fs = importlib.import_module("gcsfs").GCSFileSystem(
+ token=credential.token()
+ )
+ return (expire_time, fs)
+
# get 'service-account-key' from gcs_options, if the key is not found, throw an exception
service_account_key_path = self._options.get(
GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE
@@ -918,11 +986,47 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
raise GravitinoRuntimeException(
"Service account key is not found in the options."
)
- return importlib.import_module("gcsfs").GCSFileSystem(
- token=service_account_key_path
+ return (
+ TIME_WITHOUT_EXPIRATION,
+ importlib.import_module("gcsfs").GCSFileSystem(
+ token=service_account_key_path
+ ),
)
- def _get_s3_filesystem(self):
+ def _get_s3_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier):
+ fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset(
+ NameIdentifier.of(identifier.namespace().level(2), identifier.name())
+ )
+ credentials = fileset.support_credentials().get_credentials()
+ credential = self._get_most_suitable_s3_credential(credentials)
+
+ # S3 endpoint from the Gravitino server. Note: the endpoint may not be a real S3
+ # endpoint; it can be a simulated one such as MinIO. So although the endpoint is not
+ # a required field for S3FileSystem, we still need to assign it to the S3FileSystem.
+ s3_endpoint = fileset_catalog.properties().get("s3-endpoint", None)
+ # If the s3 endpoint is not found in the fileset catalog, get it from the client options
+ if s3_endpoint is None:
+ s3_endpoint = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
+
+ if credential is not None:
+ expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+ if isinstance(credential, S3TokenCredential):
+ fs = importlib.import_module("s3fs").S3FileSystem(
+ key=credential.access_key_id(),
+ secret=credential.secret_access_key(),
+ token=credential.session_token(),
+ endpoint_url=s3_endpoint,
+ )
+ return (expire_time, fs)
+ if isinstance(credential, S3SecretKeyCredential):
+ fs = importlib.import_module("s3fs").S3FileSystem(
+ key=credential.access_key_id(),
+ secret=credential.secret_access_key(),
+ endpoint_url=s3_endpoint,
+ )
+ return (expire_time, fs)
+
+ # This is the old way to get the S3 file system.
# get 'aws_access_key_id' from s3_options, if the key is not found, throw an exception
aws_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
if aws_access_key_id is None:
@@ -939,20 +1043,48 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"AWS secret access key is not found in the options."
)
- # get 'aws_endpoint_url' from s3_options, if the key is not found, throw an exception
- aws_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
- if aws_endpoint_url is None:
- raise GravitinoRuntimeException(
- "AWS endpoint url is not found in the options."
- )
+ return (
+ TIME_WITHOUT_EXPIRATION,
+ importlib.import_module("s3fs").S3FileSystem(
+ key=aws_access_key_id,
+ secret=aws_secret_access_key,
+ endpoint_url=s3_endpoint,
+ ),
+ )
- return importlib.import_module("s3fs").S3FileSystem(
- key=aws_access_key_id,
- secret=aws_secret_access_key,
- endpoint_url=aws_endpoint_url,
+ def _get_oss_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier):
+ fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset(
+ NameIdentifier.of(identifier.namespace().level(2), identifier.name())
)
+ credentials = fileset.support_credentials().get_credentials()
+
+ # OSS endpoint from the Gravitino server
+ oss_endpoint = fileset_catalog.properties().get("oss-endpoint", None)
+ # If the oss endpoint is not found in the fileset catalog, get it from the client options
+ if oss_endpoint is None:
+ oss_endpoint = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT)
+
+ credential = self._get_most_suitable_oss_credential(credentials)
+ if credential is not None:
+ expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+ if isinstance(credential, OSSTokenCredential):
+ fs = importlib.import_module("ossfs").OSSFileSystem(
+ key=credential.access_key_id(),
+ secret=credential.secret_access_key(),
+ token=credential.security_token(),
+ endpoint=oss_endpoint,
+ )
+ return (expire_time, fs)
+ if isinstance(credential, OSSSecretKeyCredential):
+ return (
+ expire_time,
+ importlib.import_module("ossfs").OSSFileSystem(
+ key=credential.access_key_id(),
+ secret=credential.secret_access_key(),
+ endpoint=oss_endpoint,
+ ),
+ )
- def _get_oss_filesystem(self):
# get 'oss_access_key_id' from oss options, if the key is not found, throw an exception
oss_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY)
if oss_access_key_id is None:
@@ -969,20 +1101,38 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"OSS secret access key is not found in the options."
)
- # get 'oss_endpoint_url' from oss options, if the key is not found, throw an exception
- oss_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT)
- if oss_endpoint_url is None:
- raise GravitinoRuntimeException(
- "OSS endpoint url is not found in the options."
- )
+ return (
+ TIME_WITHOUT_EXPIRATION,
+ importlib.import_module("ossfs").OSSFileSystem(
+ key=oss_access_key_id,
+ secret=oss_secret_access_key,
+ endpoint=oss_endpoint,
+ ),
+ )
- return importlib.import_module("ossfs").OSSFileSystem(
- key=oss_access_key_id,
- secret=oss_secret_access_key,
- endpoint=oss_endpoint_url,
+ def _get_abs_filesystem(self, fileset_catalog: Catalog, identifier: NameIdentifier):
+ fileset: GenericFileset = fileset_catalog.as_fileset_catalog().load_fileset(
+ NameIdentifier.of(identifier.namespace().level(2), identifier.name())
)
+ credentials = fileset.support_credentials().get_credentials()
+
+ credential = self._get_most_suitable_abs_credential(credentials)
+ if credential is not None:
+ expire_time = self._get_expire_time_by_ratio(credential.expire_time_in_ms())
+ if isinstance(credential, ADLSTokenCredential):
+ fs = importlib.import_module("adlfs").AzureBlobFileSystem(
+ account_name=credential.account_name(),
+ sas_token=credential.sas_token(),
+ )
+ return (expire_time, fs)
+
+ if isinstance(credential, AzureAccountKeyCredential):
+ fs = importlib.import_module("adlfs").AzureBlobFileSystem(
+ account_name=credential.account_name(),
+ account_key=credential.account_key(),
+ )
+ return (expire_time, fs)
- def _get_abs_filesystem(self):
# get 'abs_account_name' from abs options, if the key is not found, throw an exception
abs_account_name = self._options.get(
GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME
@@ -1001,10 +1151,68 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"ABS account key is not found in the options."
)
- return importlib.import_module("adlfs").AzureBlobFileSystem(
- account_name=abs_account_name,
- account_key=abs_account_key,
+ return (
+ TIME_WITHOUT_EXPIRATION,
+ importlib.import_module("adlfs").AzureBlobFileSystem(
+ account_name=abs_account_name,
+ account_key=abs_account_key,
+ ),
+ )
+
+ def _get_most_suitable_s3_credential(self, credentials: List[Credential]):
+ for credential in credentials:
+ # Prefer to use the token credential; if it does not exist, use the
+ # secret key credential.
+ if isinstance(credential, S3TokenCredential):
+ return credential
+
+ for credential in credentials:
+ if isinstance(credential, S3SecretKeyCredential):
+ return credential
+ return None
+
+ def _get_most_suitable_oss_credential(self, credentials: List[Credential]):
+ for credential in credentials:
+ # Prefer to use the token credential; if it does not exist, use the
+ # secret key credential.
+ if isinstance(credential, OSSTokenCredential):
+ return credential
+
+ for credential in credentials:
+ if isinstance(credential, OSSSecretKeyCredential):
+ return credential
+ return None
+
+ def _get_most_suitable_gcs_credential(self, credentials: List[Credential]):
+ for credential in credentials:
+ # Prefer to use the token credential; if it does not exist, return None.
+ if isinstance(credential, GCSTokenCredential):
+ return credential
+ return None
+
+ def _get_most_suitable_abs_credential(self, credentials: List[Credential]):
+ for credential in credentials:
+ # Prefer to use the token credential; if it does not exist, use the
+ # account key credential.
+ if isinstance(credential, ADLSTokenCredential):
+ return credential
+
+ for credential in credentials:
+ if isinstance(credential, AzureAccountKeyCredential):
+ return credential
+ return None
+
+ def _get_expire_time_by_ratio(self, expire_time: int):
+ if expire_time <= 0:
+ return TIME_WITHOUT_EXPIRATION
+
+ ratio = float(
+ self._options.get(
+ GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO,
+ GVFSConfig.DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO,
+ )
+ )
+ return time.time() * 1000 + (expire_time - time.time() * 1000) * ratio
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
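In short, `_get_filesystem` now keys the cache by the fileset `NameIdentifier` and stores `(expire_time, filesystem)` tuples. A simplified sketch of the lookup, illustrative only and omitting the read/write locking of the real implementation:

```python
import sys
import time

TIME_WITHOUT_EXPIRATION = sys.maxsize  # marker for entries that never expire

def lookup(cache: dict, name_identifier):
    # Cache values are (expire_time_ms, filesystem) tuples keyed by the fileset
    # identifier, because vended credentials are issued per fileset and expire.
    value = cache.get(name_identifier)
    if value is not None and value[0] > time.time() * 1000:
        return value[1]
    # Expired or missing: the caller rebuilds the filesystem with a fresh credential.
    return None
```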
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 4261fb48f..6fbd8a99d 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -44,3 +44,11 @@ class GVFSConfig:
GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "abs_account_name"
GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "abs_account_key"
+
+ # This configuration marks the expiration time of the credential. For instance, if the
+ # credential fetched from the Gravitino server has an expiration time of 3600 seconds and
+ # the credential_expired_time_ratio is 0.5, then the credential will be considered expired
+ # after 1800 seconds and the client will try to retrieve a new credential.
+ GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO = "credential_expiration_ratio"
+
+ # The default value of the credential_expired_time_ratio is 0.5
+ DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO = 0.5
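To make the ratio concrete, here is a hedged sketch mirroring `_get_expire_time_by_ratio` from gvfs.py above (the helper name here is illustrative, not part of the patch):

```python
import sys
import time

TIME_WITHOUT_EXPIRATION = sys.maxsize

def expire_time_by_ratio(expire_time_ms: float, ratio: float = 0.5) -> float:
    # A credential without an expiration time (<= 0) never expires in the cache.
    if expire_time_ms <= 0:
        return TIME_WITHOUT_EXPIRATION
    now_ms = time.time() * 1000
    # With a credential valid for 3600s and the default ratio of 0.5, the cached
    # filesystem is treated as expired after 1800s, which triggers a refresh.
    return now_ms + (expire_time_ms - now_ms) * ratio
```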
diff --git a/clients/client-python/tests/integration/test_gvfs_with_abs.py b/clients/client-python/tests/integration/test_gvfs_with_abs.py
index a218efcfd..53c265c53 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_abs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs.py
@@ -33,8 +33,6 @@ from gravitino import (
)
from gravitino.exceptions.base import GravitinoRuntimeException
from gravitino.filesystem.gvfs_config import GVFSConfig
-from gravitino.filesystem.gvfs import StorageType
-
logger = logging.getLogger(__name__)
@@ -281,7 +279,7 @@ class TestGvfsWithABS(TestGvfsWithHDFS):
self.assertFalse(self.fs.exists(mkdir_actual_dir))
self.assertFalse(fs.exists(mkdir_dir))
- self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+ self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))
def test_makedirs(self):
mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
@@ -309,7 +307,7 @@ class TestGvfsWithABS(TestGvfsWithHDFS):
self.assertFalse(self.fs.exists(mkdir_actual_dir))
self.assertFalse(fs.exists(mkdir_dir))
- self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+ self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))
def test_ls(self):
ls_dir = self.fileset_gvfs_location + "/test_ls"
diff --git a/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
new file mode 100644
index 000000000..9071679fb
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_abs_credential.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from adlfs import AzureBlobFileSystem
+
+from gravitino import (
+ gvfs,
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_abs import TestGvfsWithABS
+
+
+logger = logging.getLogger(__name__)
+
+
+def azure_abs_with_credential_is_prepared():
+ return (
+ os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
+ and os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
+ and os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
+ and os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
+ and os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
+ and os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
+ )
+
+
+@unittest.skipUnless(
+ azure_abs_with_credential_is_prepared(),
+ "Azure Blob Storage credential test is not prepared.",
+)
+class TestGvfsWithCredentialABS(TestGvfsWithABS):
+ # Before running this test, please make sure azure-bundle-xxx.jar has been
+ # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+ azure_abs_account_key = os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
+ azure_abs_account_name = os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
+ azure_abs_container_name = os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
+ azure_abs_tenant_id = os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
+ azure_abs_client_id = os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
+ azure_abs_client_secret = os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
+
+ metalake_name: str = "TestGvfsWithCredentialABS_metalake" + str(randint(1, 10000))
+
+ def setUp(self):
+ self.options = {
+ GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: self.azure_abs_account_name,
+ GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: self.azure_abs_account_key,
+ }
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "abs",
+ "azure-storage-account-name": cls.azure_abs_account_name,
+ "azure-storage-account-key": cls.azure_abs_account_key,
+ "azure-tenant-id": cls.azure_abs_tenant_id,
+ "azure-client-id": cls.azure_abs_client_id,
+ "azure-client-secret": cls.azure_abs_client_secret,
+ "credential-providers": "adls-token",
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+ f"{cls.azure_abs_container_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+ f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=(
+ f"abfss://{cls.azure_abs_container_name}@{cls.azure_abs_account_name}.dfs.core.windows.net/"
+ f"{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ ),
+ properties=cls.fileset_properties,
+ )
+
+ cls.fs = AzureBlobFileSystem(
+ account_name=cls.azure_abs_account_name,
+ account_key=cls.azure_abs_account_key,
+ )
+
+ # As the permissions granted by the dynamic token are narrower than those granted by
+ # static credentials like the account key and account name, these test cases will fail
+ # if we do not override them.
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if 'create_parents'
+ # is set to True.
+ new_bucket = self.azure_abs_container_name + "2"
+ mkdir_actual_dir = mkdir_actual_dir.replace(
+ self.azure_abs_container_name, new_bucket
+ )
+ self.fs.mkdir(mkdir_actual_dir, create_parents=True)
+
+ self.assertFalse(self.fs.exists(mkdir_actual_dir))
+
+ self.assertTrue(self.fs.exists(f"abfss://{new_bucket}"))
+
+ def test_makedirs(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if 'create_parents'
+ # is set to True.
+ new_bucket = self.azure_abs_container_name + "1"
+ new_mkdir_actual_dir = mkdir_actual_dir.replace(
+ self.azure_abs_container_name, new_bucket
+ )
+ self.fs.makedirs(new_mkdir_actual_dir)
+ self.assertFalse(self.fs.exists(mkdir_actual_dir))
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -36,7 +36,7 @@ from gravitino.filesystem.gvfs_config import GVFSConfig
logger = logging.getLogger(__name__)
-def oss_is_configured():
+def gcs_is_configured():
return all(
[
os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH") is not None,
@@ -45,7 +45,7 @@ def oss_is_configured():
)
-@unittest.skipUnless(oss_is_configured(), "GCS is not configured.")
+@unittest.skipUnless(gcs_is_configured(), "GCS is not configured.")
class TestGvfsWithGCS(TestGvfsWithHDFS):
# Before running this test, please make sure gcp-bundle-x.jar has been
# copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
@@ -254,11 +254,10 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
new_bucket = self.bucket_name + "1"
mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)
- fs.mkdir(mkdir_dir, create_parents=True)
+ with self.assertRaises(OSError):
+ fs.mkdir(mkdir_dir, create_parents=True)
self.assertFalse(self.fs.exists(mkdir_actual_dir))
- self.assertFalse(fs.exists(mkdir_dir))
- self.assertFalse(self.fs.exists("gs://" + new_bucket))
def test_makedirs(self):
mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
new file mode 100644
index 000000000..eec502a13
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs_credential.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from gcsfs import GCSFileSystem
+
+from gravitino import Catalog, Fileset, GravitinoClient
+from gravitino.filesystem import gvfs
+from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS
+
+logger = logging.getLogger(__name__)
+
+
+def gcs_with_credential_is_configured():
+ return all(
+ [
+ os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL") is
not None,
+ os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL") is not None,
+ ]
+ )
+
+
+@unittest.skipUnless(gcs_with_credential_is_configured(), "GCS is not configured.")
+class TestGvfsWithGCSCredential(TestGvfsWithGCS):
+ # Before running this test, please make sure gcp-bundle-x.jar has been
+ # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+ key_file = os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL")
+ bucket_name = os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL")
+ metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1, 10000))
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "gcs",
+ "gcs-credential-file-path": cls.key_file,
+ "gcs-service-account-file": cls.key_file,
+ "credential-providers": "gcs-token",
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+ f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+ f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+
+ cls.fs = GCSFileSystem(token=cls.key_file)
+
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if 'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)
+
+ fs.mkdir(mkdir_dir, create_parents=True)
+ self.assertFalse(self.fs.exists(mkdir_actual_dir))
diff --git a/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
new file mode 100644
index 000000000..14b8b5231
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss_credential.py
@@ -0,0 +1,225 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from ossfs import OSSFileSystem
+
+from gravitino import (
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.filesystem import gvfs
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_oss import TestGvfsWithOSS
+
+logger = logging.getLogger(__name__)
+
+
+def oss_with_credential_is_configured():
+ return all(
+ [
+ os.environ.get("OSS_STS_ACCESS_KEY_ID") is not None,
+ os.environ.get("OSS_STS_SECRET_ACCESS_KEY") is not None,
+ os.environ.get("OSS_STS_ENDPOINT") is not None,
+ os.environ.get("OSS_STS_BUCKET_NAME") is not None,
+ os.environ.get("OSS_STS_REGION") is not None,
+ os.environ.get("OSS_STS_ROLE_ARN") is not None,
+ ]
+ )
+
+
+@unittest.skipUnless(
+ oss_with_credential_is_configured(), "OSS with credential is not configured."
+)
+class TestGvfsWithOSSCredential(TestGvfsWithOSS):
+ # Before running this test, please make sure aliyun-bundle-x.jar has been
+ # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+ oss_access_key = os.environ.get("OSS_STS_ACCESS_KEY_ID")
+ oss_secret_key = os.environ.get("OSS_STS_SECRET_ACCESS_KEY")
+ oss_endpoint = os.environ.get("OSS_STS_ENDPOINT")
+ bucket_name = os.environ.get("OSS_STS_BUCKET_NAME")
+ oss_sts_region = os.environ.get("OSS_STS_REGION")
+ oss_sts_role_arn = os.environ.get("OSS_STS_ROLE_ARN")
+
+ metalake_name: str = "TestGvfsWithOSSCredential_metalake" + str(randint(1, 10000))
+
+ def setUp(self):
+ self.options = {
+ f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}":
self.oss_access_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}":
self.oss_secret_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint,
+ }
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "oss",
+ "oss-access-key-id": cls.oss_access_key,
+ "oss-secret-access-key": cls.oss_secret_key,
+ "oss-endpoint": cls.oss_endpoint,
+ "oss-region": cls.oss_sts_region,
+ "oss-role-arn": cls.oss_sts_role_arn,
+ "credential-providers": "oss-token",
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+ f"oss://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+ f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+
+ cls.fs = OSSFileSystem(
+ key=cls.oss_access_key,
+ secret=cls.oss_secret_key,
+ endpoint=cls.oss_endpoint,
+ )
+
+ def test_cat_file(self):
+ cat_dir = self.fileset_gvfs_location + "/test_cat"
+ cat_actual_dir = self.fileset_storage_location + "/test_cat"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ self.check_mkdir(cat_dir, cat_actual_dir, fs)
+
+ cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
+ cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
+ self.fs.touch(cat_actual_file)
+ self.assertTrue(self.fs.exists(cat_actual_file))
+ self.assertTrue(fs.exists(cat_file))
+
+ # test open and write file
+ with fs.open(cat_file, mode="wb") as f:
+ f.write(b"test_cat_file")
+ self.assertTrue(fs.info(cat_file)["size"] > 0)
+
+ # test cat file
+ content = fs.cat_file(cat_file)
+ self.assertEqual(b"test_cat_file", content)
+
+ @unittest.skip(
+ "Skip this test case because fs.ls(dir) using credential is always
empty"
+ )
+ def test_ls(self):
+ ls_dir = self.fileset_gvfs_location + "/test_ls"
+ ls_actual_dir = self.fileset_storage_location + "/test_ls"
+
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ self.check_mkdir(ls_dir, ls_actual_dir, fs)
+
+ ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
+ ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
+ self.fs.touch(ls_actual_file)
+ self.assertTrue(self.fs.exists(ls_actual_file))
+
+ # test detail = false
+ file_list_without_detail = fs.ls(ls_dir, detail=False)
+ self.assertEqual(1, len(file_list_without_detail))
+ self.assertEqual(file_list_without_detail[0], ls_file[len("gvfs://") :])
+
+ # test detail = true
+ file_list_with_detail = fs.ls(ls_dir, detail=True)
+ self.assertEqual(1, len(file_list_with_detail))
+ self.assertEqual(file_list_with_detail[0]["name"],
ls_file[len("gvfs://") :])
+
+ @unittest.skip(
+ "Skip this test case because fs.info(info_file) using credential is
always None"
+ )
+ def test_info(self):
+ info_dir = self.fileset_gvfs_location + "/test_info"
+ info_actual_dir = self.fileset_storage_location + "/test_info"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ self.check_mkdir(info_dir, info_actual_dir, fs)
+
+ info_file = self.fileset_gvfs_location + "/test_info/test.file"
+ info_actual_file = self.fileset_storage_location + "/test_info/test.file"
+ self.fs.touch(info_actual_file)
+ self.assertTrue(self.fs.exists(info_actual_file))
+
+ # OSS info has different behavior than S3 info. For OSS info, the name of the
+ # directory will have a trailing slash if it's a directory and the path
+ # does not end with a slash, while S3 info will not have a trailing
+ # slash if it's a directory.
+
+ # >>> oss.info('bucket-xiaoyu/lisi')
+ # {'name': 'bucket-xiaoyu/lisi/', 'type': 'directory',
+ # 'size': 0, 'Size': 0, 'Key': 'bucket-xiaoyu/lisi/'}
+ # >>> oss.info('bucket-xiaoyu/lisi/')
+ # {'name': 'bucket-xiaoyu/lisi', 'size': 0,
+ # 'type': 'directory', 'Size': 0,
+ # 'Key': 'bucket-xiaoyu/lisi'}
+
+ # >>> s3.info('paimon-bucket/lisi');
+ # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+ # 'StorageClass': 'DIRECTORY'}
+ # >>> s3.info('paimon-bucket/lisi/');
+ # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+ # 'StorageClass': 'DIRECTORY'}
+
+ dir_info = fs.info(info_dir)
+ self.assertEqual(dir_info["name"][:-1], info_dir[len("gvfs://") :])
+
+ file_info = fs.info(info_file)
+ self.assertEqual(file_info["name"], info_file[len("gvfs://") :])
diff --git a/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
new file mode 100644
index 000000000..35d88c2c8
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3_credential.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from s3fs import S3FileSystem
+
+from gravitino import (
+ gvfs,
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from tests.integration.test_gvfs_with_s3 import TestGvfsWithS3
+
+logger = logging.getLogger(__name__)
+
+
+def s3_with_credential_is_configured():
+ return all(
+ [
+ os.environ.get("S3_STS_ACCESS_KEY_ID") is not None,
+ os.environ.get("S3_STS_SECRET_ACCESS_KEY") is not None,
+ os.environ.get("S3_STS_ENDPOINT") is not None,
+ os.environ.get("S3_STS_BUCKET_NAME") is not None,
+ os.environ.get("S3_STS_REGION") is not None,
+ os.environ.get("S3_STS_ROLE_ARN") is not None,
+ ]
+ )
+
+
+@unittest.skipUnless(s3_with_credential_is_configured(), "S3 is not configured.")
+class TestGvfsWithS3Credential(TestGvfsWithS3):
+ # Before running this test, please make sure aws-bundle-x.jar has been
+ # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+ s3_access_key = os.environ.get("S3_STS_ACCESS_KEY_ID")
+ s3_secret_key = os.environ.get("S3_STS_SECRET_ACCESS_KEY")
+ s3_endpoint = os.environ.get("S3_STS_ENDPOINT")
+ bucket_name = os.environ.get("S3_STS_BUCKET_NAME")
+ s3_sts_region = os.environ.get("S3_STS_REGION")
+ s3_role_arn = os.environ.get("S3_STS_ROLE_ARN")
+
+ metalake_name: str = "TestGvfsWithS3Credential_metalake" + str(randint(1, 10000))
+
+ def setUp(self):
+ self.options = {
+ f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+ }
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "s3",
+ "s3-access-key-id": cls.s3_access_key,
+ "s3-secret-access-key": cls.s3_secret_key,
+ "s3-endpoint": cls.s3_endpoint,
+ "s3-region": cls.s3_sts_region,
+ "s3-role-arn": cls.s3_role_arn,
+ "credential-providers": "s3-token",
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+ f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+ f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+
+ cls.fs = S3FileSystem(
+ key=cls.s3_access_key,
+ secret=cls.s3_secret_key,
+ endpoint_url=cls.s3_endpoint,
+ )
+
+ # The following tests are copied from tests/integration/test_gvfs_with_s3.py with some
+ # modifications, as `mkdir` and `makedirs` behave differently in S3; other cloud storages
+ # like GCS, ABS, and OSS are similar.
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ with self.assertRaises(ValueError):
+ fs.mkdir(mkdir_dir, create_parents=True)
+ self.assertFalse(fs.exists(mkdir_dir))
+
+ def test_makedirs(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 31ede3a53..aff3b74ad 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -455,17 +455,18 @@ to recompile the native libraries like `libhdfs` and others, and completely repl
### Configuration
-| Configuration item   | Description                                                                                                                                              | Default value | Required                          | Since version    |
-|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-----------------------------------|------------------|
-| `server_uri`         | The Gravitino server uri, e.g. `http://localhost:8090`.                                                                                                   | (none)        | Yes                               | 0.6.0-incubating |
-| `metalake_name`      | The metalake name which the fileset belongs to.                                                                                                           | (none)        | Yes                               | 0.6.0-incubating |
-| `cache_size`         | The cache capacity of the Gravitino Virtual File System.                                                                                                  | `20`          | No                                | 0.6.0-incubating |
-| `cache_expired_time` | The value of time that the cache expires after accessing in the Gravitino Virtual File System. The value is in `seconds`.                                 | `3600`        | No                                | 0.6.0-incubating |
-| `auth_type`          | The auth type to initialize the Gravitino client to use with the Gravitino Virtual File System. Currently supports `simple` and `oauth2` auth types.      | `simple`      | No                                | 0.6.0-incubating |
-| `oauth2_server_uri`  | The auth server URI for the Gravitino client when using `oauth2` auth type.                                                                               | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
-| `oauth2_credential`  | The auth credential for the Gravitino client when using `oauth2` auth type.                                                                               | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
-| `oauth2_path`        | The auth server path for the Gravitino client when using `oauth2` auth type. Please remove the first slash `/` from the path, for example `oauth/token`.  | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
-| `oauth2_scope`       | The auth scope for the Gravitino client when using `oauth2` auth type with the Gravitino Virtual File System.                                             | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| Configuration item            | Description                                                                                                                                                                                                                                                                                     | Default value | Required                          | Since version    |
+|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|-----------------------------------|------------------|
+| `server_uri`                  | The Gravitino server uri, e.g. `http://localhost:8090`.                                                                                                                                                                                                                                        | (none)        | Yes                               | 0.6.0-incubating |
+| `metalake_name`               | The metalake name which the fileset belongs to.                                                                                                                                                                                                                                               | (none)        | Yes                               | 0.6.0-incubating |
+| `cache_size`                  | The cache capacity of the Gravitino Virtual File System.                                                                                                                                                                                                                                      | `20`          | No                                | 0.6.0-incubating |
[...]
+| `cache_expired_time`          | The value of time that the cache expires after accessing in the Gravitino Virtual File System. The value is in `seconds`.                                                                                                                                                                     | `3600`        | No                                | 0.6.0-incubating |
+| `auth_type`                   | The auth type to initialize the Gravitino client to use with the Gravitino Virtual File System. Currently supports `simple` and `oauth2` auth types.                                                                                                                                          | `simple`      | No                                | 0.6.0-incubating |
+| `oauth2_server_uri`           | The auth server URI for the Gravitino client when using `oauth2` auth type.                                                                                                                                                                                                                   | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_credential`           | The auth credential for the Gravitino client when using `oauth2` auth type.                                                                                                                                                                                                                   | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_path`                 | The auth server path for the Gravitino client when using `oauth2` auth type. Please remove the first slash `/` from the path, for example `oauth/token`.                                                                                                                                      | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `oauth2_scope`                | The auth scope for the Gravitino client when using `oauth2` auth type with the Gravitino Virtual File System.                                                                                                                                                                                 | (none)        | Yes if you use `oauth2` auth type | 0.7.0-incubating |
+| `credential_expiration_ratio` | The ratio of the credential's expiration time after which the GVFS client refreshes it. This is used when Gravitino Hadoop catalogs have enabled credential vending. If the expiration time of the credential fetched from Gravitino is 1 hour and the ratio is 0.5, the GVFS client will try to refresh the credential after 1 * 0.5 = 0.5 hour. | 0.5           | No                                | 0.8.0-incubating |
#### Extra configuration for S3, GCS, OSS fileset