This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new ef44021cd [#5139] feat(python-client): Support GCS fileset in the 
python GVFS client  (#5160)
ef44021cd is described below

commit ef44021cd6cde69b685a58b55079a4ab7f980b4c
Author: Qi Yu <[email protected]>
AuthorDate: Mon Oct 21 20:04:36 2024 +0800

    [#5139] feat(python-client): Support GCS fileset in the python GVFS client  
(#5160)
    
    ### What changes were proposed in this pull request?
    
    - Support GCS fileset in the Python GVFS client
    - Add an IT about the GCS fileset.
    
    ### Why are the changes needed?
    
    It is needed by users.
    
    Fix: #5139
    
    ### Does this PR introduce _any_ user-facing change?
    
    Modify the Python GVFS client.
    
    ### How was this patch tested?
    
    Test locally and add an IT that can't run automatically.
    
    Modify the mode as shown in the following picture and execute `./gradlew
    :clients:client-python:test -PskipDockerTests=false` successfully.
    
    <img width="1332" alt="image"
    
src="https://github.com/user-attachments/assets/e3b781fa-3ac1-458a-9224-d7e01107828b">
    
    
    <img width="1530" alt="image"
    
src="https://github.com/user-attachments/assets/f53c2d20-c253-4eb9-a84f-4c13569b513b">
---
 .../gravitino/gcs/fs/GCSFileSystemProvider.java    |   5 +
 .../integration/test/HadoopGCSCatalogIT.java       |   2 +-
 clients/client-python/gravitino/filesystem/gvfs.py |  47 +++++-
 .../gravitino/filesystem/gvfs_config.py            |   4 +
 clients/client-python/requirements.txt             |   3 +-
 .../tests/integration/integration_test_env.py      |  13 ++
 .../tests/integration/test_catalog.py              |  15 +-
 .../tests/integration/test_gvfs_with_gcs.py        | 173 +++++++++++++++++++++
 .../tests/integration/test_gvfs_with_hdfs.py       | 157 +++++++++++--------
 .../test/GravitinoVirtualFileSystemGCSIT.java      |   1 +
 10 files changed, 339 insertions(+), 81 deletions(-)

diff --git 
a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
 
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 919baa03b..74a70f083 100644
--- 
a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++ 
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -25,8 +25,12 @@ import 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GCSFileSystemProvider implements FileSystemProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GCSFileSystemProvider.class);
+
   @Override
   public FileSystem getFileSystem(Path path, Map<String, String> config) 
throws IOException {
     Configuration configuration = new Configuration();
@@ -35,6 +39,7 @@ public class GCSFileSystemProvider implements 
FileSystemProvider {
           configuration.set(k.replace("gravitino.bypass.", ""), v);
         });
 
+    LOGGER.info("Creating GCS file system with config: {}", config);
     return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
   }
 
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
index db1d01336..cca13b770 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
@@ -44,7 +44,7 @@ public class HadoopGCSCatalogIT extends HadoopCatalogIT {
 
   @Override
   public void startIntegrationTest() throws Exception {
-    // Do nothing.
+    // Just overwrite super, do nothing.
   }
 
   @BeforeAll
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py 
b/clients/client-python/gravitino/filesystem/gvfs.py
index e5a565ce0..8f1b2008a 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,11 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import os
 from enum import Enum
 from pathlib import PurePosixPath
 from typing import Dict, Tuple
 import re
+import importlib
 import fsspec
 
 from cachetools import TTLCache, LRUCache
@@ -26,7 +27,7 @@ from fsspec import AbstractFileSystem
 from fsspec.implementations.local import LocalFileSystem
 from fsspec.implementations.arrow import ArrowFSWrapper
 from fsspec.utils import infer_storage_options
-from pyarrow.fs import HadoopFileSystem
+
 from readerwriterlock import rwlock
 from gravitino.audit.caller_context import CallerContext, CallerContextHolder
 from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
@@ -47,6 +48,7 @@ PROTOCOL_NAME = "gvfs"
 class StorageType(Enum):
     HDFS = "hdfs"
     LOCAL = "file"
+    GCS = "gs"
 
 
 class FilesetContextPair:
@@ -66,7 +68,7 @@ class FilesetContextPair:
 
 
 class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
-    """This is a virtual file system which users can access `fileset` and
+    """This is a virtual file system that users can access `fileset` and
     other resources.
 
     It obtains the actual storage location corresponding to the resource from 
the
@@ -149,6 +151,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         self._cache_lock = rwlock.RWLockFair()
         self._catalog_cache = LRUCache(maxsize=100)
         self._catalog_cache_lock = rwlock.RWLockFair()
+        self._options = options
 
         super().__init__(**kwargs)
 
@@ -309,7 +312,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         )
         dst_actual_path = dst_context_pair.actual_file_location()
 
-        if storage_type == StorageType.HDFS:
+        # convert the following to in
+
+        if storage_type in [StorageType.HDFS, StorageType.GCS]:
             src_context_pair.filesystem().mv(
                 self._strip_storage_protocol(storage_type, src_actual_path),
                 self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -540,7 +545,11 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         :param virtual_location: Virtual location
         :return A virtual path
         """
-        if storage_location.startswith(f"{StorageType.HDFS.value}://"):
+
+        # If the storage path starts with hdfs, gcs, we should use the path as 
the prefix.
+        if storage_location.startswith(
+            f"{StorageType.HDFS.value}://"
+        ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
             actual_prefix = infer_storage_options(storage_location)["path"]
         elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
             actual_prefix = 
storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -681,6 +690,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             return StorageType.HDFS
         if path.startswith(f"{StorageType.LOCAL.value}:/"):
             return StorageType.LOCAL
+        if path.startswith(f"{StorageType.GCS.value}://"):
+            return StorageType.GCS
         raise GravitinoRuntimeException(
             f"Storage type doesn't support now. Path:{path}"
         )
@@ -705,10 +716,11 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         :param path: The path
         :return: The stripped path
         """
-        if storage_type == StorageType.HDFS:
+        if storage_type in (StorageType.HDFS, StorageType.GCS):
             return path
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]
+
         raise GravitinoRuntimeException(
             f"Storage type:{storage_type} doesn't support now."
         )
@@ -774,9 +786,12 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
             if cache_value is not None:
                 return cache_value
             if storage_type == StorageType.HDFS:
-                fs = 
ArrowFSWrapper(HadoopFileSystem.from_uri(actual_file_location))
+                fs_class = 
importlib.import_module("pyarrow.fs").HadoopFileSystem
+                fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location))
             elif storage_type == StorageType.LOCAL:
                 fs = LocalFileSystem()
