This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new cefe316bb [#5188] feat(python-client): Support s3 fileset in python 
client (#5209)
cefe316bb is described below

commit cefe316bbfd0b22faa46c087100b378a37023b42
Author: Qi Yu <[email protected]>
AuthorDate: Thu Oct 24 11:25:49 2024 +0800

    [#5188] feat(python-client): Support s3 fileset in python client (#5209)
    
    ### What changes were proposed in this pull request?
    
    Add support for S3 fileset in the Python client.
    
    
    ### Why are the changes needed?
    
    It is needed by users who store filesets in S3.
    
    Fix: #5188
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Replace with real s3 account and execute the following test.
    
    <img width="1534" alt="image"
    
src="https://github.com/user-attachments/assets/3d6267ce-8954-43e6-bc54-ac70998df9f9";>
    
    ./gradlew :clients:client-python:test -PskipDockerTests=false
---
 clients/client-python/gravitino/filesystem/gvfs.py | 114 ++++++--
 .../gravitino/filesystem/gvfs_config.py            |   8 +-
 clients/client-python/requirements.txt             |   3 +-
 .../tests/integration/test_gvfs_with_gcs.py        | 150 +++++++++--
 .../tests/integration/test_gvfs_with_hdfs.py       |  95 +++----
 .../tests/integration/test_gvfs_with_s3.py         | 299 +++++++++++++++++++++
 6 files changed, 565 insertions(+), 104 deletions(-)

diff --git a/clients/client-python/gravitino/filesystem/gvfs.py 
b/clients/client-python/gravitino/filesystem/gvfs.py
index 8f1b2008a..a9201a833 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,7 +14,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import os
 from enum import Enum
 from pathlib import PurePosixPath
 from typing import Dict, Tuple
@@ -49,6 +48,7 @@ class StorageType(Enum):
     HDFS = "hdfs"
     LOCAL = "file"
     GCS = "gs"
+    S3A = "s3a"
 
 
 class FilesetContextPair:
@@ -314,7 +314,11 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
 
         # convert the following to in
 
-        if storage_type in [StorageType.HDFS, StorageType.GCS]:
+        if storage_type in [
+            StorageType.HDFS,
+            StorageType.GCS,
+            StorageType.S3A,
+        ]:
             src_context_pair.filesystem().mv(
                 self._strip_storage_protocol(storage_type, src_actual_path),
                 self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -336,6 +340,10 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             "Deprecated method, use `rm_file` method instead."
         )
 
+    def lazy_load_class(self, module_name, class_name):
+        module = importlib.import_module(module_name)
+        return getattr(module, class_name)
+
     def rm(self, path, recursive=False, maxdepth=None):
         """Remove a file or directory.
         :param path: Virtual fileset path
@@ -348,11 +356,17 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         )
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
-        context_pair.filesystem().rm(
-            self._strip_storage_protocol(storage_type, actual_path),
-            recursive,
-            maxdepth,
-        )
+        fs = context_pair.filesystem()
+
+        # S3FileSystem doesn't support maxdepth
+        if isinstance(fs, self.lazy_load_class("s3fs", "S3FileSystem")):
+            fs.rm(self._strip_storage_protocol(storage_type, actual_path), 
recursive)
+        else:
+            fs.rm(
+                self._strip_storage_protocol(storage_type, actual_path),
+                recursive,
+                maxdepth,
+            )
 
     def rm_file(self, path):
         """Remove a file.
@@ -547,9 +561,11 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         """
 
         # If the storage path starts with hdfs, gcs, we should use the path as 
the prefix.
-        if storage_location.startswith(
-            f"{StorageType.HDFS.value}://"
-        ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
+        if (
+            storage_location.startswith(f"{StorageType.HDFS.value}://")
+            or storage_location.startswith(f"{StorageType.GCS.value}://")
+            or storage_location.startswith(f"{StorageType.S3A.value}://")
+        ):
             actual_prefix = infer_storage_options(storage_location)["path"]
         elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
             actual_prefix = 
storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -586,11 +602,34 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         path = self._convert_actual_path(
             entry["name"], storage_location, virtual_location
         )
+
+        # if entry contains 'mtime', then return the entry with 'mtime' else
+        # if entry contains 'LastModified', then return the entry with 
'LastModified'
+
+        if "mtime" in entry:
+            # HDFS and GCS
+            return {
+                "name": path,
+                "size": entry["size"],
+                "type": entry["type"],
+                "mtime": entry["mtime"],
+            }
+
+        if "LastModified" in entry:
+            # S3 and OSS
+            return {
+                "name": path,
+                "size": entry["size"],
+                "type": entry["type"],
+                "mtime": entry["LastModified"],
+            }
+
+        # Unknown
         return {
             "name": path,
             "size": entry["size"],
             "type": entry["type"],
-            "mtime": entry["mtime"],
+            "mtime": None,
         }
 
     def _get_fileset_context(self, virtual_path: str, operation: 
FilesetDataOperation):
@@ -692,6 +731,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             return StorageType.LOCAL
         if path.startswith(f"{StorageType.GCS.value}://"):
             return StorageType.GCS
+        if path.startswith(f"{StorageType.S3A.value}://"):
+            return StorageType.S3A
         raise GravitinoRuntimeException(
             f"Storage type doesn't support now. Path:{path}"
         )
@@ -716,7 +757,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         :param path: The path
         :return: The stripped path
         """
-        if storage_type in (StorageType.HDFS, StorageType.GCS):
+        if storage_type in (StorageType.HDFS, StorageType.GCS, 
StorageType.S3A):
             return path
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]
@@ -791,7 +832,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             elif storage_type == StorageType.LOCAL:
                 fs = LocalFileSystem()
             elif storage_type == StorageType.GCS:
-                fs = ArrowFSWrapper(self._get_gcs_filesystem())
+                fs = self._get_gcs_filesystem()
+            elif storage_type == StorageType.S3A:
+                fs = self._get_s3_filesystem()
             else:
                 raise GravitinoRuntimeException(
                     f"Storage type: `{storage_type}` doesn't support now."
@@ -802,22 +845,47 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             write_lock.release()
 
     def _get_gcs_filesystem(self):
-        # get All keys from the options that start with 
'gravitino.bypass.gcs.' and remove the prefix
-        gcs_options = {
-            key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
-            for key, value in self._options.items()
-            if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
-        }
-
         # get 'service-account-key' from gcs_options, if the key is not found, 
throw an exception
-        service_account_key_path = 
gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+        service_account_key_path = self._options.get(
+            GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE
+        )
         if service_account_key_path is None:
             raise GravitinoRuntimeException(
                 "Service account key is not found in the options."
             )
-        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path
+        return importlib.import_module("gcsfs").GCSFileSystem(
+            token=service_account_key_path
+        )
 
-        return importlib.import_module("pyarrow.fs").GcsFileSystem()
+    def _get_s3_filesystem(self):
+        # get 'aws_access_key_id' from s3_options, if the key is not found, 
throw an exception
+        aws_access_key_id = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
+        if aws_access_key_id is None:
+            raise GravitinoRuntimeException(
+                "AWS access key id is not found in the options."
+            )
+
+        # get 'aws_secret_access_key' from s3_options, if the key is not 
found, throw an exception
+        aws_secret_access_key = self._options.get(
+            GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY
+        )
+        if aws_secret_access_key is None:
+            raise GravitinoRuntimeException(
+                "AWS secret access key is not found in the options."
+            )
+
+        # get 'aws_endpoint_url' from s3_options, if the key is not found, 
throw an exception
+        aws_endpoint_url = 
self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
+        if aws_endpoint_url is None:
+            raise GravitinoRuntimeException(
+                "AWS endpoint url is not found in the options."
+            )
+
+        return importlib.import_module("s3fs").S3FileSystem(
+            key=aws_access_key_id,
+            secret=aws_secret_access_key,
+            endpoint_url=aws_endpoint_url,
+        )
 
 
 fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py 
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 618565c70..7ffacdb09 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -32,6 +32,8 @@ class GVFSConfig:
     OAUTH2_PATH = "oauth2_path"
     OAUTH2_SCOPE = "oauth2_scope"
 
-    GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
-    GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
-    GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path"
+    GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE = "gcs_service_account_key_path"
+
+    GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key"
+    GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key"
+    GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint"
diff --git a/clients/client-python/requirements.txt 
b/clients/client-python/requirements.txt
index a330f738a..1d0f4fadd 100644
--- a/clients/client-python/requirements.txt
+++ b/clients/client-python/requirements.txt
@@ -23,4 +23,5 @@ readerwriterlock==1.0.9
 fsspec==2024.3.1
 pyarrow==15.0.2
 cachetools==5.3.3
-google-auth==2.35.0
\ No newline at end of file
+gcsfs==2024.3.1
+s3fs==2024.3.1
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py 
b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
index 16f84dff3..54a2cfd07 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -20,8 +20,8 @@ import os
 from random import randint
 import unittest
 
-from fsspec.implementations.arrow import ArrowFSWrapper
-from pyarrow.fs import GcsFileSystem
+from gcsfs import GCSFileSystem
+
 
 from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
 from gravitino import (
@@ -31,7 +31,7 @@ from gravitino import (
     Fileset,
 )
 from gravitino.exceptions.base import GravitinoRuntimeException
-
+from gravitino.filesystem.gvfs_config import GVFSConfig
 
 logger = logging.getLogger(__name__)
 
@@ -45,7 +45,9 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
     metalake_name: str = "TestGvfsWithGCS_metalake" + str(randint(1, 10000))
 
     def setUp(self):
-        self.options = {"gravitino.bypass.gcs.service-account-key-path": 
self.key_file}
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE}": self.key_file
+        }
 
     def tearDown(self):
         self.options = {}
@@ -129,9 +131,22 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
             properties=cls.fileset_properties,
         )
 
-        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.key_file
-        arrow_gcs_fs = GcsFileSystem()
-        cls.fs = ArrowFSWrapper(arrow_gcs_fs)
+        cls.fs = GCSFileSystem(token=cls.key_file)
+
+    # Object storage like GCS does not support making directory and can only 
create
+    # objects under the bucket. So we need to skip the test for GCS.
+    def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+        # GCS will not create a directory, so the directory will not exist.
+        self.fs.mkdir(actual_dir)
+        self.assertFalse(self.fs.exists(actual_dir))
+        self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+    # Object storage like GCS does not support making directory and can only 
create
+    # objects under the bucket. So we need to skip the test for GCS.
+    def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
+        self.fs.makedirs(actual_dir)
+        self.assertFalse(self.fs.exists(actual_dir))
+        self.assertFalse(gvfs_instance.exists(gvfs_dir))
 
     def test_modified(self):
         modified_dir = self.fileset_gvfs_location + "/test_modified"
@@ -142,17 +157,15 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(modified_actual_dir)
-        self.assertTrue(self.fs.exists(modified_actual_dir))
-        self.assertTrue(fs.exists(modified_dir))
 
+        self.check_mkdir(modified_dir, modified_actual_dir, fs)
         # GCP only supports getting the `object` modify time, so the modified 
time will be None
         # if it's a directory.
         # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
         # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
         # >>> print(r)
         # None
-        self.assertIsNone(fs.modified(modified_dir))
+        # self.assertIsNone(fs.modified(modified_dir))
 
         # create a file under the dir 'modified_dir'.
         file_path = modified_dir + "/test.txt"
@@ -160,14 +173,107 @@ class TestGvfsWithGCS(TestGvfsWithHDFS):
         self.assertTrue(fs.exists(file_path))
         self.assertIsNotNone(fs.modified(file_path))
 
-    @unittest.skip(
-        "This test will fail for https://github.com/apache/arrow/issues/44438";
-    )
-    def test_pandas(self):
-        pass
-
-    @unittest.skip(
-        "This test will fail for https://github.com/apache/arrow/issues/44438";
-    )
-    def test_pyarrow(self):
-        pass
+    def test_rm(self):
+        rm_dir = self.fileset_gvfs_location + "/test_rm"
+        rm_actual_dir = self.fileset_storage_location + "/test_rm"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.check_mkdir(rm_dir, rm_actual_dir, fs)
+
+        rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+        rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+        fs.touch(rm_file)
+        self.assertTrue(self.fs.exists(rm_actual_file))
+        self.assertTrue(fs.exists(rm_file))
+
+        # test delete file
+        fs.rm(rm_file)
+        self.assertFalse(fs.exists(rm_file))
+
+        # test delete dir with recursive = false
+        rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+        rm_new_actual_file = self.fileset_storage_location + 
"/test_rm/test_new.file"
+        self.fs.touch(rm_new_actual_file)
+        self.assertTrue(self.fs.exists(rm_new_actual_file))
+        self.assertTrue(fs.exists(rm_new_file))
+        # fs.rm(rm_dir)
+
+        # fs.rm(rm_dir, recursive=False) will delete the directory and the file
+        # directly under the directory, so we comment the following code.
+        # test delete dir with recursive = true
+        # fs.rm(rm_dir, recursive=True)
+        # self.assertFalse(fs.exists(rm_dir))
+
+    def test_rmdir(self):
+        rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+        rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
+
+        rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+        rmdir_actual_file = self.fileset_storage_location + 
"/test_rmdir/test.file"
+        self.fs.touch(rmdir_actual_file)
+        self.assertTrue(self.fs.exists(rmdir_actual_file))
+        self.assertTrue(fs.exists(rmdir_file))
+
+        # test delete file, GCS will remove the file directly.
+        fs.rmdir(rmdir_file)
+
+    def test_mkdir(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.bucket_name + "1"
+        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+        mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, 
new_bucket)
+        fs.mkdir(mkdir_dir, create_parents=True)
+
+        self.assertFalse(self.fs.exists(mkdir_actual_dir))
+        self.assertFalse(fs.exists(mkdir_dir))
+        self.assertFalse(self.fs.exists("gs://" + new_bucket))
+
+    def test_makedirs(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # it actually takes no effect.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.bucket_name + "1"
+        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+        mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, 
new_bucket)
+
+        # it takes no effect.
+        fs.makedirs(mkdir_dir)
+
+        self.assertFalse(self.fs.exists(mkdir_actual_dir))
+        self.assertFalse(fs.exists(mkdir_dir))
+        self.assertFalse(self.fs.exists("gs://" + new_bucket))
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py 
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
index 5be5914f1..8b1c367bc 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -239,8 +239,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(ls_actual_dir)
-        self.assertTrue(self.fs.exists(ls_actual_dir))
+
+        self.check_mkdir(ls_dir, ls_actual_dir, fs)
 
         ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
         ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
@@ -266,8 +266,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(info_actual_dir)
-        self.assertTrue(self.fs.exists(info_actual_dir))
+
+        self.check_mkdir(info_dir, info_actual_dir, fs)
 
         info_file = self.fileset_gvfs_location + "/test_info/test.file"
         info_actual_file = self.fileset_storage_location + 
"/test_info/test.file"
@@ -289,9 +289,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(exist_actual_dir)
-        self.assertTrue(self.fs.exists(exist_actual_dir))
-        self.assertTrue(fs.exists(exist_dir))
+        self.check_mkdir(exist_dir, exist_actual_dir, fs)
 
         exist_file = self.fileset_gvfs_location + "/test_exist/test.file"
         exist_actual_file = self.fileset_storage_location + 
"/test_exist/test.file"
@@ -308,9 +306,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(cp_file_actual_dir)
-        self.assertTrue(self.fs.exists(cp_file_actual_dir))
-        self.assertTrue(fs.exists(cp_file_dir))
+
+        self.check_mkdir(cp_file_dir, cp_file_actual_dir, fs)
 
         cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file"
         cp_file_actual_file = self.fileset_storage_location + 
"/test_cp_file/test.file"
@@ -341,9 +338,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(mv_actual_dir)
-        self.assertTrue(self.fs.exists(mv_actual_dir))
-        self.assertTrue(fs.exists(mv_dir))
+        self.check_mkdir(mv_dir, mv_actual_dir, fs)
 
         mv_new_dir = self.fileset_gvfs_location + "/test_mv_new"
         mv_new_actual_dir = self.fileset_storage_location + "/test_mv_new"
@@ -353,9 +348,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(mv_new_actual_dir)
-        self.assertTrue(self.fs.exists(mv_new_actual_dir))
-        self.assertTrue(fs.exists(mv_new_dir))
+
+        self.check_mkdir(mv_new_dir, mv_new_actual_dir, fs)
 
         mv_file = self.fileset_gvfs_location + "/test_mv/test.file"
         mv_actual_file = self.fileset_storage_location + "/test_mv/test.file"
@@ -385,9 +379,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(rm_actual_dir)
-        self.assertTrue(self.fs.exists(rm_actual_dir))
-        self.assertTrue(fs.exists(rm_dir))
+        self.check_mkdir(rm_dir, rm_actual_dir, fs)
 
         rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
         rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
@@ -421,9 +413,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(rm_file_actual_dir)
-        self.assertTrue(self.fs.exists(rm_file_actual_dir))
-        self.assertTrue(fs.exists(rm_file_dir))
+        self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs)
 
         rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
         rm_file_actual_file = self.fileset_storage_location + 
