This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new ef44021cd [#5139] feat(python-client): Support GCS fileset in the
python GVFS client (#5160)
ef44021cd is described below
commit ef44021cd6cde69b685a58b55079a4ab7f980b4c
Author: Qi Yu <[email protected]>
AuthorDate: Mon Oct 21 20:04:36 2024 +0800
[#5139] feat(python-client): Support GCS fileset in the python GVFS client
(#5160)
### What changes were proposed in this pull request?
- Support GCS fileset in the Python GVFS client
- Add an IT about the GCS fileset.
### Why are the changes needed?
It addresses user needs.
Fix: #5139
### Does this PR introduce _any_ user-facing change?
Modify the Python GVFS client.
### How was this patch tested?
Tested locally and added an IT that can't run automatically.
Modified the mode as shown in the following pictures and executed `./gradlew
:clients:client-python:test -PskipDockerTests=false` successfully.
<img width="1332" alt="image"
src="https://github.com/user-attachments/assets/e3b781fa-3ac1-458a-9224-d7e01107828b">
<img width="1530" alt="image"
src="https://github.com/user-attachments/assets/f53c2d20-c253-4eb9-a84f-4c13569b513b">
---
.../gravitino/gcs/fs/GCSFileSystemProvider.java | 5 +
.../integration/test/HadoopGCSCatalogIT.java | 2 +-
clients/client-python/gravitino/filesystem/gvfs.py | 47 +++++-
.../gravitino/filesystem/gvfs_config.py | 4 +
clients/client-python/requirements.txt | 3 +-
.../tests/integration/integration_test_env.py | 13 ++
.../tests/integration/test_catalog.py | 15 +-
.../tests/integration/test_gvfs_with_gcs.py | 173 +++++++++++++++++++++
.../tests/integration/test_gvfs_with_hdfs.py | 157 +++++++++++--------
.../test/GravitinoVirtualFileSystemGCSIT.java | 1 +
10 files changed, 339 insertions(+), 81 deletions(-)
diff --git
a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 919baa03b..74a70f083 100644
---
a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -25,8 +25,12 @@ import
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GCSFileSystemProvider implements FileSystemProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GCSFileSystemProvider.class);
+
@Override
public FileSystem getFileSystem(Path path, Map<String, String> config)
throws IOException {
Configuration configuration = new Configuration();
@@ -35,6 +39,7 @@ public class GCSFileSystemProvider implements
FileSystemProvider {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});
+ LOGGER.info("Creating GCS file system with config: {}", config);
return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
}
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
index db1d01336..cca13b770 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
@@ -44,7 +44,7 @@ public class HadoopGCSCatalogIT extends HadoopCatalogIT {
@Override
public void startIntegrationTest() throws Exception {
- // Do nothing.
+ // Just overwrite super, do nothing.
}
@BeforeAll
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index e5a565ce0..8f1b2008a 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,11 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import os
from enum import Enum
from pathlib import PurePosixPath
from typing import Dict, Tuple
import re
+import importlib
import fsspec
from cachetools import TTLCache, LRUCache
@@ -26,7 +27,7 @@ from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.arrow import ArrowFSWrapper
from fsspec.utils import infer_storage_options
-from pyarrow.fs import HadoopFileSystem
+
from readerwriterlock import rwlock
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
@@ -47,6 +48,7 @@ PROTOCOL_NAME = "gvfs"
class StorageType(Enum):
HDFS = "hdfs"
LOCAL = "file"
+ GCS = "gs"
class FilesetContextPair:
@@ -66,7 +68,7 @@ class FilesetContextPair:
class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
- """This is a virtual file system which users can access `fileset` and
+ """This is a virtual file system that users can access `fileset` and
other resources.
It obtains the actual storage location corresponding to the resource from
the
@@ -149,6 +151,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
self._cache_lock = rwlock.RWLockFair()
self._catalog_cache = LRUCache(maxsize=100)
self._catalog_cache_lock = rwlock.RWLockFair()
+ self._options = options
super().__init__(**kwargs)
@@ -309,7 +312,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
)
dst_actual_path = dst_context_pair.actual_file_location()
- if storage_type == StorageType.HDFS:
+ # convert the following to in
+
+ if storage_type in [StorageType.HDFS, StorageType.GCS]:
src_context_pair.filesystem().mv(
self._strip_storage_protocol(storage_type, src_actual_path),
self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -540,7 +545,11 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param virtual_location: Virtual location
:return A virtual path
"""
- if storage_location.startswith(f"{StorageType.HDFS.value}://"):
+
+ # If the storage path starts with hdfs, gcs, we should use the path as
the prefix.
+ if storage_location.startswith(
+ f"{StorageType.HDFS.value}://"
+ ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
actual_prefix = infer_storage_options(storage_location)["path"]
elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
actual_prefix =
storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -681,6 +690,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
return StorageType.HDFS
if path.startswith(f"{StorageType.LOCAL.value}:/"):
return StorageType.LOCAL
+ if path.startswith(f"{StorageType.GCS.value}://"):
+ return StorageType.GCS
raise GravitinoRuntimeException(
f"Storage type doesn't support now. Path:{path}"
)
@@ -705,10 +716,11 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: The path
:return: The stripped path
"""
- if storage_type == StorageType.HDFS:
+ if storage_type in (StorageType.HDFS, StorageType.GCS):
return path
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]
+
raise GravitinoRuntimeException(
f"Storage type:{storage_type} doesn't support now."
)
@@ -774,9 +786,12 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
if cache_value is not None:
return cache_value
if storage_type == StorageType.HDFS:
- fs =
ArrowFSWrapper(HadoopFileSystem.from_uri(actual_file_location))
+ fs_class =
importlib.import_module("pyarrow.fs").HadoopFileSystem
+ fs = ArrowFSWrapper(fs_class.from_uri(actual_file_location))
elif storage_type == StorageType.LOCAL:
fs = LocalFileSystem()
+ elif storage_type == StorageType.GCS:
+ fs = ArrowFSWrapper(self._get_gcs_filesystem())
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
@@ -786,5 +801,23 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
finally:
write_lock.release()
+ def _get_gcs_filesystem(self):
+ # get All keys from the options that start with
'gravitino.bypass.gcs.' and remove the prefix
+ gcs_options = {
+ key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
+ for key, value in self._options.items()
+ if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
+ }
+
+ # get 'service-account-key' from gcs_options, if the key is not found,
throw an exception
+ service_account_key_path =
gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+ if service_account_key_path is None:
+ raise GravitinoRuntimeException(
+ "Service account key is not found in the options."
+ )
+ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path
+
+ return importlib.import_module("pyarrow.fs").GcsFileSystem()
+
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index eb5733b56..618565c70 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -31,3 +31,7 @@ class GVFSConfig:
OAUTH2_CREDENTIAL = "oauth2_credential"
OAUTH2_PATH = "oauth2_path"
OAUTH2_SCOPE = "oauth2_scope"
+
+ GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
+ GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
+ GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path"
diff --git a/clients/client-python/requirements.txt
b/clients/client-python/requirements.txt
index 7242082b7..a330f738a 100644
--- a/clients/client-python/requirements.txt
+++ b/clients/client-python/requirements.txt
@@ -22,4 +22,5 @@ dataclasses-json==0.6.6
readerwriterlock==1.0.9
fsspec==2024.3.1
pyarrow==15.0.2
-cachetools==5.3.3
\ No newline at end of file
+cachetools==5.3.3
+google-auth==2.35.0
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/integration_test_env.py
b/clients/client-python/tests/integration/integration_test_env.py
index cfe6c0eda..d0d39a06d 100644
--- a/clients/client-python/tests/integration/integration_test_env.py
+++ b/clients/client-python/tests/integration/integration_test_env.py
@@ -21,6 +21,7 @@ import unittest
import subprocess
import time
import sys
+import shutil
import requests
@@ -80,6 +81,12 @@ class IntegrationTestEnv(unittest.TestCase):
)
sys.exit(0)
+ # remove data dir under gravitino_home
+ data_dir = os.path.join(cls.gravitino_home, "data")
+ if os.path.exists(data_dir):
+ logger.info("Remove Gravitino data directory: %s", data_dir)
+ shutil.rmtree(data_dir)
+
logger.info("Starting integration test environment...")
# Start Gravitino Server
@@ -141,6 +148,12 @@ class IntegrationTestEnv(unittest.TestCase):
"project root directory."
)
+ # remove data dir under gravitino_home
+ data_dir = os.path.join(gravitino_home, "data")
+ if os.path.exists(data_dir):
+ logger.info("Remove Gravitino data directory: %s", data_dir)
+ shutil.rmtree(data_dir)
+
# Restart Gravitino Server
env_vars = os.environ.copy()
env_vars["HADOOP_USER_NAME"] = "anonymous"
diff --git a/clients/client-python/tests/integration/test_catalog.py
b/clients/client-python/tests/integration/test_catalog.py
index 755b295b0..64208315e 100644
--- a/clients/client-python/tests/integration/test_catalog.py
+++ b/clients/client-python/tests/integration/test_catalog.py
@@ -39,7 +39,8 @@ logger = logging.getLogger(__name__)
class TestCatalog(IntegrationTestEnv):
metalake_name: str = "TestSchema_metalake" + str(randint(1, 10000))
- catalog_name: str = "testCatalog"
+ catalog_name: str = "testCatalog" + str(randint(1, 10000))
+ catalog_name_bak = catalog_name
catalog_comment: str = "catalogComment"
catalog_location_prop: str = "location" # Fileset Catalog must set
`location`
catalog_provider: str = "hadoop"
@@ -81,23 +82,27 @@ class TestCatalog(IntegrationTestEnv):
)
try:
logger.info(
- "Drop catalog %s[%s]",
+ "TestCatalog: drop catalog %s[%s]",
self.catalog_ident,
self.gravitino_client.drop_catalog(name=self.catalog_name,
force=True),
)
except GravitinoRuntimeException:
- logger.warning("Failed to drop catalog %s", self.catalog_name)
+ logger.warning("TestCatalog: failed to drop catalog %s",
self.catalog_name)
try:
logger.info(
- "Drop metalake %s[%s]",
+ "TestCatalog: drop metalake %s[%s]",
self.metalake_name,
self.gravitino_admin_client.drop_metalake(
self.metalake_name, force=True
),
)
except GravitinoRuntimeException:
- logger.warning("Failed to drop metalake %s", self.metalake_name)
+ logger.warning(
+ "TestCatalog: failed to drop metalake %s", self.metalake_name
+ )
+
+ self.catalog_name = self.catalog_name_bak
def test_list_catalogs(self):
self.create_catalog(self.catalog_name)
diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py
b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
new file mode 100644
index 000000000..16f84dff3
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+from fsspec.implementations.arrow import ArrowFSWrapper
+from pyarrow.fs import GcsFileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+ gvfs,
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+
+
+logger = logging.getLogger(__name__)
+
+
[email protected]("This test require GCS service account key file")
+class TestGvfsWithGCS(TestGvfsWithHDFS):
+ # Before running this test, please set the make sure gcp-bundle-x.jar has
been
+ # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+ key_file = "your_key_file.json"
+ bucket_name = "your_bucket_name"
+ metalake_name: str = "TestGvfsWithGCS_metalake" + str(randint(1, 10000))
+
+ def setUp(self):
+ self.options = {"gravitino.bypass.gcs.service-account-key-path":
self.key_file}
+
+ def tearDown(self):
+ self.options = {}
+
+ @classmethod
+ def setUpClass(cls):
+ cls._get_gravitino_home()
+
+ cls.hadoop_conf_path =
f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+ # restart the server
+ cls.restart_server()
+ # create entity
+ cls._init_test_entities()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls._clean_test_data()
+ # reset server conf in case of other ITs like HDFS has changed it and
fail
+ # to reset it
+ cls._reset_conf(cls.config, cls.hadoop_conf_path)
+ # restart server
+ cls.restart_server()
+
+ # clear all config in the conf_path
+ @classmethod
+ def _reset_conf(cls, config, conf_path):
+ logger.info("Reset %s.", conf_path)
+ if not os.path.exists(conf_path):
+ raise GravitinoRuntimeException(f"Conf file is not found at
`{conf_path}`.")
+ filtered_lines = []
+ with open(conf_path, mode="r", encoding="utf-8") as file:
+ origin_lines = file.readlines()
+
+ for line in origin_lines:
+ line = line.strip()
+ if line.startswith("#"):
+ # append annotations directly
+ filtered_lines.append(line + "\n")
+
+ with open(conf_path, mode="w", encoding="utf-8") as file:
+ for line in filtered_lines:
+ file.write(line)
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "gcs",
+ "gravitino.bypass.fs.gs.auth.service.account.enable": "true",
+ "gravitino.bypass.fs.gs.auth.service.account.json.keyfile":
cls.key_file,
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+
f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+
+ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.key_file
+ arrow_gcs_fs = GcsFileSystem()
+ cls.fs = ArrowFSWrapper(arrow_gcs_fs)
+
+ def test_modified(self):
+ modified_dir = self.fileset_gvfs_location + "/test_modified"
+ modified_actual_dir = self.fileset_storage_location + "/test_modified"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.fs.mkdir(modified_actual_dir)
+ self.assertTrue(self.fs.exists(modified_actual_dir))
+ self.assertTrue(fs.exists(modified_dir))
+
+ # GCP only supports getting the `object` modify time, so the modified
time will be None
+ # if it's a directory.
+ # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
+ # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
+ # >>> print(r)
+ # None
+ self.assertIsNone(fs.modified(modified_dir))
+
+ # create a file under the dir 'modified_dir'.
+ file_path = modified_dir + "/test.txt"
+ fs.touch(file_path)
+ self.assertTrue(fs.exists(file_path))
+ self.assertIsNotNone(fs.modified(file_path))
+
+ @unittest.skip(
+ "This test will fail for https://github.com/apache/arrow/issues/44438"
+ )
+ def test_pandas(self):
+ pass
+
+ @unittest.skip(
+ "This test will fail for https://github.com/apache/arrow/issues/44438"
+ )
+ def test_pyarrow(self):
+ pass
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
index 8bc6597b4..5be5914f1 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -89,6 +89,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
uri="http://localhost:8090"
)
gravitino_client: GravitinoClient = None
+ options = {}
@classmethod
def setUpClass(cls):
@@ -124,7 +125,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
BaseHadoopEnvironment.clear_hadoop_env()
finally:
# close hdfs container
- cls.hdfs_container.close()
+ if cls.hdfs_container is not None:
+ cls.hdfs_container.close()
@classmethod
def _init_test_entities(cls):
@@ -159,7 +161,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
properties=cls.fileset_properties,
)
arrow_hadoop_fs = HadoopFileSystem(host=cls.hdfs_container.get_ip(),
port=9000)
- cls.hdfs = ArrowFSWrapper(arrow_hadoop_fs)
+ cls.fs = ArrowFSWrapper(arrow_hadoop_fs)
cls.conf: Dict = {"fs.defaultFS":
f"hdfs://{cls.hdfs_container.get_ip()}:9000/"}
@classmethod
@@ -234,15 +236,16 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(ls_actual_dir)
- self.assertTrue(self.hdfs.exists(ls_actual_dir))
+ self.fs.mkdir(ls_actual_dir)
+ self.assertTrue(self.fs.exists(ls_actual_dir))
ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
- self.hdfs.touch(ls_actual_file)
- self.assertTrue(self.hdfs.exists(ls_actual_file))
+ self.fs.touch(ls_actual_file)
+ self.assertTrue(self.fs.exists(ls_actual_file))
# test detail = false
file_list_without_detail = fs.ls(ls_dir, detail=False)
@@ -260,15 +263,16 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(info_actual_dir)
- self.assertTrue(self.hdfs.exists(info_actual_dir))
+ self.fs.mkdir(info_actual_dir)
+ self.assertTrue(self.fs.exists(info_actual_dir))
info_file = self.fileset_gvfs_location + "/test_info/test.file"
info_actual_file = self.fileset_storage_location +
"/test_info/test.file"
- self.hdfs.touch(info_actual_file)
- self.assertTrue(self.hdfs.exists(info_actual_file))
+ self.fs.touch(info_actual_file)
+ self.assertTrue(self.fs.exists(info_actual_file))
dir_info = fs.info(info_dir)
self.assertEqual(dir_info["name"], info_dir[len("gvfs://") :])
@@ -282,16 +286,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(exist_actual_dir)
- self.assertTrue(self.hdfs.exists(exist_actual_dir))
+ self.fs.mkdir(exist_actual_dir)
+ self.assertTrue(self.fs.exists(exist_actual_dir))
self.assertTrue(fs.exists(exist_dir))
exist_file = self.fileset_gvfs_location + "/test_exist/test.file"
exist_actual_file = self.fileset_storage_location +
"/test_exist/test.file"
- self.hdfs.touch(exist_actual_file)
- self.assertTrue(self.hdfs.exists(exist_actual_file))
+ self.fs.touch(exist_actual_file)
+ self.assertTrue(self.fs.exists(exist_actual_file))
self.assertTrue(fs.exists(exist_file))
def test_cp_file(self):
@@ -300,19 +305,20 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(cp_file_actual_dir)
- self.assertTrue(self.hdfs.exists(cp_file_actual_dir))
+ self.fs.mkdir(cp_file_actual_dir)
+ self.assertTrue(self.fs.exists(cp_file_actual_dir))
self.assertTrue(fs.exists(cp_file_dir))
cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file"
cp_file_actual_file = self.fileset_storage_location +
"/test_cp_file/test.file"
- self.hdfs.touch(cp_file_actual_file)
- self.assertTrue(self.hdfs.exists(cp_file_actual_file))
+ self.fs.touch(cp_file_actual_file)
+ self.assertTrue(self.fs.exists(cp_file_actual_file))
self.assertTrue(fs.exists(cp_file_file))
- with self.hdfs.open(cp_file_actual_file, "wb") as f:
+ with self.fs.open(cp_file_actual_file, "wb") as f:
f.write(b"test_file_1")
cp_file_new_file = self.fileset_gvfs_location +
"/test_cp_file/test_cp.file"
@@ -322,7 +328,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs.cp_file(cp_file_file, cp_file_new_file)
self.assertTrue(fs.exists(cp_file_new_file))
- with self.hdfs.open(cp_file_new_actual_file, "rb") as f:
+ with self.fs.open(cp_file_new_actual_file, "rb") as f:
result = f.read()
self.assertEqual(b"test_file_1", result)
@@ -332,10 +338,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(mv_actual_dir)
- self.assertTrue(self.hdfs.exists(mv_actual_dir))
+ self.fs.mkdir(mv_actual_dir)
+ self.assertTrue(self.fs.exists(mv_actual_dir))
self.assertTrue(fs.exists(mv_dir))
mv_new_dir = self.fileset_gvfs_location + "/test_mv_new"
@@ -343,16 +350,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(mv_new_actual_dir)
- self.assertTrue(self.hdfs.exists(mv_new_actual_dir))
+ self.fs.mkdir(mv_new_actual_dir)
+ self.assertTrue(self.fs.exists(mv_new_actual_dir))
self.assertTrue(fs.exists(mv_new_dir))
mv_file = self.fileset_gvfs_location + "/test_mv/test.file"
mv_actual_file = self.fileset_storage_location + "/test_mv/test.file"
- self.hdfs.touch(mv_actual_file)
- self.assertTrue(self.hdfs.exists(mv_actual_file))
+ self.fs.touch(mv_actual_file)
+ self.assertTrue(self.fs.exists(mv_actual_file))
self.assertTrue(fs.exists(mv_file))
mv_new_file = self.fileset_gvfs_location + "/test_mv_new/test_new.file"
@@ -362,7 +370,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs.mv(mv_file, mv_new_file)
self.assertTrue(fs.exists(mv_new_file))
- self.assertTrue(self.hdfs.exists(mv_new_actual_file))
+ self.assertTrue(self.fs.exists(mv_new_actual_file))
# test rename without sub path, which should throw an exception
with self.assertRaises(GravitinoRuntimeException):
@@ -374,16 +382,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(rm_actual_dir)
- self.assertTrue(self.hdfs.exists(rm_actual_dir))
+ self.fs.mkdir(rm_actual_dir)
+ self.assertTrue(self.fs.exists(rm_actual_dir))
self.assertTrue(fs.exists(rm_dir))
rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
- self.hdfs.touch(rm_file)
- self.assertTrue(self.hdfs.exists(rm_actual_file))
+ fs.touch(rm_file)
+ self.assertTrue(self.fs.exists(rm_actual_file))
self.assertTrue(fs.exists(rm_file))
# test delete file
@@ -393,8 +402,8 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
# test delete dir with recursive = false
rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
rm_new_actual_file = self.fileset_storage_location +
"/test_rm/test_new.file"
- self.hdfs.touch(rm_new_actual_file)
- self.assertTrue(self.hdfs.exists(rm_new_actual_file))
+ self.fs.touch(rm_new_actual_file)
+ self.assertTrue(self.fs.exists(rm_new_actual_file))
self.assertTrue(fs.exists(rm_new_file))
with self.assertRaises(ValueError):
fs.rm(rm_dir, recursive=False)
@@ -409,16 +418,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(rm_file_actual_dir)
- self.assertTrue(self.hdfs.exists(rm_file_actual_dir))
+ self.fs.mkdir(rm_file_actual_dir)
+ self.assertTrue(self.fs.exists(rm_file_actual_dir))
self.assertTrue(fs.exists(rm_file_dir))
rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
rm_file_actual_file = self.fileset_storage_location +
"/test_rm_file/test.file"
- self.hdfs.touch(rm_file_actual_file)
- self.assertTrue(self.hdfs.exists(rm_file_actual_file))
+ self.fs.touch(rm_file_actual_file)
+ self.assertTrue(self.fs.exists(rm_file_actual_file))
self.assertTrue(fs.exists(rm_file_file))
# test delete file
@@ -435,16 +445,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(rmdir_actual_dir)
- self.assertTrue(self.hdfs.exists(rmdir_actual_dir))
+ self.fs.mkdir(rmdir_actual_dir)
+ self.assertTrue(self.fs.exists(rmdir_actual_dir))
self.assertTrue(fs.exists(rmdir_dir))
rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
rmdir_actual_file = self.fileset_storage_location +
"/test_rmdir/test.file"
- self.hdfs.touch(rmdir_actual_file)
- self.assertTrue(self.hdfs.exists(rmdir_actual_file))
+ self.fs.touch(rmdir_actual_file)
+ self.assertTrue(self.fs.exists(rmdir_actual_file))
self.assertTrue(fs.exists(rmdir_file))
# test delete file
@@ -461,16 +472,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(open_actual_dir)
- self.assertTrue(self.hdfs.exists(open_actual_dir))
+ self.fs.mkdir(open_actual_dir)
+ self.assertTrue(self.fs.exists(open_actual_dir))
self.assertTrue(fs.exists(open_dir))
open_file = self.fileset_gvfs_location + "/test_open/test.file"
open_actual_file = self.fileset_storage_location +
"/test_open/test.file"
- self.hdfs.touch(open_actual_file)
- self.assertTrue(self.hdfs.exists(open_actual_file))
+ self.fs.touch(open_actual_file)
+ self.assertTrue(self.fs.exists(open_actual_file))
self.assertTrue(fs.exists(open_file))
# test open and write file
@@ -488,11 +500,12 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
fs.mkdir(mkdir_dir)
self.assertTrue(fs.exists(mkdir_dir))
- self.assertTrue(self.hdfs.exists(mkdir_actual_dir))
+ self.assertTrue(self.fs.exists(mkdir_actual_dir))
# test mkdir dir with create_parents = false
parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir"
@@ -514,11 +527,12 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
fs.makedirs(makedirs_dir)
self.assertTrue(fs.exists(makedirs_dir))
- self.assertTrue(self.hdfs.exists(makedirs_actual_dir))
+ self.assertTrue(self.fs.exists(makedirs_actual_dir))
# test mkdir dir not exist
parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir"
@@ -532,10 +546,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(created_actual_dir)
- self.assertTrue(self.hdfs.exists(created_actual_dir))
+ self.fs.mkdir(created_actual_dir)
+ self.assertTrue(self.fs.exists(created_actual_dir))
self.assertTrue(fs.exists(created_dir))
with self.assertRaises(GravitinoRuntimeException):
@@ -547,10 +562,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(modified_actual_dir)
- self.assertTrue(self.hdfs.exists(modified_actual_dir))
+ self.fs.mkdir(modified_actual_dir)
+ self.assertTrue(self.fs.exists(modified_actual_dir))
self.assertTrue(fs.exists(modified_dir))
# test mkdir dir which exists
@@ -562,16 +578,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(cat_actual_dir)
- self.assertTrue(self.hdfs.exists(cat_actual_dir))
+ self.fs.mkdir(cat_actual_dir)
+ self.assertTrue(self.fs.exists(cat_actual_dir))
self.assertTrue(fs.exists(cat_dir))
cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
- self.hdfs.touch(cat_actual_file)
- self.assertTrue(self.hdfs.exists(cat_actual_file))
+ self.fs.touch(cat_actual_file)
+ self.assertTrue(self.fs.exists(cat_actual_file))
self.assertTrue(fs.exists(cat_file))
# test open and write file
@@ -589,16 +606,17 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(get_actual_dir)
- self.assertTrue(self.hdfs.exists(get_actual_dir))
+ self.fs.mkdir(get_actual_dir)
+ self.assertTrue(self.fs.exists(get_actual_dir))
self.assertTrue(fs.exists(get_dir))
get_file = self.fileset_gvfs_location + "/test_get/test.file"
get_actual_file = self.fileset_storage_location + "/test_get/test.file"
- self.hdfs.touch(get_actual_file)
- self.assertTrue(self.hdfs.exists(get_actual_file))
+ self.fs.touch(get_actual_file)
+ self.assertTrue(self.fs.exists(get_actual_file))
self.assertTrue(fs.exists(get_file))
# test open and write file
@@ -628,10 +646,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(pands_actual_dir)
- self.assertTrue(self.hdfs.exists(pands_actual_dir))
+ self.fs.mkdir(pands_actual_dir)
+ self.assertTrue(self.fs.exists(pands_actual_dir))
self.assertTrue(fs.exists(pands_dir))
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
@@ -642,7 +661,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
)
data.to_parquet(parquet_file, filesystem=fs)
self.assertTrue(fs.exists(parquet_file))
- self.assertTrue(self.hdfs.exists(parquet_actual_file))
+ self.assertTrue(self.fs.exists(parquet_actual_file))
# read parquet
ds1 = pandas.read_parquet(path=parquet_file, filesystem=fs)
@@ -650,6 +669,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
storage_options = {
"server_uri": "http://localhost:8090",
"metalake_name": self.metalake_name,
+ "options": self.options,
}
# to csv
csv_file = self.fileset_gvfs_location + "/test_pandas/test.csv"
@@ -660,7 +680,7 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
storage_options=storage_options,
)
self.assertTrue(fs.exists(csv_file))
- self.assertTrue(self.hdfs.exists(csv_actual_file))
+ self.assertTrue(self.fs.exists(csv_actual_file))
# read csv
ds2 = pandas.read_csv(csv_file, storage_options=storage_options)
@@ -672,10 +692,11 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(pyarrow_actual_dir)
- self.assertTrue(self.hdfs.exists(pyarrow_actual_dir))
+ self.fs.mkdir(pyarrow_actual_dir)
+ self.assertTrue(self.fs.exists(pyarrow_actual_dir))
self.assertTrue(fs.exists(pyarrow_dir))
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
@@ -701,16 +722,18 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
fs = gvfs.GravitinoVirtualFileSystem(
server_uri="http://localhost:8090",
metalake_name=self.metalake_name,
+ options=self.options,
**self.conf,
)
- self.hdfs.mkdir(llama_actual_dir)
- self.assertTrue(self.hdfs.exists(llama_actual_dir))
+ self.fs.mkdir(llama_actual_dir)
+ self.assertTrue(self.fs.exists(llama_actual_dir))
self.assertTrue(fs.exists(llama_dir))
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
storage_options = {
"server_uri": "http://localhost:8090",
"metalake_name": self.metalake_name,
+ "options": self.options,
}
csv_file = llama_dir + "/test.csv"
# to csv
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
index 73a45006f..312236fe5 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
@@ -52,6 +52,7 @@ public class GravitinoVirtualFileSystemGCSIT extends
GravitinoVirtualFileSystemI
@BeforeAll
public void startUp() throws Exception {
+ // Copy the GCP jars to the gravitino server if in deploy mode.
copyBundleJarsToHadoop("gcp-bundle");
// Need to download jars to gravitino server
super.startIntegrationTest();