+            elif storage_type == StorageType.GCS:
+                fs = ArrowFSWrapper(self._get_gcs_filesystem())
             else:
                 raise GravitinoRuntimeException(
                     f"Storage type: `{storage_type}` doesn't support now."
@@ -786,5 +801,23 @@ class 
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
         finally:
             write_lock.release()
 
+    def _get_gcs_filesystem(self):
+        # get All keys from the options that start with 
'gravitino.bypass.gcs.' and remove the prefix
+        gcs_options = {
+            key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
+            for key, value in self._options.items()
+            if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
+        }
+
+        # get 'service-account-key' from gcs_options, if the key is not found, 
throw an exception
+        service_account_key_path = 
gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+        if service_account_key_path is None:
+            raise GravitinoRuntimeException(
+                "Service account key is not found in the options."
+            )
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path
+
+        return importlib.import_module("pyarrow.fs").GcsFileSystem()
+
 
 fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py 
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index eb5733b56..618565c70 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -31,3 +31,7 @@ class GVFSConfig:
     OAUTH2_CREDENTIAL = "oauth2_credential"
     OAUTH2_PATH = "oauth2_path"
     OAUTH2_SCOPE = "oauth2_scope"
+
+    GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
+    GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
+    GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path"
diff --git a/clients/client-python/requirements.txt 
b/clients/client-python/requirements.txt
index 7242082b7..a330f738a 100644
--- a/clients/client-python/requirements.txt
+++ b/clients/client-python/requirements.txt
@@ -22,4 +22,5 @@ dataclasses-json==0.6.6
 readerwriterlock==1.0.9
 fsspec==2024.3.1
 pyarrow==15.0.2
-cachetools==5.3.3
\ No newline at end of file
+cachetools==5.3.3
+google-auth==2.35.0
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/integration_test_env.py 
b/clients/client-python/tests/integration/integration_test_env.py
index cfe6c0eda..d0d39a06d 100644
--- a/clients/client-python/tests/integration/integration_test_env.py
+++ b/clients/client-python/tests/integration/integration_test_env.py
@@ -21,6 +21,7 @@ import unittest
 import subprocess
 import time
 import sys
+import shutil
 
 import requests
 
@@ -80,6 +81,12 @@ class IntegrationTestEnv(unittest.TestCase):
             )
             sys.exit(0)
 