"/test_rm_file/test.file"
@@ -448,9 +438,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(rmdir_actual_dir)
-        self.assertTrue(self.fs.exists(rmdir_actual_dir))
-        self.assertTrue(fs.exists(rmdir_dir))
+        self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
 
         rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
         rmdir_actual_file = self.fileset_storage_location + 
"/test_rmdir/test.file"
@@ -475,9 +463,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(open_actual_dir)
-        self.assertTrue(self.fs.exists(open_actual_dir))
-        self.assertTrue(fs.exists(open_dir))
+        self.check_mkdir(open_dir, open_actual_dir, fs)
 
         open_file = self.fileset_gvfs_location + "/test_open/test.file"
         open_actual_file = self.fileset_storage_location + 
"/test_open/test.file"
@@ -503,9 +489,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        fs.mkdir(mkdir_dir)
-        self.assertTrue(fs.exists(mkdir_dir))
-        self.assertTrue(self.fs.exists(mkdir_actual_dir))
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
 
         # test mkdir dir with create_parents = false
         parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir"
@@ -530,9 +514,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        fs.makedirs(makedirs_dir)
-        self.assertTrue(fs.exists(makedirs_dir))
-        self.assertTrue(self.fs.exists(makedirs_actual_dir))
+        self.check_makedirs(makedirs_dir, makedirs_actual_dir, fs)
 
         # test mkdir dir not exist
         parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir"
