This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new cefe316bb [#5188] feat(python-client): Support s3 fileset in python
client (#5209)
cefe316bb is described below
commit cefe316bbfd0b22faa46c087100b378a37023b42
Author: Qi Yu <[email protected]>
AuthorDate: Thu Oct 24 11:25:49 2024 +0800
[#5188] feat(python-client): Support s3 fileset in python client (#5209)
### What changes were proposed in this pull request?
Add support for S3 fileset in the Python client.
### Why are the changes needed?
It is needed by users who store filesets on S3.
Fix: #5188
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
Replaced the placeholders with a real S3 account and executed the following test.
<img width="1534" alt="image"
src="https://github.com/user-attachments/assets/3d6267ce-8954-43e6-bc54-ac70998df9f9">
./gradlew :clients:client-python:test -PskipDockerTests=false
---
clients/client-python/gravitino/filesystem/gvfs.py | 114 ++++++--
.../gravitino/filesystem/gvfs_config.py | 8 +-
clients/client-python/requirements.txt | 3 +-
.../tests/integration/test_gvfs_with_gcs.py | 150 +++++++++--
.../tests/integration/test_gvfs_with_hdfs.py | 95 +++----
.../tests/integration/test_gvfs_with_s3.py | 299 +++++++++++++++++++++
6 files changed, 565 insertions(+), 104 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index 8f1b2008a..a9201a833 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import os
from enum import Enum
from pathlib import PurePosixPath
from typing import Dict, Tuple
@@ -49,6 +48,7 @@ class StorageType(Enum):
HDFS = "hdfs"
LOCAL = "file"
GCS = "gs"
+ S3A = "s3a"
class FilesetContextPair:
@@ -314,7 +314,11 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
# convert the following to in
- if storage_type in [StorageType.HDFS, StorageType.GCS]:
+ if storage_type in [
+ StorageType.HDFS,
+ StorageType.GCS,
+ StorageType.S3A,
+ ]:
src_context_pair.filesystem().mv(
self._strip_storage_protocol(storage_type, src_actual_path),
self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -336,6 +340,10 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"Deprecated method, use `rm_file` method instead."
)
+ def lazy_load_class(self, module_name, class_name):
+ module = importlib.import_module(module_name)
+ return getattr(module, class_name)
+
def rm(self, path, recursive=False, maxdepth=None):
"""Remove a file or directory.
:param path: Virtual fileset path
@@ -348,11 +356,17 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
)
actual_path = context_pair.actual_file_location()
storage_type = self._recognize_storage_type(actual_path)
- context_pair.filesystem().rm(
- self._strip_storage_protocol(storage_type, actual_path),
- recursive,
- maxdepth,
- )
+ fs = context_pair.filesystem()
+
+ # S3FileSystem doesn't support maxdepth
+ if isinstance(fs, self.lazy_load_class("s3fs", "S3FileSystem")):
+ fs.rm(self._strip_storage_protocol(storage_type, actual_path),
recursive)
+ else:
+ fs.rm(
+ self._strip_storage_protocol(storage_type, actual_path),
+ recursive,
+ maxdepth,
+ )
def rm_file(self, path):
"""Remove a file.
@@ -547,9 +561,11 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"""
# If the storage path starts with hdfs, gcs, we should use the path as
the prefix.
- if storage_location.startswith(
- f"{StorageType.HDFS.value}://"
- ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
+ if (
+ storage_location.startswith(f"{StorageType.HDFS.value}://")
+ or storage_location.startswith(f"{StorageType.GCS.value}://")
+ or storage_location.startswith(f"{StorageType.S3A.value}://")
+ ):
actual_prefix = infer_storage_options(storage_location)["path"]
elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
actual_prefix =
storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -586,11 +602,34 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
path = self._convert_actual_path(
entry["name"], storage_location, virtual_location
)
+
+ # if entry contains 'mtime', then return the entry with 'mtime' else
+ # if entry contains 'LastModified', then return the entry with
'LastModified'
+
+ if "mtime" in entry:
+ # HDFS and GCS
+ return {
+ "name": path,
+ "size": entry["size"],
+ "type": entry["type"],
+ "mtime": entry["mtime"],
+ }
+
+ if "LastModified" in entry:
+ # S3 and OSS
+ return {
+ "name": path,
+ "size": entry["size"],
+ "type": entry["type"],
+ "mtime": entry["LastModified"],
+ }
+
+ # Unknown
return {
"name": path,
"size": entry["size"],
"type": entry["type"],
- "mtime": entry["mtime"],
+ "mtime": None,
}
def _get_fileset_context(self, virtual_path: str, operation:
FilesetDataOperation):
@@ -692,6 +731,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
return StorageType.LOCAL
if path.startswith(f"{StorageType.GCS.value}://"):
return StorageType.GCS
+ if path.startswith(f"{StorageType.S3A.value}://"):
+ return StorageType.S3A
raise GravitinoRuntimeException(
f"Storage type doesn't support now. Path:{path}"
)
@@ -716,7 +757,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: The path
:return: The stripped path
"""
- if storage_type in (StorageType.HDFS, StorageType.GCS):
+ if storage_type in (StorageType.HDFS, StorageType.GCS,
StorageType.S3A):
return path
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]
@@ -791,7 +832,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
elif storage_type == StorageType.LOCAL:
fs = LocalFileSystem()
elif storage_type == StorageType.GCS:
- fs = ArrowFSWrapper(self._get_gcs_filesystem())
+ fs = self._get_gcs_filesystem()
+ elif storage_type == StorageType.S3A:
+ fs = self._get_s3_filesystem()
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
@@ -802,22 +845,47 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
write_lock.release()
def _get_gcs_filesystem(self):
- # get All keys from the options that start with
'gravitino.bypass.gcs.' and remove the prefix
- gcs_options = {
- key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
- for key, value in self._options.items()
- if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
- }
-
# get 'service-account-key' from gcs_options, if the key is not found,
throw an exception
- service_account_key_path =
gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+ service_account_key_path = self._options.get(
+ GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE
+ )
if service_account_key_path is None:
raise GravitinoRuntimeException(
"Service account key is not found in the options."
)
- os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path
+ return importlib.import_module("gcsfs").GCSFileSystem(
+ token=service_account_key_path
+ )
- return importlib.import_module("pyarrow.fs").GcsFileSystem()
+ def _get_s3_filesystem(self):
+ # get 'aws_access_key_id' from s3_options, if the key is not found,
throw an exception
+ aws_access_key_id =
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
+ if aws_access_key_id is None:
+ raise GravitinoRuntimeException(
+ "AWS access key id is not found in the options."
+ )
+
+ # get 'aws_secret_access_key' from s3_options, if the key is not
found, throw an exception
+ aws_secret_access_key = self._options.get(
+ GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY
+ )
+ if aws_secret_access_key is None:
+ raise GravitinoRuntimeException(
+ "AWS secret access key is not found in the options."
+ )
+
+ # get 'aws_endpoint_url' from s3_options, if the key is not found,
throw an exception
+ aws_endpoint_url =
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
+ if aws_endpoint_url is None:
+ raise GravitinoRuntimeException(
+ "AWS endpoint url is not found in the options."
+ )
+
+ return importlib.import_module("s3fs").S3FileSystem(
+ key=aws_access_key_id,
+ secret=aws_secret_access_key,
+ endpoint_url=aws_endpoint_url,
+ )
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 618565c70..7ffacdb09 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -32,6 +32,8 @@ class GVFSConfig:
OAUTH2_PATH = "oauth2_path"
OAUTH2_SCOPE = "oauth2_scope"
- GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
- GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
- GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path"
+ GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE = "gcs_service_account_key_path"
+
+ GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key"
+ GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key"
+ GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint"
diff --git a/clients/client-python/requirements.txt
b/clients/client-python/requirements.txt
index a330f738a..1d0f4fadd 100644
--- a/clients/client-python/requirements.txt
+++ b/clients/client-python/requirements.txt
@@ -23,4 +23,5 @@ readerwriterlock==1.0.9
fsspec==2024.3.1
pyarrow==15.0.2
cachetools==5.3.3
-google-auth==2.35.0
\ No newline at end of file
+gcsfs==2024.3.1
+s3fs==2024.3.1
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
index 16f84dff3..54a2cfd07 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -20,8 +20,8 @@ import os
from random import randint
import unittest
-from fsspec.implementations.arrow import ArrowFSWrapper
-from pyarrow.fs import GcsFileSystem
+from gcsfs import GCSFileSystem
+
from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
from gravitino import (
@@ -31,7 +31,7 @@ from gravitino import (
Fileset,
)
from gravitino.exceptions.base import GravitinoRuntimeException
-
+from gravitino.filesystem.gvfs_config import GVFSConfig
logger = logging.getLogger(__name__)
@@ -45,7 +45,9 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
metalake_name: str = "TestGvfsWithGCS_metalake" + str(randint(1, 10000))
def setUp(self):
- self.options = {"gravitino.bypass.gcs.service-account-key-path":
self.key_file}
+ self.options = {
+ f"{GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE}": self.key_file
+ }
def tearDown(self):
self.options = {}
@@ -129,9 +131,22 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
properties=cls.fileset_properties,
)
- os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.key_file
- arrow_gcs_fs = GcsFileSystem()
- cls.fs = ArrowFSWrapper(arrow_gcs_fs)
+ cls.fs = GCSFileSystem(token=cls.key_file)
+
+ # Object storage like GCS does not support making directory and can only
create
+ # objects under the bucket. So we need to skip the test for GCS.
+ def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+ # GCS will not create a directory, so the directory will not exist.
+ self.fs.mkdir(actual_dir)
+ self.assertFalse(self.fs.exists(actual_dir))
+ self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+ # Object storage like GCS does not support making directory and can only
create
+ # objects under the bucket. So we need to skip the test for GCS.
+ def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
+ self.fs.makedirs(actual_dir)
+ self.assertFalse(self.fs.exists(actual_dir))
+ self.assertFalse(gvfs_instance.exists(gvfs_dir))
def test_modified(self):
modified_dir = self.fileset_gvfs_location + "/test_modified"
@@ -142,17 +157,15 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
options=self.options,
**self.conf,
)
- self.fs.mkdir(modified_actual_dir)
- self.assertTrue(self.fs.exists(modified_actual_dir))
- self.assertTrue(fs.exists(modified_dir))
+ self.check_mkdir(modified_dir, modified_actual_dir, fs)
# GCP only supports getting the `object` modify time, so the modified
time will be None
# if it's a directory.
# >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
# >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
# >>> print(r)
# None
- self.assertIsNone(fs.modified(modified_dir))
+ # self.assertIsNone(fs.modified(modified_dir))
# create a file under the dir 'modified_dir'.
file_path = modified_dir + "/test.txt"
@@ -160,14 +173,107 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
self.assertTrue(fs.exists(file_path))
self.assertIsNotNone(fs.modified(file_path))
- @unittest.skip(
- "This test will fail for https://github.com/apache/arrow/issues/44438"
- )
- def test_pandas(self):
- pass
-
- @unittest.skip(
- "This test will fail for https://github.com/apache/arrow/issues/44438"
- )
- def test_pyarrow(self):
- pass
+ def test_rm(self):
+ rm_dir = self.fileset_gvfs_location + "/test_rm"
+ rm_actual_dir = self.fileset_storage_location + "/test_rm"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rm_dir, rm_actual_dir, fs)
+
+ rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+ rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+ fs.touch(rm_file)
+ self.assertTrue(self.fs.exists(rm_actual_file))
+ self.assertTrue(fs.exists(rm_file))
+
+ # test delete file
+ fs.rm(rm_file)
+ self.assertFalse(fs.exists(rm_file))
+
+ # test delete dir with recursive = false
+ rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+ rm_new_actual_file = self.fileset_storage_location +
"/test_rm/test_new.file"
+ self.fs.touch(rm_new_actual_file)
+ self.assertTrue(self.fs.exists(rm_new_actual_file))
+ self.assertTrue(fs.exists(rm_new_file))
+ # fs.rm(rm_dir)
+
+ # fs.rm(rm_dir, recursive=False) will delete the directory and the file
+ # directly under the directory, so we comment the following code.
+ # test delete dir with recursive = true
+ # fs.rm(rm_dir, recursive=True)
+ # self.assertFalse(fs.exists(rm_dir))
+
+ def test_rmdir(self):
+ rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+ rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
+
+ rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+ rmdir_actual_file = self.fileset_storage_location +
"/test_rmdir/test.file"
+ self.fs.touch(rmdir_actual_file)
+ self.assertTrue(self.fs.exists(rmdir_actual_file))
+ self.assertTrue(fs.exists(rmdir_file))
+
+ # test delete file, GCS will remove the file directly.
+ fs.rmdir(rmdir_file)
+
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if
'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name,
new_bucket)
+ fs.mkdir(mkdir_dir, create_parents=True)
+
+ self.assertFalse(self.fs.exists(mkdir_actual_dir))
+ self.assertFalse(fs.exists(mkdir_dir))
+ self.assertFalse(self.fs.exists("gs://" + new_bucket))
+
+ def test_makedirs(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if
'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name,
new_bucket)
+
+ # it takes no effect.
+ fs.makedirs(mkdir_dir)
+
+ self.assertFalse(self.fs.exists(mkdir_actual_dir))
+ self.assertFalse(fs.exists(mkdir_dir))
+ self.assertFalse(self.fs.exists("gs://" + new_bucket))
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
index 5be5914f1..8b1c367bc 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -239,8 +239,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(ls_actual_dir)
- self.assertTrue(self.fs.exists(ls_actual_dir))
+
+ self.check_mkdir(ls_dir, ls_actual_dir, fs)
ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
@@ -266,8 +266,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(info_actual_dir)
- self.assertTrue(self.fs.exists(info_actual_dir))
+
+ self.check_mkdir(info_dir, info_actual_dir, fs)
info_file = self.fileset_gvfs_location + "/test_info/test.file"
info_actual_file = self.fileset_storage_location +
"/test_info/test.file"
@@ -289,9 +289,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(exist_actual_dir)
- self.assertTrue(self.fs.exists(exist_actual_dir))
- self.assertTrue(fs.exists(exist_dir))
+ self.check_mkdir(exist_dir, exist_actual_dir, fs)
exist_file = self.fileset_gvfs_location + "/test_exist/test.file"
exist_actual_file = self.fileset_storage_location +
"/test_exist/test.file"
@@ -308,9 +306,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(cp_file_actual_dir)
- self.assertTrue(self.fs.exists(cp_file_actual_dir))
- self.assertTrue(fs.exists(cp_file_dir))
+
+ self.check_mkdir(cp_file_dir, cp_file_actual_dir, fs)
cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file"
cp_file_actual_file = self.fileset_storage_location +
"/test_cp_file/test.file"
@@ -341,9 +338,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(mv_actual_dir)
- self.assertTrue(self.fs.exists(mv_actual_dir))
- self.assertTrue(fs.exists(mv_dir))
+ self.check_mkdir(mv_dir, mv_actual_dir, fs)
mv_new_dir = self.fileset_gvfs_location + "/test_mv_new"
mv_new_actual_dir = self.fileset_storage_location + "/test_mv_new"
@@ -353,9 +348,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(mv_new_actual_dir)
- self.assertTrue(self.fs.exists(mv_new_actual_dir))
- self.assertTrue(fs.exists(mv_new_dir))
+
+ self.check_mkdir(mv_new_dir, mv_new_actual_dir, fs)
mv_file = self.fileset_gvfs_location + "/test_mv/test.file"
mv_actual_file = self.fileset_storage_location + "/test_mv/test.file"
@@ -385,9 +379,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(rm_actual_dir)
- self.assertTrue(self.fs.exists(rm_actual_dir))
- self.assertTrue(fs.exists(rm_dir))
+ self.check_mkdir(rm_dir, rm_actual_dir, fs)
rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
@@ -421,9 +413,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(rm_file_actual_dir)
- self.assertTrue(self.fs.exists(rm_file_actual_dir))
- self.assertTrue(fs.exists(rm_file_dir))
+ self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs)
rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
rm_file_actual_file = self.fileset_storage_location +
"/test_rm_file/test.file"
@@ -448,9 +438,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(rmdir_actual_dir)
- self.assertTrue(self.fs.exists(rmdir_actual_dir))
- self.assertTrue(fs.exists(rmdir_dir))
+ self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
rmdir_actual_file = self.fileset_storage_location +
"/test_rmdir/test.file"
@@ -475,9 +463,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(open_actual_dir)
- self.assertTrue(self.fs.exists(open_actual_dir))
- self.assertTrue(fs.exists(open_dir))
+ self.check_mkdir(open_dir, open_actual_dir, fs)
open_file = self.fileset_gvfs_location + "/test_open/test.file"
open_actual_file = self.fileset_storage_location +
"/test_open/test.file"
@@ -503,9 +489,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- fs.mkdir(mkdir_dir)
- self.assertTrue(fs.exists(mkdir_dir))
- self.assertTrue(self.fs.exists(mkdir_actual_dir))
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
# test mkdir dir with create_parents = false
parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir"
@@ -530,9 +514,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- fs.makedirs(makedirs_dir)
- self.assertTrue(fs.exists(makedirs_dir))
- self.assertTrue(self.fs.exists(makedirs_actual_dir))
+ self.check_makedirs(makedirs_dir, makedirs_actual_dir, fs)
# test mkdir dir not exist
parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir"
@@ -549,9 +531,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(created_actual_dir)
- self.assertTrue(self.fs.exists(created_actual_dir))
- self.assertTrue(fs.exists(created_dir))
+
+ self.check_mkdir(created_dir, created_actual_dir, fs)
with self.assertRaises(GravitinoRuntimeException):
fs.created(created_dir)
@@ -565,13 +546,22 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(modified_actual_dir)
- self.assertTrue(self.fs.exists(modified_actual_dir))
- self.assertTrue(fs.exists(modified_dir))
+
+ self.check_mkdir(modified_dir, modified_actual_dir, fs)
# test mkdir dir which exists
self.assertIsNotNone(fs.modified(modified_dir))
+ def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+ self.fs.mkdir(actual_dir)
+ self.assertTrue(self.fs.exists(actual_dir))
+ self.assertTrue(gvfs_instance.exists(gvfs_dir))
+
+ def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
+ self.fs.makedirs(actual_dir)
+ self.assertTrue(self.fs.exists(actual_dir))
+ self.assertTrue(gvfs_instance.exists(gvfs_dir))
+
def test_cat_file(self):
cat_dir = self.fileset_gvfs_location + "/test_cat"
cat_actual_dir = self.fileset_storage_location + "/test_cat"
@@ -581,9 +571,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(cat_actual_dir)
- self.assertTrue(self.fs.exists(cat_actual_dir))
- self.assertTrue(fs.exists(cat_dir))
+
+ self.check_mkdir(cat_dir, cat_actual_dir, fs)
cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
@@ -609,9 +598,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(get_actual_dir)
- self.assertTrue(self.fs.exists(get_actual_dir))
- self.assertTrue(fs.exists(get_dir))
+
+ self.check_mkdir(get_dir, get_actual_dir, fs)
get_file = self.fileset_gvfs_location + "/test_get/test.file"
get_actual_file = self.fileset_storage_location + "/test_get/test.file"
@@ -649,9 +637,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(pands_actual_dir)
- self.assertTrue(self.fs.exists(pands_actual_dir))
- self.assertTrue(fs.exists(pands_dir))
+
+ self.check_mkdir(pands_dir, pands_actual_dir, fs)
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
# to parquet
@@ -695,9 +682,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(pyarrow_actual_dir)
- self.assertTrue(self.fs.exists(pyarrow_actual_dir))
- self.assertTrue(fs.exists(pyarrow_dir))
+
+ self.check_mkdir(pyarrow_dir, pyarrow_actual_dir, fs)
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
# to parquet
@@ -725,9 +711,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
options=self.options,
**self.conf,
)
- self.fs.mkdir(llama_actual_dir)
- self.assertTrue(self.fs.exists(llama_actual_dir))
- self.assertTrue(fs.exists(llama_dir))
+ self.check_mkdir(llama_dir, llama_actual_dir, fs)
+
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
storage_options = {
diff --git a/clients/client-python/tests/integration/test_gvfs_with_s3.py
b/clients/client-python/tests/integration/test_gvfs_with_s3.py
new file mode 100644
index 000000000..5758a7e65
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3.py
@@ -0,0 +1,299 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from s3fs import S3FileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+ gvfs,
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+from gravitino.filesystem.gvfs_config import GVFSConfig
+
+logger = logging.getLogger(__name__)
+
+
[email protected]("This test require S3 service account")
+class TestGvfsWithS3(TestGvfsWithHDFS):
+ # Before running this test, please set the make sure aws-bundle-x.jar has
been
+ # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+ s3_access_key = "your_access_key"
+ s3_secret_key = "your_secret_key"
+ s3_endpoint = "your_endpoint"
+ bucket_name = "your_bucket_name"
+
+ metalake_name: str = "TestGvfsWithS3_metalake" + str(randint(1, 10000))
+
+ def setUp(self):
+ self.options = {
+ f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+ }
+
+ def tearDown(self):
+ self.options = {}
+
+ @classmethod
+ def setUpClass(cls):
+ cls._get_gravitino_home()
+
+ cls.hadoop_conf_path =
f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+ # restart the server
+ cls.restart_server()
+ # create entity
+ cls._init_test_entities()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls._clean_test_data()
+ # reset server conf in case of other ITs like HDFS has changed it and
fail
+ # to reset it
+ cls._reset_conf(cls.config, cls.hadoop_conf_path)
+ # restart server
+ cls.restart_server()
+
+ # clear all config in the conf_path
+ @classmethod
+ def _reset_conf(cls, config, conf_path):
+ logger.info("Reset %s.", conf_path)
+ if not os.path.exists(conf_path):
+ raise GravitinoRuntimeException(f"Conf file is not found at
`{conf_path}`.")
+ filtered_lines = []
+ with open(conf_path, mode="r", encoding="utf-8") as file:
+ origin_lines = file.readlines()
+
+ for line in origin_lines:
+ line = line.strip()
+ if line.startswith("#"):
+ # append annotations directly
+ filtered_lines.append(line + "\n")
+
+ with open(conf_path, mode="w", encoding="utf-8") as file:
+ for line in filtered_lines:
+ file.write(line)
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "s3",
+ "gravitino.bypass.fs.s3a.access.key": cls.s3_access_key,
+ "gravitino.bypass.fs.s3a.secret.key": cls.s3_secret_key,
+ "gravitino.bypass.fs.s3a.endpoint": cls.s3_endpoint,
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+
f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+
+ cls.fs = S3FileSystem(
+ key=cls.s3_access_key,
+ secret=cls.s3_secret_key,
+ endpoint_url=cls.s3_endpoint,
+ )
+
+ def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+ # S3 will not create a directory, so the directory will not exist.
+ self.fs.mkdir(actual_dir)
+ self.assertFalse(self.fs.exists(actual_dir))
+ self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+ def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
+ self.fs.makedirs(actual_dir)
+ self.assertFalse(self.fs.exists(actual_dir))
+ self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+ def test_modified(self):
+ modified_dir = self.fileset_gvfs_location + "/test_modified"
+ modified_actual_dir = self.fileset_storage_location + "/test_modified"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ self.check_mkdir(modified_dir, modified_actual_dir, fs)
+ # S3 only supports getting the `object` modify time, so the modified
time will be None
+ # if it's a directory.
+ # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
+ # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
+ # >>> print(r)
+ # None
+ # self.assertIsNone(fs.modified(modified_dir))
+
+ # create a file under the dir 'modified_dir'.
+ file_path = modified_dir + "/test.txt"
+ fs.touch(file_path)
+ self.assertTrue(fs.exists(file_path))
+ self.assertIsNotNone(fs.modified(file_path))
+
+ def test_rm(self):
+ rm_dir = self.fileset_gvfs_location + "/test_rm"
+ rm_actual_dir = self.fileset_storage_location + "/test_rm"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rm_dir, rm_actual_dir, fs)
+
+ rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+ rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+ fs.touch(rm_file)
+ self.assertTrue(self.fs.exists(rm_actual_file))
+ self.assertTrue(fs.exists(rm_file))
+
+ # test delete file
+ fs.rm(rm_file)
+ self.assertFalse(fs.exists(rm_file))
+
+ # test delete dir with recursive = false
+ rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+ rm_new_actual_file = self.fileset_storage_location +
"/test_rm/test_new.file"
+ self.fs.touch(rm_new_actual_file)
+ self.assertTrue(self.fs.exists(rm_new_actual_file))
+ self.assertTrue(fs.exists(rm_new_file))
+
+ def test_rmdir(self):
+ rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+ rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
+
+ rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+ rmdir_actual_file = self.fileset_storage_location +
"/test_rmdir/test.file"
+ self.fs.touch(rmdir_actual_file)
+ self.assertTrue(self.fs.exists(rmdir_actual_file))
+ self.assertTrue(fs.exists(rmdir_file))
+
+ fs.rm_file(rmdir_file)
+
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if
'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ fs.mkdir(mkdir_dir, create_parents=True)
+
+ self.assertFalse(fs.exists(mkdir_dir))
+ self.assertFalse(self.fs.exists("s3://" + new_bucket))
+
+ def test_makedirs(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if
'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name,
new_bucket)
+
+ # it takes no effect.
+ fs.makedirs(mkdir_dir)
+ with self.assertRaises(OSError):
+ self.fs.exists(mkdir_actual_dir)
+
+ self.assertFalse(fs.exists(mkdir_dir))
+ self.assertFalse(self.fs.exists("s3://" + new_bucket))
+
+ def test_rm_file(self):
+ rm_file_dir = self.fileset_gvfs_location + "/test_rm_file"
+ rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs)
+
+ rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
+ rm_file_actual_file = self.fileset_storage_location +
"/test_rm_file/test.file"
+ self.fs.touch(rm_file_actual_file)
+ self.assertTrue(self.fs.exists(rm_file_actual_file))
+ self.assertTrue(fs.exists(rm_file_file))
+
+ # test delete file
+ fs.rm_file(rm_file_file)
+ self.assertFalse(fs.exists(rm_file_file))
+
+ # test delete dir
+ fs.rm_file(rm_file_dir)