+        # remove data dir under gravitino_home
+        data_dir = os.path.join(cls.gravitino_home, "data")
+        if os.path.exists(data_dir):
+            logger.info("Remove Gravitino data directory: %s", data_dir)
+            shutil.rmtree(data_dir)
+
         logger.info("Starting integration test environment...")
 
         # Start Gravitino Server
@@ -141,6 +148,12 @@ class IntegrationTestEnv(unittest.TestCase):
                 "project root directory."
             )
 
+        # remove data dir under gravitino_home
+        data_dir = os.path.join(gravitino_home, "data")
+        if os.path.exists(data_dir):
+            logger.info("Remove Gravitino data directory: %s", data_dir)
+            shutil.rmtree(data_dir)
+
         # Restart Gravitino Server
         env_vars = os.environ.copy()
         env_vars["HADOOP_USER_NAME"] = "anonymous"
diff --git a/clients/client-python/tests/integration/test_catalog.py 
b/clients/client-python/tests/integration/test_catalog.py
index 755b295b0..64208315e 100644
--- a/clients/client-python/tests/integration/test_catalog.py
+++ b/clients/client-python/tests/integration/test_catalog.py
@@ -39,7 +39,8 @@ logger = logging.getLogger(__name__)
 class TestCatalog(IntegrationTestEnv):
     metalake_name: str = "TestSchema_metalake" + str(randint(1, 10000))
 
-    catalog_name: str = "testCatalog"
+    catalog_name: str = "testCatalog" + str(randint(1, 10000))
+    catalog_name_bak = catalog_name
     catalog_comment: str = "catalogComment"
     catalog_location_prop: str = "location"  # Fileset Catalog must set 
`location`
     catalog_provider: str = "hadoop"
@@ -81,23 +82,27 @@ class TestCatalog(IntegrationTestEnv):
         )
         try:
             logger.info(
-                "Drop catalog %s[%s]",
+                "TestCatalog: drop catalog %s[%s]",
                 self.catalog_ident,
                 self.gravitino_client.drop_catalog(name=self.catalog_name, 
force=True),
             )
         except GravitinoRuntimeException:
-            logger.warning("Failed to drop catalog %s", self.catalog_name)
+            logger.warning("TestCatalog: failed to drop catalog %s", 
self.catalog_name)
 
         try:
             logger.info(
-                "Drop metalake %s[%s]",
+                "TestCatalog: drop metalake %s[%s]",
                 self.metalake_name,
                 self.gravitino_admin_client.drop_metalake(
                     self.metalake_name, force=True
                 ),
             )
         except GravitinoRuntimeException:
-            logger.warning("Failed to drop metalake %s", self.metalake_name)
+            logger.warning(
+                "TestCatalog: failed to drop metalake %s", self.metalake_name
+            )
+
+        self.catalog_name = self.catalog_name_bak
 
     def test_list_catalogs(self):
         self.create_catalog(self.catalog_name)
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py 
b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
new file mode 100644
index 000000000..16f84dff3
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from fsspec.implementations.arrow import ArrowFSWrapper
+from pyarrow.fs import GcsFileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+
+
+logger = logging.getLogger(__name__)
+
+
[email protected]("This test require GCS service account key file")
+class TestGvfsWithGCS(TestGvfsWithHDFS):
+    # Before running this test, please set the make sure gcp-bundle-x.jar has 
been
+    # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    key_file = "your_key_file.json"
+    bucket_name = "your_bucket_name"
+    metalake_name: str = "TestGvfsWithGCS_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {"gravitino.bypass.gcs.service-account-key-path": 
self.key_file}
+
+    def tearDown(self):
+        self.options = {}
+
+    @classmethod
+    def setUpClass(cls):
+        cls._get_gravitino_home()
+
+        cls.hadoop_conf_path = 
f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+        # restart the server
+        cls.restart_server()
+        # create entity
+        cls._init_test_entities()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls._clean_test_data()
+        # reset server conf in case of other ITs like HDFS has changed it and 
fail
+        # to reset it
+        cls._reset_conf(cls.config, cls.hadoop_conf_path)
+        # restart server
+        cls.restart_server()
+
+    # clear all config in the conf_path
+    @classmethod
+    def _reset_conf(cls, config, conf_path):
+        logger.info("Reset %s.", conf_path)
+        if not os.path.exists(conf_path):
+            raise GravitinoRuntimeException(f"Conf file is not found at 
`{conf_path}`.")
+        filtered_lines = []
+        with open(conf_path, mode="r", encoding="utf-8") as file:
+            origin_lines = file.readlines()
+
+        for line in origin_lines:
+            line = line.strip()
+            if line.startswith("#"):
+                # append annotations directly
+                filtered_lines.append(line + "\n")
+
+        with open(conf_path, mode="w", encoding="utf-8") as file:
+            for line in filtered_lines:
+                file.write(line)
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090";, metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "gcs",
+                "gravitino.bypass.fs.gs.auth.service.account.enable": "true",
+                "gravitino.bypass.fs.gs.auth.service.account.json.keyfile": 
cls.key_file,
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.key_file
+        arrow_gcs_fs = GcsFileSystem()
+        cls.fs = ArrowFSWrapper(arrow_gcs_fs)
+
+    def test_modified(self):
+        modified_dir = self.fileset_gvfs_location + "/test_modified"
+        modified_actual_dir = self.fileset_storage_location + "/test_modified"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090";,
+            metalake_name=self.metalake_name,
+            options=self.options,
+            **self.conf,
+        )
+        self.fs.mkdir(modified_actual_dir)
+        self.assertTrue(self.fs.exists(modified_actual_dir))
+        self.assertTrue(fs.exists(modified_dir))
+
+        # GCP only supports getting the `object` modify time, so the modified 
time will be None
+        # if it's a directory.
+        # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
+        # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
+        # >>> print(r)
+        # None
+        self.assertIsNone(fs.modified(modified_dir))
+
+        # create a file under the dir 'modified_dir'.
+        file_path = modified_dir + "/test.txt"
+        fs.touch(file_path)
+        self.assertTrue(fs.exists(file_path))
+        self.assertIsNotNone(fs.modified(file_path))
+
+    @unittest.skip(
+        "This test will fail for https://github.com/apache/arrow/issues/44438";
+    )
+    def test_pandas(self):
+        pass
+
+    @unittest.skip(
+        "This test will fail for https://github.com/apache/arrow/issues/44438";
+    )
+    def test_pyarrow(self):
+        pass
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py 
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
index 8bc6597b4..5be5914f1 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -89,6 +89,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         uri="http://localhost:8090";
     )
     gravitino_client: GravitinoClient = None