@@ -549,9 +531,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(created_actual_dir)
-        self.assertTrue(self.fs.exists(created_actual_dir))
-        self.assertTrue(fs.exists(created_dir))
+
+        self.check_mkdir(created_dir, created_actual_dir, fs)
 
         with self.assertRaises(GravitinoRuntimeException):
             fs.created(created_dir)
@@ -565,13 +546,22 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(modified_actual_dir)
-        self.assertTrue(self.fs.exists(modified_actual_dir))
-        self.assertTrue(fs.exists(modified_dir))
+
+        self.check_mkdir(modified_dir, modified_actual_dir, fs)
 
         # test mkdir dir which exists
         self.assertIsNotNone(fs.modified(modified_dir))
 
+    def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+        self.fs.mkdir(actual_dir)
+        self.assertTrue(self.fs.exists(actual_dir))
+        self.assertTrue(gvfs_instance.exists(gvfs_dir))
+
+    def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
+        self.fs.makedirs(actual_dir)
+        self.assertTrue(self.fs.exists(actual_dir))
+        self.assertTrue(gvfs_instance.exists(gvfs_dir))
+
     def test_cat_file(self):
         cat_dir = self.fileset_gvfs_location + "/test_cat"
         cat_actual_dir = self.fileset_storage_location + "/test_cat"