+    options = {}
 
     @classmethod
     def setUpClass(cls):
@@ -124,7 +125,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             BaseHadoopEnvironment.clear_hadoop_env()
         finally:
             # close hdfs container
-            cls.hdfs_container.close()
+            if cls.hdfs_container is not None:
+                cls.hdfs_container.close()
 
     @classmethod
     def _init_test_entities(cls):
@@ -159,7 +161,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             properties=cls.fileset_properties,
         )
         arrow_hadoop_fs = HadoopFileSystem(host=cls.hdfs_container.get_ip(), 
port=9000)
-        cls.hdfs = ArrowFSWrapper(arrow_hadoop_fs)
+        cls.fs = ArrowFSWrapper(arrow_hadoop_fs)
         cls.conf: Dict = {"fs.defaultFS": 
f"hdfs://{cls.hdfs_container.get_ip()}:9000/"}
 
     @classmethod
@@ -234,15 +236,16 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(ls_actual_dir)
-        self.assertTrue(self.hdfs.exists(ls_actual_dir))
+        self.fs.mkdir(ls_actual_dir)
+        self.assertTrue(self.fs.exists(ls_actual_dir))
 
         ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
         ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
-        self.hdfs.touch(ls_actual_file)
-        self.assertTrue(self.hdfs.exists(ls_actual_file))
+        self.fs.touch(ls_actual_file)
+        self.assertTrue(self.fs.exists(ls_actual_file))
 
         # test detail = false
         file_list_without_detail = fs.ls(ls_dir, detail=False)
@@ -260,15 +263,16 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(info_actual_dir)
-        self.assertTrue(self.hdfs.exists(info_actual_dir))
+        self.fs.mkdir(info_actual_dir)
+        self.assertTrue(self.fs.exists(info_actual_dir))
 
         info_file = self.fileset_gvfs_location + "/test_info/test.file"
         info_actual_file = self.fileset_storage_location + 
"/test_info/test.file"
-        self.hdfs.touch(info_actual_file)
-        self.assertTrue(self.hdfs.exists(info_actual_file))
+        self.fs.touch(info_actual_file)
+        self.assertTrue(self.fs.exists(info_actual_file))
 
         dir_info = fs.info(info_dir)
         self.assertEqual(dir_info["name"], info_dir[len("gvfs://") :])
@@ -282,16 +286,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(exist_actual_dir)
-        self.assertTrue(self.hdfs.exists(exist_actual_dir))
+        self.fs.mkdir(exist_actual_dir)
+        self.assertTrue(self.fs.exists(exist_actual_dir))
         self.assertTrue(fs.exists(exist_dir))
 
         exist_file = self.fileset_gvfs_location + "/test_exist/test.file"
         exist_actual_file = self.fileset_storage_location + 
"/test_exist/test.file"
-        self.hdfs.touch(exist_actual_file)
-        self.assertTrue(self.hdfs.exists(exist_actual_file))
+        self.fs.touch(exist_actual_file)
+        self.assertTrue(self.fs.exists(exist_actual_file))
         self.assertTrue(fs.exists(exist_file))
 
     def test_cp_file(self):
@@ -300,19 +305,20 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(cp_file_actual_dir)
-        self.assertTrue(self.hdfs.exists(cp_file_actual_dir))
+        self.fs.mkdir(cp_file_actual_dir)
+        self.assertTrue(self.fs.exists(cp_file_actual_dir))
         self.assertTrue(fs.exists(cp_file_dir))
 
         cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file"
         cp_file_actual_file = self.fileset_storage_location + 
"/test_cp_file/test.file"
-        self.hdfs.touch(cp_file_actual_file)
-        self.assertTrue(self.hdfs.exists(cp_file_actual_file))
+        self.fs.touch(cp_file_actual_file)
+        self.assertTrue(self.fs.exists(cp_file_actual_file))
         self.assertTrue(fs.exists(cp_file_file))
 
-        with self.hdfs.open(cp_file_actual_file, "wb") as f:
+        with self.fs.open(cp_file_actual_file, "wb") as f:
             f.write(b"test_file_1")
 
         cp_file_new_file = self.fileset_gvfs_location + 
"/test_cp_file/test_cp.file"
@@ -322,7 +328,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs.cp_file(cp_file_file, cp_file_new_file)
         self.assertTrue(fs.exists(cp_file_new_file))
 
-        with self.hdfs.open(cp_file_new_actual_file, "rb") as f:
+        with self.fs.open(cp_file_new_actual_file, "rb") as f:
             result = f.read()
         self.assertEqual(b"test_file_1", result)
 
@@ -332,10 +338,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(mv_actual_dir)
-        self.assertTrue(self.hdfs.exists(mv_actual_dir))
+        self.fs.mkdir(mv_actual_dir)
+        self.assertTrue(self.fs.exists(mv_actual_dir))
         self.assertTrue(fs.exists(mv_dir))
 
         mv_new_dir = self.fileset_gvfs_location + "/test_mv_new"
@@ -343,16 +350,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(mv_new_actual_dir)
-        self.assertTrue(self.hdfs.exists(mv_new_actual_dir))
+        self.fs.mkdir(mv_new_actual_dir)
+        self.assertTrue(self.fs.exists(mv_new_actual_dir))
         self.assertTrue(fs.exists(mv_new_dir))
 
         mv_file = self.fileset_gvfs_location + "/test_mv/test.file"
         mv_actual_file = self.fileset_storage_location + "/test_mv/test.file"
-        self.hdfs.touch(mv_actual_file)
-        self.assertTrue(self.hdfs.exists(mv_actual_file))
+        self.fs.touch(mv_actual_file)
+        self.assertTrue(self.fs.exists(mv_actual_file))
         self.assertTrue(fs.exists(mv_file))
 
         mv_new_file = self.fileset_gvfs_location + "/test_mv_new/test_new.file"
@@ -362,7 +370,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
 
         fs.mv(mv_file, mv_new_file)
         self.assertTrue(fs.exists(mv_new_file))
-        self.assertTrue(self.hdfs.exists(mv_new_actual_file))
+        self.assertTrue(self.fs.exists(mv_new_actual_file))
 
         # test rename without sub path, which should throw an exception
         with self.assertRaises(GravitinoRuntimeException):
@@ -374,16 +382,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(rm_actual_dir)
-        self.assertTrue(self.hdfs.exists(rm_actual_dir))
+        self.fs.mkdir(rm_actual_dir)
+        self.assertTrue(self.fs.exists(rm_actual_dir))
         self.assertTrue(fs.exists(rm_dir))
 
         rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
         rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
-        self.hdfs.touch(rm_file)
-        self.assertTrue(self.hdfs.exists(rm_actual_file))
+        fs.touch(rm_file)
+        self.assertTrue(self.fs.exists(rm_actual_file))
         self.assertTrue(fs.exists(rm_file))
 
         # test delete file
@@ -393,8 +402,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         # test delete dir with recursive = false
         rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
         rm_new_actual_file = self.fileset_storage_location + 
"/test_rm/test_new.file"
-        self.hdfs.touch(rm_new_actual_file)
-        self.assertTrue(self.hdfs.exists(rm_new_actual_file))
+        self.fs.touch(rm_new_actual_file)
+        self.assertTrue(self.fs.exists(rm_new_actual_file))
         self.assertTrue(fs.exists(rm_new_file))
         with self.assertRaises(ValueError):
             fs.rm(rm_dir, recursive=False)
@@ -409,16 +418,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(rm_file_actual_dir)
-        self.assertTrue(self.hdfs.exists(rm_file_actual_dir))
+        self.fs.mkdir(rm_file_actual_dir)
+        self.assertTrue(self.fs.exists(rm_file_actual_dir))
         self.assertTrue(fs.exists(rm_file_dir))
 
         rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
         rm_file_actual_file = self.fileset_storage_location + 