@@ -581,9 +571,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(cat_actual_dir)
-        self.assertTrue(self.fs.exists(cat_actual_dir))
-        self.assertTrue(fs.exists(cat_dir))
+
+        self.check_mkdir(cat_dir, cat_actual_dir, fs)
 
         cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
         cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
@@ -609,9 +598,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(get_actual_dir)
-        self.assertTrue(self.fs.exists(get_actual_dir))
-        self.assertTrue(fs.exists(get_dir))
+
+        self.check_mkdir(get_dir, get_actual_dir, fs)
 
         get_file = self.fileset_gvfs_location + "/test_get/test.file"
         get_actual_file = self.fileset_storage_location + "/test_get/test.file"
@@ -649,9 +637,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(pands_actual_dir)
-        self.assertTrue(self.fs.exists(pands_actual_dir))
-        self.assertTrue(fs.exists(pands_dir))
+
+        self.check_mkdir(pands_dir, pands_actual_dir, fs)
 
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 
19, 18]})
         # to parquet
@@ -695,9 +682,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(pyarrow_actual_dir)
-        self.assertTrue(self.fs.exists(pyarrow_actual_dir))
-        self.assertTrue(fs.exists(pyarrow_dir))
+
+        self.check_mkdir(pyarrow_dir, pyarrow_actual_dir, fs)
 
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 
19, 18]})
         # to parquet