"/test_rm_file/test.file"
-        self.hdfs.touch(rm_file_actual_file)
-        self.assertTrue(self.hdfs.exists(rm_file_actual_file))
+        self.fs.touch(rm_file_actual_file)
+        self.assertTrue(self.fs.exists(rm_file_actual_file))
         self.assertTrue(fs.exists(rm_file_file))
 
         # test delete file
@@ -435,16 +445,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(rmdir_actual_dir)
-        self.assertTrue(self.hdfs.exists(rmdir_actual_dir))
+        self.fs.mkdir(rmdir_actual_dir)
+        self.assertTrue(self.fs.exists(rmdir_actual_dir))
         self.assertTrue(fs.exists(rmdir_dir))
 
         rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
         rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file"
-        self.hdfs.touch(rmdir_actual_file)
-        self.assertTrue(self.hdfs.exists(rmdir_actual_file))
+        self.fs.touch(rmdir_actual_file)
+        self.assertTrue(self.fs.exists(rmdir_actual_file))
         self.assertTrue(fs.exists(rmdir_file))
 
         # test delete file
@@ -461,16 +472,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(open_actual_dir)
-        self.assertTrue(self.hdfs.exists(open_actual_dir))
+        self.fs.mkdir(open_actual_dir)
+        self.assertTrue(self.fs.exists(open_actual_dir))
         self.assertTrue(fs.exists(open_dir))
 
         open_file = self.fileset_gvfs_location + "/test_open/test.file"
         open_actual_file = self.fileset_storage_location + "/test_open/test.file"
-        self.hdfs.touch(open_actual_file)
-        self.assertTrue(self.hdfs.exists(open_actual_file))
+        self.fs.touch(open_actual_file)
+        self.assertTrue(self.fs.exists(open_actual_file))
         self.assertTrue(fs.exists(open_file))
 
         # test open and write file
@@ -488,11 +500,12 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
         fs.mkdir(mkdir_dir)
         self.assertTrue(fs.exists(mkdir_dir))
-        self.assertTrue(self.hdfs.exists(mkdir_actual_dir))
+        self.assertTrue(self.fs.exists(mkdir_actual_dir))
 
         # test mkdir dir with create_parents = false
         parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir"
@@ -514,11 +527,12 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
         fs.makedirs(makedirs_dir)
         self.assertTrue(fs.exists(makedirs_dir))
-        self.assertTrue(self.hdfs.exists(makedirs_actual_dir))
+        self.assertTrue(self.fs.exists(makedirs_actual_dir))
 
         # test mkdir dir not exist
         parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir"
@@ -532,10 +546,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(created_actual_dir)
-        self.assertTrue(self.hdfs.exists(created_actual_dir))
+        self.fs.mkdir(created_actual_dir)
+        self.assertTrue(self.fs.exists(created_actual_dir))
         self.assertTrue(fs.exists(created_dir))
 
         with self.assertRaises(GravitinoRuntimeException):
@@ -547,10 +562,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(modified_actual_dir)
-        self.assertTrue(self.hdfs.exists(modified_actual_dir))
+        self.fs.mkdir(modified_actual_dir)
+        self.assertTrue(self.fs.exists(modified_actual_dir))
         self.assertTrue(fs.exists(modified_dir))
 
         # test mkdir dir which exists
@@ -562,16 +578,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(cat_actual_dir)
-        self.assertTrue(self.hdfs.exists(cat_actual_dir))
+        self.fs.mkdir(cat_actual_dir)
+        self.assertTrue(self.fs.exists(cat_actual_dir))
         self.assertTrue(fs.exists(cat_dir))
 
         cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
         cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
-        self.hdfs.touch(cat_actual_file)
-        self.assertTrue(self.hdfs.exists(cat_actual_file))
+        self.fs.touch(cat_actual_file)
+        self.assertTrue(self.fs.exists(cat_actual_file))
         self.assertTrue(fs.exists(cat_file))
 
         # test open and write file
@@ -589,16 +606,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(get_actual_dir)
-        self.assertTrue(self.hdfs.exists(get_actual_dir))
+        self.fs.mkdir(get_actual_dir)
+        self.assertTrue(self.fs.exists(get_actual_dir))
         self.assertTrue(fs.exists(get_dir))
 
         get_file = self.fileset_gvfs_location + "/test_get/test.file"
         get_actual_file = self.fileset_storage_location + "/test_get/test.file"
-        self.hdfs.touch(get_actual_file)
-        self.assertTrue(self.hdfs.exists(get_actual_file))
+        self.fs.touch(get_actual_file)
+        self.assertTrue(self.fs.exists(get_actual_file))
         self.assertTrue(fs.exists(get_file))
 
         # test open and write file
@@ -628,10 +646,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(pands_actual_dir)
-        self.assertTrue(self.hdfs.exists(pands_actual_dir))
+        self.fs.mkdir(pands_actual_dir)
+        self.assertTrue(self.fs.exists(pands_actual_dir))
         self.assertTrue(fs.exists(pands_dir))
 
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
@@ -642,7 +661,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         )
         data.to_parquet(parquet_file, filesystem=fs)
         self.assertTrue(fs.exists(parquet_file))
-        self.assertTrue(self.hdfs.exists(parquet_actual_file))
+        self.assertTrue(self.fs.exists(parquet_actual_file))
 
         # read parquet
         ds1 = pandas.read_parquet(path=parquet_file, filesystem=fs)
@@ -650,6 +669,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         storage_options = {
             "server_uri": "http://localhost:8090";,
             "metalake_name": self.metalake_name,
+            "options": self.options,
         }
         # to csv
         csv_file = self.fileset_gvfs_location + "/test_pandas/test.csv"
@@ -660,7 +680,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
             storage_options=storage_options,
         )
         self.assertTrue(fs.exists(csv_file))
-        self.assertTrue(self.hdfs.exists(csv_actual_file))
+        self.assertTrue(self.fs.exists(csv_actual_file))
 
         # read csv
         ds2 = pandas.read_csv(csv_file, storage_options=storage_options)
@@ -672,10 +692,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(pyarrow_actual_dir)
-        self.assertTrue(self.hdfs.exists(pyarrow_actual_dir))
+        self.fs.mkdir(pyarrow_actual_dir)
+        self.assertTrue(self.fs.exists(pyarrow_actual_dir))
         self.assertTrue(fs.exists(pyarrow_dir))
 
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
@@ -701,16 +722,18 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
         fs = gvfs.GravitinoVirtualFileSystem(
             server_uri="http://localhost:8090";,
             metalake_name=self.metalake_name,
+            options=self.options,
             **self.conf,
         )
-        self.hdfs.mkdir(llama_actual_dir)
-        self.assertTrue(self.hdfs.exists(llama_actual_dir))
+        self.fs.mkdir(llama_actual_dir)
+        self.assertTrue(self.fs.exists(llama_actual_dir))
         self.assertTrue(fs.exists(llama_dir))
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
 
         storage_options = {
             "server_uri": "http://localhost:8090";,
             "metalake_name": self.metalake_name,
+            "options": self.options,
         }
         csv_file = llama_dir + "/test.csv"
         # to csv
diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
index 73a45006f..312236fe5 100644
--- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
+++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
@@ -52,6 +52,7 @@ public class GravitinoVirtualFileSystemGCSIT extends GravitinoVirtualFileSystemI
 
   @BeforeAll
   public void startUp() throws Exception {
+    // Copy the GCP jars to the gravitino server if in deploy mode.
     copyBundleJarsToHadoop("gcp-bundle");
     // Need to download jars to gravitino server
     super.startIntegrationTest();


Reply via email to