@@ -725,9 +711,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             options=self.options,
             **self.conf,
         )
-        self.fs.mkdir(llama_actual_dir)
-        self.assertTrue(self.fs.exists(llama_actual_dir))
-        self.assertTrue(fs.exists(llama_dir))
+        self.check_mkdir(llama_dir, llama_actual_dir, fs)
+
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 
19, 18]})
 
         storage_options = {
diff --git a/clients/client-python/tests/integration/test_gvfs_with_s3.py 
b/clients/client-python/tests/integration/test_gvfs_with_s3.py
new file mode 100644
index 000000000..5758a7e65
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_s3.py
@@ -0,0 +1,299 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from s3fs import S3FileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+from gravitino.filesystem.gvfs_config import GVFSConfig
+
+logger = logging.getLogger(__name__)
+
+
[email protected]("This test require S3 service account")
+class TestGvfsWithS3(TestGvfsWithHDFS):
+    # Before running this test, please make sure that aws-bundle-x.jar has
+    # been copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    s3_access_key = "your_access_key"
+    s3_secret_key = "your_secret_key"
+    s3_endpoint = "your_endpoint"
+    bucket_name = "your_bucket_name"
+
+    metalake_name: str = "TestGvfsWithS3_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+        }
+
+    def tearDown(self):
+        self.options = {}
+
+    @classmethod
+    def setUpClass(cls):
+        cls._get_gravitino_home()
+
+        cls.hadoop_conf_path = 
f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+        # restart the server
+        cls.restart_server()
+        # create entity
+        cls._init_test_entities()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls._clean_test_data()
+        # reset the server conf in case other ITs (such as HDFS) have changed
+        # it and failed to reset it
+        cls._reset_conf(cls.config, cls.hadoop_conf_path)
+        # restart server
+        cls.restart_server()
+
+    # clear all config in the conf_path
+    @classmethod
+    def _reset_conf(cls, config, conf_path):
+        logger.info("Reset %s.", conf_path)
+        if not os.path.exists(conf_path):
+            raise GravitinoRuntimeException(f"Conf file is not found at 
`{conf_path}`.")
+        filtered_lines = []
+        with open(conf_path, mode="r", encoding="utf-8") as file:
+            origin_lines = file.readlines()
+
+        for line in origin_lines:
+            line = line.strip()
+            if line.startswith("#"):
+                # append annotations directly
+                filtered_lines.append(line + "\n")
+
+        with open(conf_path, mode="w", encoding="utf-8") as file:
+            for line in filtered_lines:
+                file.write(line)
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090";, metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "s3",
+                "gravitino.bypass.fs.s3a.access.key": cls.s3_access_key,
+                "gravitino.bypass.fs.s3a.secret.key": cls.s3_secret_key,
+                "gravitino.bypass.fs.s3a.endpoint": cls.s3_endpoint,
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        cls.fs = S3FileSystem(
+            key=cls.s3_access_key,
+            secret=cls.s3_secret_key,
+            endpoint_url=cls.s3_endpoint,
+        )
+
+    def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+        # S3 will not create a directory, so the directory will not exist.
+        self.fs.mkdir(actual_dir)
+        self.assertFalse(self.fs.exists(actual_dir))
+        self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+    def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
+        self.fs.makedirs(actual_dir)
+        self.assertFalse(self.fs.exists(actual_dir))
+        self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+    def test_modified(self):
+        modified_dir = self.fileset_gvfs_location + "/test_modified"
+        modified_actual_dir = self.fileset_storage_location + "/test_modified"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        self.check_mkdir(modified_dir, modified_actual_dir, fs)
+        # S3 only supports getting the `object` modify time, so the modified
+        # time will be None if it's a directory.
+        # >>> s3.mkdir('example_qazwsx/catalog/schema/fileset3')
+        # >>> r = s3.modified('example_qazwsx/catalog/schema/fileset3')
+        # >>> print(r)
+        # None
+        # self.assertIsNone(fs.modified(modified_dir))
+
+        # create a file under the dir 'modified_dir'.
+        file_path = modified_dir + "/test.txt"
+        fs.touch(file_path)
+        self.assertTrue(fs.exists(file_path))
+        self.assertIsNotNone(fs.modified(file_path))
+
+    def test_rm(self):
+        rm_dir = self.fileset_gvfs_location + "/test_rm"
+        rm_actual_dir = self.fileset_storage_location + "/test_rm"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.check_mkdir(rm_dir, rm_actual_dir, fs)
+
+        rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+        rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+        fs.touch(rm_file)
+        self.assertTrue(self.fs.exists(rm_actual_file))
+        self.assertTrue(fs.exists(rm_file))
+
+        # test delete file
+        fs.rm(rm_file)
+        self.assertFalse(fs.exists(rm_file))
+
+        # test delete dir with recursive = false
+        rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+        rm_new_actual_file = self.fileset_storage_location + 
"/test_rm/test_new.file"
+        self.fs.touch(rm_new_actual_file)
+        self.assertTrue(self.fs.exists(rm_new_actual_file))
+        self.assertTrue(fs.exists(rm_new_file))
+
+    def test_rmdir(self):
+        rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+        rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
+
+        rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+        rmdir_actual_file = self.fileset_storage_location + 
"/test_rmdir/test.file"
+        self.fs.touch(rmdir_actual_file)
+        self.assertTrue(self.fs.exists(rmdir_actual_file))
+        self.assertTrue(fs.exists(rmdir_file))
+
+        fs.rm_file(rmdir_file)
+
+    def test_mkdir(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # this call actually has no effect on S3.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.bucket_name + "1"
+        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+        fs.mkdir(mkdir_dir, create_parents=True)
+
+        self.assertFalse(fs.exists(mkdir_dir))
+        self.assertFalse(self.fs.exists("s3://" + new_bucket))
+
+    def test_makedirs(self):
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+
+        # this call actually has no effect on S3.
+        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+        # check whether it will automatically create the bucket if 
'create_parents'
+        # is set to True.
+        new_bucket = self.bucket_name + "1"
+        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+        mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, 
new_bucket)
+
+        # this has no effect.
+        fs.makedirs(mkdir_dir)
+        with self.assertRaises(OSError):
+            self.fs.exists(mkdir_actual_dir)
+
+        self.assertFalse(fs.exists(mkdir_dir))
+        self.assertFalse(self.fs.exists("s3://" + new_bucket))
+
+    def test_rm_file(self):
+        rm_file_dir = self.fileset_gvfs_location + "/test_rm_file"
+        rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs)
+
+        rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
+        rm_file_actual_file = self.fileset_storage_location + 
"/test_rm_file/test.file"
+        self.fs.touch(rm_file_actual_file)
+        self.assertTrue(self.fs.exists(rm_file_actual_file))
+        self.assertTrue(fs.exists(rm_file_file))
+
+        # test delete file
+        fs.rm_file(rm_file_file)
+        self.assertFalse(fs.exists(rm_file_file))
+
+        # test delete dir
+        fs.rm_file(rm_file_dir)


Reply via email to