This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new dd1a93007 [#4280] improvement(PyGVFS): Refactor the `getFileLocation`
logic in the Python GVFS (#5026)
dd1a93007 is described below
commit dd1a9300747b3474d54623a6d23a9490a5f10ddf
Author: xloya <[email protected]>
AuthorDate: Tue Oct 8 19:00:09 2024 +0800
[#4280] improvement(PyGVFS): Refactor the `getFileLocation` logic in the
Python GVFS (#5026)
### What changes were proposed in this pull request?
Refactor the logic of getting the file location in Python GVFS.
### Why are the changes needed?
Fix: #4280
### How was this patch tested?
Refactored the UTs, and the reserved ITs work well.
---
.../gravitino/audit/fileset_data_operation.py | 8 +
clients/client-python/gravitino/filesystem/gvfs.py | 631 ++++-----
.../tests/integration/test_gvfs_with_hdfs.py | 4 +
.../tests/unittests/test_gvfs_with_local.py | 1356 ++++++++++----------
.../gravitino/audit/FilesetDataOperation.java | 4 +
5 files changed, 991 insertions(+), 1012 deletions(-)
diff --git a/clients/client-python/gravitino/audit/fileset_data_operation.py
b/clients/client-python/gravitino/audit/fileset_data_operation.py
index 5f7a5794b..0428d7111 100644
--- a/clients/client-python/gravitino/audit/fileset_data_operation.py
+++ b/clients/client-python/gravitino/audit/fileset_data_operation.py
@@ -28,6 +28,14 @@ class FilesetDataOperation(Enum):
"""Opens a file.
"""
+ OPEN_AND_WRITE = "OPEN_AND_WRITE"
+ """Opens a file and writes to it.
+ """
+
+ OPEN_AND_APPEND = "OPEN_AND_APPEND"
+ """Opens a file and appends to it.
+ """
+
APPEND = "APPEND"
"""Appends some content into a file.
"""
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index 4870ac505..8d98d0a04 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -21,16 +21,19 @@ from typing import Dict, Tuple
import re
import fsspec
-from cachetools import TTLCache
+from cachetools import TTLCache, LRUCache
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.arrow import ArrowFSWrapper
from fsspec.utils import infer_storage_options
from pyarrow.fs import HadoopFileSystem
from readerwriterlock import rwlock
-from gravitino.api.catalog import Catalog
-from gravitino.api.fileset import Fileset
+from gravitino.audit.caller_context import CallerContext, CallerContextHolder
+from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
+from gravitino.audit.fileset_data_operation import FilesetDataOperation
+from gravitino.audit.internal_client_type import InternalClientType
from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from gravitino.catalog.fileset_catalog import FilesetCatalog
from gravitino.client.gravitino_client import GravitinoClient
from gravitino.exceptions.base import GravitinoRuntimeException
from gravitino.filesystem.gvfs_config import GVFSConfig
@@ -44,39 +47,20 @@ class StorageType(Enum):
LOCAL = "file"
-class FilesetContext:
- """A context object that holds the information about the fileset and the
file system which used in
+class FilesetContextPair:
+ """A context object that holds the information about the actual file
location and the file system which used in
the GravitinoVirtualFileSystem's operations.
"""
- def __init__(
- self,
- name_identifier: NameIdentifier,
- fileset: Fileset,
- fs: AbstractFileSystem,
- storage_type: StorageType,
- actual_path: str,
- ):
- self._name_identifier = name_identifier
- self._fileset = fileset
- self._fs = fs
- self._storage_type = storage_type
- self._actual_path = actual_path
-
- def get_name_identifier(self):
- return self._name_identifier
+ def __init__(self, actual_file_location: str, filesystem:
AbstractFileSystem):
+ self._actual_file_location = actual_file_location
+ self._filesystem = filesystem
- def get_fileset(self):
- return self._fileset
+ def actual_file_location(self):
+ return self._actual_file_location
- def get_fs(self):
- return self._fs
-
- def get_actual_path(self):
- return self._actual_path
-
- def get_storage_type(self):
- return self._storage_type
+ def filesystem(self):
+ return self._filesystem
class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
@@ -136,6 +120,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
)
self._cache = TTLCache(maxsize=cache_size, ttl=cache_expired_time)
self._cache_lock = rwlock.RWLockFair()
+ self._catalog_cache = LRUCache(maxsize=100)
+ self._catalog_cache_lock = rwlock.RWLockFair()
super().__init__(**kwargs)
@@ -160,28 +146,42 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return If details is true, returns a list of file info dicts, else
returns a list of file paths
"""
- context: FilesetContext = self._get_fileset_context(path)
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.LIST_STATUS
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ pre_process_path: str = self._pre_process_path(path)
+ identifier: NameIdentifier = self._extract_identifier(pre_process_path)
+ sub_path: str = self._get_sub_path_from_virtual_path(
+ identifier, pre_process_path
+ )
+ storage_location: str = actual_path[: len(actual_path) - len(sub_path)]
+ # return entries with details
if detail:
- entries = [
- self._convert_actual_info(entry, context)
- for entry in context.get_fs().ls(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
- detail=True,
+ entries = context_pair.filesystem().ls(
+ self._strip_storage_protocol(storage_type, actual_path),
+ detail=True,
+ )
+ virtual_entries = [
+ self._convert_actual_info(
+ entry, storage_location,
self._get_virtual_location(identifier)
)
+ for entry in entries
]
- return entries
- entries = [
- self._convert_actual_path(entry_path, context)
- for entry_path in context.get_fs().ls(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
- detail=False,
+ return virtual_entries
+ # only returns paths
+ entry_paths = context_pair.filesystem().ls(
+ self._strip_storage_protocol(storage_type, actual_path),
+ detail=False,
+ )
+ virtual_entry_paths = [
+ self._convert_actual_path(
+ entry_path, storage_location,
self._get_virtual_location(identifier)
)
+ for entry_path in entry_paths
]
- return entries
+ return virtual_entry_paths
def info(self, path, **kwargs):
"""Get file info.
@@ -189,13 +189,23 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return A file info dict
"""
- context: FilesetContext = self._get_fileset_context(path)
- actual_info: Dict = context.get_fs().info(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- )
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.GET_FILE_STATUS
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ pre_process_path: str = self._pre_process_path(path)
+ identifier: NameIdentifier = self._extract_identifier(pre_process_path)
+ sub_path: str = self._get_sub_path_from_virtual_path(
+ identifier, pre_process_path
+ )
+ storage_location: str = actual_path[: len(actual_path) - len(sub_path)]
+ actual_info: Dict = context_pair.filesystem().info(
+ self._strip_storage_protocol(storage_type, actual_path)
+ )
+ return self._convert_actual_info(
+ actual_info, storage_location,
self._get_virtual_location(identifier)
)
- return self._convert_actual_info(actual_info, context)
def exists(self, path, **kwargs):
"""Check if a file or a directory exists.
@@ -203,11 +213,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return If a file or directory exists, it returns True, otherwise False
"""
- context: FilesetContext = self._get_fileset_context(path)
- return context.get_fs().exists(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- )
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.EXISTS
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ return context_pair.filesystem().exists(
+ self._strip_storage_protocol(storage_type, actual_path)
)
def cp_file(self, path1, path2, **kwargs):
@@ -225,24 +237,20 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Destination file path identifier: `{dst_identifier}` should
be same with src file path "
f"identifier: `{src_identifier}`."
)
- src_context: FilesetContext = self._get_fileset_context(src_path)
- if self._check_mount_single_file(
- src_context.get_fileset(),
- src_context.get_fs(),
- src_context.get_storage_type(),
- ):
- raise GravitinoRuntimeException(
- f"Cannot cp file of the fileset: {src_identifier} which only
mounts to a single file."
- )
- dst_context: FilesetContext = self._get_fileset_context(dst_path)
+ src_context_pair: FilesetContextPair = self._get_fileset_context(
+ src_path, FilesetDataOperation.COPY_FILE
+ )
+ src_actual_path = src_context_pair.actual_file_location()
+
+ dst_context_pair: FilesetContextPair = self._get_fileset_context(
+ dst_path, FilesetDataOperation.COPY_FILE
+ )
+ dst_actual_path = dst_context_pair.actual_file_location()
- src_context.get_fs().cp_file(
- self._strip_storage_protocol(
- src_context.get_storage_type(), src_context.get_actual_path()
- ),
- self._strip_storage_protocol(
- dst_context.get_storage_type(), dst_context.get_actual_path()
- ),
+ storage_type = self._recognize_storage_type(src_actual_path)
+ src_context_pair.filesystem().cp_file(
+ self._strip_storage_protocol(storage_type, src_actual_path),
+ self._strip_storage_protocol(storage_type, dst_actual_path),
)
def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
@@ -264,39 +272,31 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Destination file path identifier: `{dst_identifier}`"
f" should be same with src file path identifier:
`{src_identifier}`."
)
- src_context: FilesetContext = self._get_fileset_context(src_path)
- if self._check_mount_single_file(
- src_context.get_fileset(),
- src_context.get_fs(),
- src_context.get_storage_type(),
- ):
- raise GravitinoRuntimeException(
- f"Cannot cp file of the fileset: {src_identifier} which only
mounts to a single file."
- )
- dst_context: FilesetContext = self._get_fileset_context(dst_path)
- if src_context.get_storage_type() == StorageType.HDFS:
- src_context.get_fs().mv(
- self._strip_storage_protocol(
- src_context.get_storage_type(),
src_context.get_actual_path()
- ),
- self._strip_storage_protocol(
- dst_context.get_storage_type(),
dst_context.get_actual_path()
- ),
+ src_context_pair: FilesetContextPair = self._get_fileset_context(
+ src_path, FilesetDataOperation.RENAME
+ )
+ src_actual_path = src_context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(src_actual_path)
+ dst_context_pair: FilesetContextPair = self._get_fileset_context(
+ dst_path, FilesetDataOperation.RENAME
+ )
+ dst_actual_path = dst_context_pair.actual_file_location()
+
+ if storage_type == StorageType.HDFS:
+ src_context_pair.filesystem().mv(
+ self._strip_storage_protocol(storage_type, src_actual_path),
+ self._strip_storage_protocol(storage_type, dst_actual_path),
)
- elif src_context.get_storage_type() == StorageType.LOCAL:
- src_context.get_fs().mv(
- self._strip_storage_protocol(
- src_context.get_storage_type(),
src_context.get_actual_path()
- ),
- self._strip_storage_protocol(
- dst_context.get_storage_type(),
dst_context.get_actual_path()
- ),
+ elif storage_type == StorageType.LOCAL:
+ src_context_pair.filesystem().mv(
+ self._strip_storage_protocol(storage_type, src_actual_path),
+ self._strip_storage_protocol(storage_type, dst_actual_path),
recursive,
maxdepth,
)
else:
raise GravitinoRuntimeException(
- f"Storage type:{src_context.get_storage_type()} doesn't
support now."
+ f"Storage type:{storage_type} doesn't support now."
)
def _rm(self, path):
@@ -311,11 +311,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
When removing a directory, this parameter should be True.
:param maxdepth: The maximum depth to remove the directory recursively.
"""
- context: FilesetContext = self._get_fileset_context(path)
- context.get_fs().rm(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.DELETE
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ context_pair.filesystem().rm(
+ self._strip_storage_protocol(storage_type, actual_path),
recursive,
maxdepth,
)
@@ -324,11 +326,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"""Remove a file.
:param path: Virtual fileset path
"""
- context: FilesetContext = self._get_fileset_context(path)
- context.get_fs().rm_file(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- )
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.DELETE
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ context_pair.filesystem().rm_file(
+ self._strip_storage_protocol(storage_type, actual_path)
)
def rmdir(self, path):
@@ -337,11 +341,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
And it will throw an exception if delete a directory which is
non-empty for LocalFileSystem.
:param path: Virtual fileset path
"""
- context: FilesetContext = self._get_fileset_context(path)
- context.get_fs().rmdir(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- )
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.DELETE
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ context_pair.filesystem().rmdir(
+ self._strip_storage_protocol(storage_type, actual_path)
)
def open(
@@ -362,11 +368,19 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return A file-like object from the filesystem
"""
- context: FilesetContext = self._get_fileset_context(path)
- return context.get_fs().open(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
+ if mode in ("w", "wb"):
+ data_operation = FilesetDataOperation.OPEN_AND_WRITE
+ elif mode in ("a", "ab"):
+ data_operation = FilesetDataOperation.OPEN_AND_APPEND
+ else:
+ data_operation = FilesetDataOperation.OPEN
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, data_operation
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ return context_pair.filesystem().open(
+ self._strip_storage_protocol(storage_type, actual_path),
mode,
block_size,
cache_options,
@@ -382,11 +396,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param create_parents: Create parent directories if missing when set
to True
:param kwargs: Extra args
"""
- context: FilesetContext = self._get_fileset_context(path)
- context.get_fs().mkdir(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.MKDIRS
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ context_pair.filesystem().mkdir(
+ self._strip_storage_protocol(storage_type, actual_path),
create_parents,
**kwargs,
)
@@ -396,11 +412,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: Virtual fileset path
:param exist_ok: Continue if a directory already exists
"""
- context: FilesetContext = self._get_fileset_context(path)
- context.get_fs().makedirs(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.MKDIRS
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ context_pair.filesystem().makedirs(
+ self._strip_storage_protocol(storage_type, actual_path),
exist_ok,
)
@@ -410,15 +428,17 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: Virtual fileset path
:return Created time(datetime.datetime)
"""
- context: FilesetContext = self._get_fileset_context(path)
- if context.get_storage_type() == StorageType.LOCAL:
- return context.get_fs().created(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- )
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.CREATED_TIME
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ if storage_type == StorageType.LOCAL:
+ return context_pair.filesystem().created(
+ self._strip_storage_protocol(storage_type, actual_path)
)
raise GravitinoRuntimeException(
- f"Storage type:{context.get_storage_type()} doesn't support now."
+ f"Storage type:{storage_type} doesn't support now."
)
def modified(self, path):
@@ -426,11 +446,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: Virtual fileset path
:return Modified time(datetime.datetime)
"""
- context: FilesetContext = self._get_fileset_context(path)
- return context.get_fs().modified(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- )
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.MODIFIED_TIME
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ return context_pair.filesystem().modified(
+ self._strip_storage_protocol(storage_type, actual_path)
)
def cat_file(self, path, start=None, end=None, **kwargs):
@@ -441,11 +463,13 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return File content
"""
- context: FilesetContext = self._get_fileset_context(path)
- return context.get_fs().cat_file(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ path, FilesetDataOperation.CAT_FILE
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ return context_pair.filesystem().cat_file(
+ self._strip_storage_protocol(storage_type, actual_path),
start,
end,
**kwargs,
@@ -465,55 +489,67 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
raise GravitinoRuntimeException(
"Doesn't support copy a remote gvfs file to an another remote
file."
)
- context: FilesetContext = self._get_fileset_context(rpath)
- context.get_fs().get_file(
- self._strip_storage_protocol(
- context.get_storage_type(), context.get_actual_path()
- ),
+ context_pair: FilesetContextPair = self._get_fileset_context(
+ rpath, FilesetDataOperation.GET_FILE
+ )
+ actual_path = context_pair.actual_file_location()
+ storage_type = self._recognize_storage_type(actual_path)
+ context_pair.filesystem().get_file(
+ self._strip_storage_protocol(storage_type, actual_path),
lpath,
**kwargs,
)
- def _convert_actual_path(self, path, context: FilesetContext):
+ def _convert_actual_path(
+ self,
+ actual_path: str,
+ storage_location: str,
+ virtual_location: str,
+ ):
"""Convert an actual path to a virtual path.
The virtual path is like `fileset/{catalog}/{schema}/{fileset}/xxx`.
- :param path: Actual path
- :param context: Fileset context
+ :param actual_path: Actual path
+ :param storage_location: Storage location
+ :param virtual_location: Virtual location
:return A virtual path
"""
- if context.get_storage_type() == StorageType.HDFS:
- actual_prefix = infer_storage_options(
- context.get_fileset().storage_location()
- )["path"]
- elif context.get_storage_type() == StorageType.LOCAL:
- actual_prefix = context.get_fileset().storage_location()[
- len(f"{StorageType.LOCAL.value}:") :
- ]
+ if storage_location.startswith(f"{StorageType.HDFS.value}://"):
+ actual_prefix = infer_storage_options(storage_location)["path"]
+ elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
+ actual_prefix =
storage_location[len(f"{StorageType.LOCAL.value}:") :]
else:
raise GravitinoRuntimeException(
- f"Storage type:{context.get_storage_type()} doesn't support
now."
+ f"Storage location:{storage_location} doesn't support now."
)
- if not path.startswith(actual_prefix):
+ if not actual_path.startswith(actual_prefix):
raise GravitinoRuntimeException(
- f"Path {path} does not start with valid prefix
{actual_prefix}."
+ f"Path {actual_path} does not start with valid prefix
{actual_prefix}."
)
- virtual_location =
self._get_virtual_location(context.get_name_identifier())
+
# if the storage location is end with "/",
# we should truncate this to avoid replace issues.
if actual_prefix.endswith(self.SLASH) and not
virtual_location.endswith(
self.SLASH
):
- return f"{path.replace(actual_prefix[:-1], virtual_location)}"
- return f"{path.replace(actual_prefix, virtual_location)}"
+ return f"{actual_path.replace(actual_prefix[:-1],
virtual_location)}"
+ return f"{actual_path.replace(actual_prefix, virtual_location)}"
- def _convert_actual_info(self, entry: Dict, context: FilesetContext):
+ def _convert_actual_info(
+ self,
+ entry: Dict,
+ storage_location: str,
+ virtual_location: str,
+ ):
"""Convert a file info from an actual entry to a virtual entry.
:param entry: A dict of the actual file info
- :param context: Fileset context
+ :param storage_location: Storage location
+ :param virtual_location: Virtual location
:return A dict of the virtual file info
"""
- path = self._convert_actual_path(entry["name"], context)
+ path = self._convert_actual_path(
+ entry["name"], storage_location, virtual_location
+ )
return {
"name": path,
"size": entry["size"],
@@ -521,78 +557,38 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"mtime": entry["mtime"],
}
- def _get_fileset_context(self, virtual_path: str):
+ def _get_fileset_context(self, virtual_path: str, operation:
FilesetDataOperation):
"""Get a fileset context from the cache or the Gravitino server
:param virtual_path: The virtual path
- :return A fileset context
+ :param operation: The data operation
+ :return A fileset context pair
"""
virtual_path: str = self._pre_process_path(virtual_path)
identifier: NameIdentifier = self._extract_identifier(virtual_path)
- read_lock = self._cache_lock.gen_rlock()
- try:
- read_lock.acquire()
- cache_value: Tuple[Fileset, AbstractFileSystem, StorageType] = (
- self._cache.get(identifier)
- )
- if cache_value is not None:
- actual_path = self._get_actual_path_by_ident(
- identifier,
- cache_value[0],
- cache_value[1],
- cache_value[2],
- virtual_path,
- )
- return FilesetContext(
- identifier,
- cache_value[0],
- cache_value[1],
- cache_value[2],
- actual_path,
- )
- finally:
- read_lock.release()
-
- write_lock = self._cache_lock.gen_wlock()
- try:
- write_lock.acquire()
- cache_value: Tuple[Fileset, AbstractFileSystem] = self._cache.get(
- identifier
- )
- if cache_value is not None:
- actual_path = self._get_actual_path_by_ident(
- identifier,
- cache_value[0],
- cache_value[1],
- cache_value[2],
- virtual_path,
- )
- return FilesetContext(
- identifier,
- cache_value[0],
- cache_value[1],
- cache_value[2],
- actual_path,
- )
- fileset: Fileset = self._load_fileset_from_server(identifier)
- storage_location = fileset.storage_location()
- if storage_location.startswith(f"{StorageType.HDFS.value}://"):
- fs =
ArrowFSWrapper(HadoopFileSystem.from_uri(storage_location))
- storage_type = StorageType.HDFS
- elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
- fs = LocalFileSystem()
- storage_type = StorageType.LOCAL
- else:
- raise GravitinoRuntimeException(
- f"Storage under the fileset: `{identifier}` doesn't
support now."
- )
- actual_path = self._get_actual_path_by_ident(
- identifier, fileset, fs, storage_type, virtual_path
+ catalog_ident: NameIdentifier = NameIdentifier.of(
+ self._metalake, identifier.namespace().level(1)
+ )
+ fileset_catalog = self._get_fileset_catalog(catalog_ident)
+ if fileset_catalog is None:
+ raise GravitinoRuntimeException(
+ f"Loaded fileset catalog: {catalog_ident} is null."
)
- self._cache[identifier] = (fileset, fs, storage_type)
- context = FilesetContext(identifier, fileset, fs, storage_type,
actual_path)
- return context
- finally:
- write_lock.release()
+ sub_path: str = self._get_sub_path_from_virtual_path(identifier,
virtual_path)
+ context = {
+ FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION:
operation.name,
+ FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE:
InternalClientType.PYTHON_GVFS.name,
+ }
+ caller_context: CallerContext = CallerContext(context)
+ CallerContextHolder.set(caller_context)
+ actual_file_location: (
+ str
+ ) = fileset_catalog.as_fileset_catalog().get_file_location(
+ NameIdentifier.of(identifier.namespace().level(2),
identifier.name()),
+ sub_path,
+ )
+ return FilesetContextPair(
+ actual_file_location, self._get_filesystem(actual_file_location)
+ )
def _extract_identifier(self, path):
"""Extract the fileset identifier from the path.
@@ -613,63 +609,6 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"path: `{path}` doesn't contains valid identifier."
)
- def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset:
- """Load the fileset from the server.
- If the fileset is not found on the server, an `NoSuchFilesetException`
exception will be raised.
- :param identifier: The fileset identifier
- :return The fileset
- """
- catalog: Catalog =
self._client.load_catalog(identifier.namespace().level(1))
-
- return catalog.as_fileset_catalog().load_fileset(
- NameIdentifier.of(identifier.namespace().level(2),
identifier.name())
- )
-
- def _get_actual_path_by_ident(
- self,
- identifier: NameIdentifier,
- fileset: Fileset,
- fs: AbstractFileSystem,
- storage_type: StorageType,
- virtual_path: str,
- ):
- """Get the actual path by the virtual path and the fileset.
- :param identifier: The fileset identifier
- :param fileset: The fileset
- :param fs: The file system corresponding to the fileset storage
location
- :param storage_type: The storage type of the fileset storage location
- :param virtual_path: The virtual fileset path
- :return The actual path.
- """
- virtual_location = self._get_virtual_location(identifier)
- storage_location = fileset.storage_location()
- if self._check_mount_single_file(fileset, fs, storage_type):
- if virtual_path != virtual_location:
- raise GravitinoRuntimeException(
- f"Path: {virtual_path} should be same with the virtual
location: {virtual_location}"
- " when the fileset only mounts a single file."
- )
- return storage_location
- # if the storage location ends with "/",
- # we should handle the conversion specially
- if storage_location.endswith(self.SLASH):
- sub_path = virtual_path[len(virtual_location) :]
- # For example, if the virtual path is
`gvfs://fileset/catalog/schema/test_fileset/ttt`,
- # and the storage location is `hdfs://cluster:8020/user/`,
- # we should replace `gvfs://fileset/catalog/schema/test_fileset`
- # with `hdfs://localhost:8020/user` which truncates the tailing
slash.
- # If the storage location is `hdfs://cluster:8020/user`,
- # we can replace `gvfs://fileset/catalog/schema/test_fileset`
- # with `hdfs://localhost:8020/user` directly.
- if sub_path.startswith(self.SLASH):
- new_storage_location = storage_location[:-1]
- else:
- new_storage_location = storage_location
-
- # Replace virtual_location with the adjusted storage_location
- return virtual_path.replace(virtual_location,
new_storage_location, 1)
- return virtual_path.replace(virtual_location, storage_location, 1)
-
@staticmethod
def _get_virtual_location(identifier: NameIdentifier):
"""Get the virtual location of the fileset.
@@ -682,20 +621,6 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"/{identifier.name()}"
)
- def _check_mount_single_file(
- self, fileset: Fileset, fs: AbstractFileSystem, storage_type:
StorageType
- ):
- """Check if the fileset is mounted a single file.
- :param fileset: The fileset
- :param fs: The file system corresponding to the fileset storage
location
- :param storage_type: The storage type of the fileset storage location
- :return True the fileset is mounted a single file.
- """
- result: Dict = fs.info(
- self._strip_storage_protocol(storage_type,
fileset.storage_location())
- )
- return result["type"] == "file"
-
@staticmethod
def _pre_process_path(virtual_path):
"""Pre-process the path.
@@ -719,6 +644,28 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
)
return pre_processed_path
+ @staticmethod
+ def _recognize_storage_type(path: str):
+ """Recognize the storage type by the path.
+ :param path: The path
+ :return: The storage type
+ """
+ if path.startswith(f"{StorageType.HDFS.value}://"):
+ return StorageType.HDFS
+ if path.startswith(f"{StorageType.LOCAL.value}:/"):
+ return StorageType.LOCAL
+ raise GravitinoRuntimeException(
+ f"Storage type doesn't support now. Path:{path}"
+ )
+
+ @staticmethod
+ def _get_sub_path_from_virtual_path(identifier: NameIdentifier,
virtual_path: str):
+ return virtual_path[
+ len(
+
f"fileset/{identifier.namespace().level(1)}/{identifier.namespace().level(2)}/{identifier.name()}"
+ ) :
+ ]
+
@staticmethod
def _strip_storage_protocol(storage_type: StorageType, path: str):
"""Strip the storage protocol from the path.
@@ -739,5 +686,65 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Storage type:{storage_type} doesn't support now."
)
+ def _get_fileset_catalog(self, catalog_ident: NameIdentifier):
+ read_lock = self._catalog_cache_lock.gen_rlock()
+ try:
+ read_lock.acquire()
+ cache_value: Tuple[NameIdentifier, FilesetCatalog] = (
+ self._catalog_cache.get(catalog_ident)
+ )
+ if cache_value is not None:
+ return cache_value
+ finally:
+ read_lock.release()
+
+ write_lock = self._catalog_cache_lock.gen_wlock()
+ try:
+ write_lock.acquire()
+ cache_value: Tuple[NameIdentifier, FilesetCatalog] = (
+ self._catalog_cache.get(catalog_ident)
+ )
+ if cache_value is not None:
+ return cache_value
+ catalog = self._client.load_catalog(catalog_ident.name())
+ self._catalog_cache[catalog_ident] = catalog
+ return catalog
+ finally:
+ write_lock.release()
+
+ def _get_filesystem(self, actual_file_location: str):
+ storage_type = self._recognize_storage_type(actual_file_location)
+ read_lock = self._cache_lock.gen_rlock()
+ try:
+ read_lock.acquire()
+ cache_value: Tuple[StorageType, AbstractFileSystem] =
self._cache.get(
+ storage_type
+ )
+ if cache_value is not None:
+ return cache_value
+ finally:
+ read_lock.release()
+
+ write_lock = self._cache_lock.gen_wlock()
+ try:
+ write_lock.acquire()
+ cache_value: Tuple[StorageType, AbstractFileSystem] =
self._cache.get(
+ storage_type
+ )
+ if cache_value is not None:
+ return cache_value
+ if storage_type == StorageType.HDFS:
+ fs =
ArrowFSWrapper(HadoopFileSystem.from_uri(actual_file_location))
+ elif storage_type == StorageType.LOCAL:
+ fs = LocalFileSystem()
+ else:
+ raise GravitinoRuntimeException(
+ f"Storage type: `{storage_type}` doesn't support now."
+ )
+ self._cache[storage_type] = fs
+ return fs
+ finally:
+ write_lock.release()
+
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
index 87b6f1023..9116005b8 100644
--- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -364,6 +364,10 @@ class TestGvfsWithHDFS(IntegrationTestEnv):
self.assertTrue(fs.exists(mv_new_file))
self.assertTrue(self.hdfs.exists(mv_new_actual_file))
+ # test rename without sub path, which should throw an exception
+ with self.assertRaises(GravitinoRuntimeException):
+ fs.mv(self.fileset_gvfs_location, self.fileset_gvfs_location +
"/test_mv")
+
def test_rm(self):
rm_dir = self.fileset_gvfs_location + "/test_rm"
rm_actual_dir = self.fileset_storage_location + "/test_rm"
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py
b/clients/client-python/tests/unittests/test_gvfs_with_local.py
index 3b28941df..22bdccd8c 100644
--- a/clients/client-python/tests/unittests/test_gvfs_with_local.py
+++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py
@@ -14,16 +14,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import base64
+import os
# pylint: disable=protected-access,too-many-lines,too-many-locals
-import base64
-import os
import random
import string
import time
import unittest
-from unittest import mock
+from datetime import datetime
from unittest.mock import patch
import pandas
@@ -31,16 +31,11 @@ import pyarrow as pa
import pyarrow.dataset as dt
import pyarrow.parquet as pq
from fsspec.implementations.local import LocalFileSystem
-from llama_index.core import SimpleDirectoryReader
-from gravitino import gvfs, Fileset
-from gravitino import NameIdentifier
+from gravitino import gvfs, NameIdentifier
from gravitino.auth.auth_constants import AuthConstants
-from gravitino.dto.audit_dto import AuditDTO
-from gravitino.dto.fileset_dto import FilesetDTO
-from gravitino.filesystem.gvfs import FilesetContext, StorageType
from gravitino.exceptions.base import GravitinoRuntimeException
-
+from gravitino.filesystem.gvfs_config import GVFSConfig
from tests.unittests import mock_base
@@ -67,42 +62,30 @@ class TestLocalFilesystem(unittest.TestCase):
if local_fs.exists(self._local_base_dir_path):
local_fs.rm(self._local_base_dir_path, recursive=True)
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_cache", f"{_fileset_dir}/test_cache"
- ),
- )
    def test_cache(self, *mock_methods):
        """Verify the internal filesystem cache evicts entries after expiry."""
        fileset_storage_location = f"{self._fileset_dir}/test_cache"
        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cache"
        actual_path = fileset_storage_location
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path,
        ):
            local_fs = LocalFileSystem()
            local_fs.mkdir(fileset_storage_location)
            self.assertTrue(local_fs.exists(fileset_storage_location))
            # Cache holds at most 1 entry and expires entries after 1 second.
            options = {GVFSConfig.CACHE_SIZE: 1, GVFSConfig.CACHE_EXPIRED_TIME: 1}
            fs = gvfs.GravitinoVirtualFileSystem(
                server_uri="http://localhost:9090",
                metalake_name="metalake_demo",
                options=options,
                skip_instance_cache=True,
            )
            self.assertTrue(fs.exists(fileset_virtual_location))
            # wait 2 seconds — longer than the 1-second expiry configured above,
            # so the cached local-filesystem entry must be gone afterwards
            time.sleep(2)
            self.assertIsNone(fs._cache.get("file:/"))
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_simple_auth", f"{_fileset_dir}/test_simple_auth"
- ),
- )
- def test_simple_auth(self, mock_method1, mock_method2, mock_method3,
mock_method4):
+ def test_simple_auth(self, *mock_methods):
options = {"auth_type": "simple"}
current_user = (
None if os.environ.get("user.name") is None else
os.environ["user.name"]
@@ -113,6 +96,7 @@ class TestLocalFilesystem(unittest.TestCase):
server_uri="http://localhost:9090",
metalake_name="metalake_demo",
options=options,
+ skip_instance_cache=True,
)
token = fs._client._rest_client.auth_data_provider.get_token_data()
token_string = base64.b64decode(
@@ -122,60 +106,59 @@ class TestLocalFilesystem(unittest.TestCase):
if current_user is not None:
os.environ["user.name"] = current_user
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset("test_ls",
f"{_fileset_dir}/test_ls"),
- )
    def test_ls(self, *mock_methods):
        """Verify ls() maps storage paths back to virtual paths, with and without detail."""
        fileset_storage_location = f"{self._fileset_dir}/test_ls"
        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_ls"
        actual_path = fileset_storage_location
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path,
        ):
            local_fs = LocalFileSystem()
            local_fs.mkdir(fileset_storage_location)
            sub_dir_path = f"{fileset_storage_location}/test_1"
            local_fs.mkdir(sub_dir_path)
            self.assertTrue(local_fs.exists(sub_dir_path))
            sub_file_path = f"{fileset_storage_location}/test_file_1.par"
            local_fs.touch(sub_file_path)
            self.assertTrue(local_fs.exists(sub_file_path))

            fs = gvfs.GravitinoVirtualFileSystem(
                server_uri="http://localhost:9090",
                metalake_name="metalake_demo",
                skip_instance_cache=True,
            )
            self.assertTrue(fs.exists(fileset_virtual_location))

            # test detail = false
            file_list_without_detail = fs.ls(fileset_virtual_location, detail=False)
            file_list_without_detail.sort()
            self.assertEqual(2, len(file_list_without_detail))
            self.assertEqual(
                file_list_without_detail[0], f"{fileset_virtual_location}/test_1"
            )
            self.assertEqual(
                file_list_without_detail[1],
                f"{fileset_virtual_location}/test_file_1.par",
            )

            # test detail = true
            file_list_with_detail = fs.ls(fileset_virtual_location, detail=True)
            file_list_with_detail.sort(key=lambda x: x["name"])
            self.assertEqual(2, len(file_list_with_detail))
            self.assertEqual(
                file_list_with_detail[0]["name"], f"{fileset_virtual_location}/test_1"
            )
            self.assertEqual(
                file_list_with_detail[1]["name"],
                f"{fileset_virtual_location}/test_file_1.par",
            )
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_info", f"{_fileset_dir}/test_info"
- ),
- )
def test_info(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_info"
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_info"
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
local_fs.mkdir(fileset_storage_location)
sub_dir_path = f"{fileset_storage_location}/test_1"
local_fs.mkdir(sub_dir_path)
@@ -185,28 +168,39 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_file_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
dir_virtual_path = fileset_virtual_location + "/test_1"
- dir_info = fs.info(dir_virtual_path)
- self.assertEqual(dir_info["name"], dir_virtual_path)
+ actual_path = fileset_storage_location + "/test_1"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ dir_info = fs.info(dir_virtual_path)
+ self.assertEqual(dir_info["name"], dir_virtual_path)
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- file_info = fs.info(file_virtual_path)
- self.assertEqual(file_info["name"], file_virtual_path)
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_exist", f"{_fileset_dir}/test_exist"
- ),
- )
+ actual_path = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ file_info = fs.info(file_virtual_path)
+ self.assertEqual(file_info["name"], file_virtual_path)
+
def test_exist(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_exist"
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_exist"
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
local_fs.mkdir(fileset_storage_location)
sub_dir_path = f"{fileset_storage_location}/test_1"
local_fs.mkdir(sub_dir_path)
@@ -216,28 +210,38 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_file_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
dir_virtual_path = fileset_virtual_location + "/test_1"
- self.assertTrue(fs.exists(dir_virtual_path))
+ actual_path = fileset_storage_location + "/test_1"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
+ actual_path = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_cp_file", f"{_fileset_dir}/test_cp_file"
- ),
- )
def test_cp_file(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_cp_file"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cp_file"
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -246,19 +250,35 @@ class TestLocalFilesystem(unittest.TestCase):
f.write(b"test_file_1")
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
-
- cp_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par"
- fs.cp_file(file_virtual_path, cp_file_virtual_path)
- self.assertTrue(fs.exists(cp_file_virtual_path))
- with local_fs.open(sub_file_path, "rb") as f:
- result = f.read()
- self.assertEqual(b"test_file_1", result)
+ src_actual_path = fileset_storage_location + "/test_file_1.par"
+ dst_actual_path = fileset_storage_location + "/test_cp_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[
+ src_actual_path,
+ src_actual_path,
+ dst_actual_path,
+ dst_actual_path,
+ ],
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ cp_file_virtual_path = fileset_virtual_location +
"/test_cp_file_1.par"
+ fs.cp_file(file_virtual_path, cp_file_virtual_path)
+ self.assertTrue(fs.exists(cp_file_virtual_path))
+ with local_fs.open(sub_file_path, "rb") as f:
+ result = f.read()
+ self.assertEqual(b"test_file_1", result)
# test invalid dst path
cp_file_invalid_virtual_path = (
@@ -267,25 +287,12 @@ class TestLocalFilesystem(unittest.TestCase):
with self.assertRaises(GravitinoRuntimeException):
fs.cp_file(file_virtual_path, cp_file_invalid_virtual_path)
- # test mount a single file
- local_fs.rm(path=fileset_storage_location, recursive=True)
- self.assertFalse(local_fs.exists(fileset_storage_location))
- local_fs.touch(fileset_storage_location)
- self.assertTrue(local_fs.exists(fileset_storage_location))
- with self.assertRaises(GravitinoRuntimeException):
- fs.cp_file(file_virtual_path, cp_file_virtual_path)
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset("test_mv",
f"{_fileset_dir}/test_mv"),
- )
def test_mv(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_mv"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_mv"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -295,27 +302,53 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(another_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
+ src_actual_path = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=src_actual_path,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
mv_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par"
- fs.mv(file_virtual_path, mv_file_virtual_path)
- self.assertTrue(fs.exists(mv_file_virtual_path))
+ dst_actual_path = fileset_storage_location + "/test_cp_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[src_actual_path, dst_actual_path, dst_actual_path],
+ ):
+ fs.mv(file_virtual_path, mv_file_virtual_path)
+ self.assertTrue(fs.exists(mv_file_virtual_path))
mv_another_dir_virtual_path = (
fileset_virtual_location + "/another_dir/test_file_2.par"
)
- fs.mv(mv_file_virtual_path, mv_another_dir_virtual_path)
- self.assertTrue(fs.exists(mv_another_dir_virtual_path))
+ dst_actual_path1 = fileset_storage_location +
"/another_dir/test_file_2.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[dst_actual_path, dst_actual_path1, dst_actual_path1],
+ ):
+ fs.mv(mv_file_virtual_path, mv_another_dir_virtual_path)
+ self.assertTrue(fs.exists(mv_another_dir_virtual_path))
# test not exist dir
not_exist_dst_dir_path = fileset_virtual_location +
"/not_exist/test_file_2.par"
- with self.assertRaises(FileNotFoundError):
- fs.mv(path1=mv_another_dir_virtual_path,
path2=not_exist_dst_dir_path)
+ dst_actual_path2 = fileset_storage_location +
"/not_exist/test_file_2.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[dst_actual_path1, dst_actual_path2],
+ ):
+ with self.assertRaises(FileNotFoundError):
+ fs.mv(path1=mv_another_dir_virtual_path,
path2=not_exist_dst_dir_path)
# test invalid dst path
mv_file_invalid_virtual_path = (
@@ -324,25 +357,12 @@ class TestLocalFilesystem(unittest.TestCase):
with self.assertRaises(GravitinoRuntimeException):
fs.mv(path1=file_virtual_path, path2=mv_file_invalid_virtual_path)
- # test mount a single file
- local_fs.rm(path=fileset_storage_location, recursive=True)
- self.assertFalse(local_fs.exists(fileset_storage_location))
- local_fs.touch(fileset_storage_location)
- self.assertTrue(local_fs.exists(fileset_storage_location))
- with self.assertRaises(GravitinoRuntimeException):
- fs.mv(file_virtual_path, mv_file_virtual_path)
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset("test_rm",
f"{_fileset_dir}/test_rm"),
- )
def test_rm(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_rm"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rm"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -352,38 +372,48 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test delete file
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
- fs.rm(file_virtual_path)
- self.assertFalse(fs.exists(file_virtual_path))
+ actual_path1 = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ fs.rm(file_virtual_path)
+ self.assertFalse(fs.exists(file_virtual_path))
# test delete dir with recursive = false
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- with self.assertRaises(ValueError):
- fs.rm(dir_virtual_path, recursive=False)
-
- # test delete dir with recursive = true
- fs.rm(dir_virtual_path, recursive=True)
- self.assertFalse(fs.exists(dir_virtual_path))
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_rm_file", f"{_fileset_dir}/test_rm_file"
- ),
- )
+ actual_path2 = fileset_storage_location + "/sub_dir"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ with self.assertRaises(ValueError):
+ fs.rm(dir_virtual_path, recursive=False)
+
+ # test delete dir with recursive = true
+ fs.rm(dir_virtual_path, recursive=True)
+ self.assertFalse(fs.exists(dir_virtual_path))
+
def test_rm_file(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_rm_file"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rm_file"
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
@@ -394,35 +424,44 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test delete file
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
- fs.rm_file(file_virtual_path)
- self.assertFalse(fs.exists(file_virtual_path))
+ actual_path1 = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ fs.rm_file(file_virtual_path)
+ self.assertFalse(fs.exists(file_virtual_path))
# test delete dir
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- with self.assertRaises((IsADirectoryError, PermissionError)):
- fs.rm_file(dir_virtual_path)
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_rmdir", f"{_fileset_dir}/test_rmdir"
- ),
- )
+ actual_path2 = fileset_storage_location + "/sub_dir"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ with self.assertRaises((IsADirectoryError, PermissionError)):
+ fs.rm_file(dir_virtual_path)
+
def test_rmdir(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_rmdir"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rmdir"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -432,35 +471,44 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test delete file
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
- with self.assertRaises(NotADirectoryError):
- fs.rmdir(file_virtual_path)
+ actual_path1 = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ with self.assertRaises(NotADirectoryError):
+ fs.rmdir(file_virtual_path)
# test delete dir
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- fs.rmdir(dir_virtual_path)
- self.assertFalse(fs.exists(dir_virtual_path))
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_open", f"{_fileset_dir}/test_open"
- ),
- )
+ actual_path2 = fileset_storage_location + "/sub_dir"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ fs.rmdir(dir_virtual_path)
+ self.assertFalse(fs.exists(dir_virtual_path))
+
def test_open(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_open"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_open"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -470,168 +518,198 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test open and write file
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
- with fs.open(file_virtual_path, mode="wb") as f:
- f.write(b"test_open_write")
- self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
-
- # test open and read file
- with fs.open(file_virtual_path, mode="rb") as f:
- self.assertEqual(b"test_open_write", f.read())
+ actual_path1 = fileset_storage_location + "/test_file_1.par"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ with fs.open(file_virtual_path, mode="wb") as f:
+ f.write(b"test_open_write")
+ self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
+
+ # test open and read file
+ with fs.open(file_virtual_path, mode="rb") as f:
+ self.assertEqual(b"test_open_write", f.read())
# test open dir
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- with self.assertRaises(IsADirectoryError):
- fs.open(dir_virtual_path)
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_mkdir", f"{_fileset_dir}/test_mkdir"
- ),
- )
+ actual_path2 = fileset_storage_location + "/sub_dir"
+ with patch(
+
"gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ with self.assertRaises(IsADirectoryError):
+ fs.open(dir_virtual_path)
+
    def test_mkdir(self, *mock_methods):
        """Verify mkdir() on existing dirs, with and without create_parents."""
        fileset_storage_location = f"{self._fileset_dir}/test_mkdir"
        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_mkdir"
        actual_path = fileset_storage_location
        local_fs = LocalFileSystem()
        local_fs.mkdir(fileset_storage_location)
        sub_dir_path = f"{fileset_storage_location}/sub_dir"
        local_fs.mkdirs(sub_dir_path)
        self.assertTrue(local_fs.exists(sub_dir_path))

        fs = gvfs.GravitinoVirtualFileSystem(
            server_uri="http://localhost:9090",
            metalake_name="metalake_demo",
            skip_instance_cache=True,
        )
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path,
        ):
            self.assertTrue(fs.exists(fileset_virtual_location))

            # test mkdir dir which exists
            existed_dir_virtual_path = fileset_virtual_location
            self.assertTrue(fs.exists(existed_dir_virtual_path))
            with self.assertRaises(FileExistsError):
                fs.mkdir(existed_dir_virtual_path)

        # test mkdir dir with create_parents = false
        parent_not_exist_virtual_path = fileset_virtual_location + "/not_exist/sub_dir"
        actual_path1 = fileset_storage_location + "/not_exist/sub_dir"
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path1,
        ):
            self.assertFalse(fs.exists(parent_not_exist_virtual_path))
            with self.assertRaises(FileNotFoundError):
                fs.mkdir(parent_not_exist_virtual_path, create_parents=False)

        # test mkdir dir with create_parents = true
        parent_not_exist_virtual_path2 = fileset_virtual_location + "/not_exist/sub_dir"
        actual_path2 = fileset_storage_location + "/not_exist/sub_dir"
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path2,
        ):
            self.assertFalse(fs.exists(parent_not_exist_virtual_path2))
            fs.mkdir(parent_not_exist_virtual_path2, create_parents=True)
            self.assertTrue(fs.exists(parent_not_exist_virtual_path2))
+
    def test_makedirs(self, *mock_methods):
        """Verify makedirs() creates missing parents and mkdirs() rejects existing dirs."""
        fileset_storage_location = f"{self._fileset_dir}/test_makedirs"
        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_makedirs"
        actual_path = fileset_storage_location
        local_fs = LocalFileSystem()
        local_fs.mkdir(fileset_storage_location)
        sub_dir_path = f"{fileset_storage_location}/sub_dir"
        local_fs.mkdirs(sub_dir_path)
        self.assertTrue(local_fs.exists(sub_dir_path))

        fs = gvfs.GravitinoVirtualFileSystem(
            server_uri="http://localhost:9090",
            metalake_name="metalake_demo",
            skip_instance_cache=True,
        )
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path,
        ):
            self.assertTrue(fs.exists(fileset_virtual_location))

            # test mkdir dir which exists
            existed_dir_virtual_path = fileset_virtual_location
            self.assertTrue(fs.exists(existed_dir_virtual_path))
            with self.assertRaises(FileExistsError):
                fs.mkdirs(existed_dir_virtual_path)

        # test mkdir dir not exist
        parent_not_exist_virtual_path = fileset_virtual_location + "/not_exist/sub_dir"
        actual_path1 = fileset_storage_location + "/not_exist/sub_dir"
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=actual_path1,
        ):
            self.assertFalse(fs.exists(parent_not_exist_virtual_path))
            fs.makedirs(parent_not_exist_virtual_path)
            self.assertTrue(fs.exists(parent_not_exist_virtual_path))
+
def test_created(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_created"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_created"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_dir_path = f"{fileset_storage_location}/sub_dir"
local_fs.mkdirs(sub_dir_path)
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test mkdir dir which exists
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- self.assertIsNotNone(fs.created(dir_virtual_path))
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_modified", f"{_fileset_dir}/test_modified"
- ),
- )
+ actual_path1 = fileset_storage_location + "/sub_dir"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ self.assertIsNotNone(fs.created(dir_virtual_path))
+
def test_modified(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_modified"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_modified"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_dir_path = f"{fileset_storage_location}/sub_dir"
local_fs.mkdirs(sub_dir_path)
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test mkdir dir which exists
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- self.assertIsNotNone(fs.modified(dir_virtual_path))
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_cat_file", f"{_fileset_dir}/test_cat_file"
- ),
- )
+ actual_path1 = fileset_storage_location + "/sub_dir"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ self.assertIsNotNone(fs.modified(dir_virtual_path))
+
def test_cat_file(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_cat_file"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cat_file"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -641,40 +719,49 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test open and write file
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
- with fs.open(file_virtual_path, mode="wb") as f:
- f.write(b"test_cat_file")
- self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
-
- # test cat file
- content = fs.cat_file(file_virtual_path)
- self.assertEqual(b"test_cat_file", content)
+ actual_path1 = fileset_storage_location + "/test_file_1.par"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ with fs.open(file_virtual_path, mode="wb") as f:
+ f.write(b"test_cat_file")
+ self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
+
+ # test cat file
+ content = fs.cat_file(file_virtual_path)
+ self.assertEqual(b"test_cat_file", content)
# test cat dir
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- with self.assertRaises(IsADirectoryError):
- fs.cat_file(dir_virtual_path)
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_get_file", f"{_fileset_dir}/test_get_file"
- ),
- )
+ actual_path2 = fileset_storage_location + "/sub_dir"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ self.assertTrue(fs.exists(dir_virtual_path))
+ with self.assertRaises(IsADirectoryError):
+ fs.cat_file(dir_virtual_path)
+
def test_get_file(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_get_file"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location = "fileset/fileset_catalog/tmp/test_get_file"
-
+ actual_path = fileset_storage_location
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
sub_file_path = f"{fileset_storage_location}/test_file_1.par"
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
@@ -684,30 +771,46 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertTrue(local_fs.exists(sub_dir_path))
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
# test open and write file
file_virtual_path = fileset_virtual_location + "/test_file_1.par"
- self.assertTrue(fs.exists(file_virtual_path))
- with fs.open(file_virtual_path, mode="wb") as f:
- f.write(b"test_get_file")
- self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
-
- # test get file
- local_path = self._fileset_dir + "/local_file.par"
- local_fs.touch(local_path)
- self.assertTrue(local_fs.exists(local_path))
- fs.get_file(file_virtual_path, local_path)
- self.assertEqual(b"test_get_file", local_fs.cat_file(local_path))
+ actual_path1 = fileset_storage_location + "/test_file_1.par"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ self.assertTrue(fs.exists(file_virtual_path))
+ with fs.open(file_virtual_path, mode="wb") as f:
+ f.write(b"test_get_file")
+ self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
+
+ # test get file
+ local_path = self._fileset_dir + "/local_file.par"
+ local_fs.touch(local_path)
+ self.assertTrue(local_fs.exists(local_path))
+ fs.get_file(file_virtual_path, local_path)
+ self.assertEqual(b"test_get_file", local_fs.cat_file(local_path))
# test get a dir
dir_virtual_path = fileset_virtual_location + "/sub_dir"
- local_path = self._fileset_dir + "/local_dir"
- self.assertTrue(fs.exists(dir_virtual_path))
- fs.get_file(dir_virtual_path, local_path)
- self.assertTrue(local_fs.exists(local_path))
+ actual_path2 = fileset_storage_location + "/sub_dir"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ local_path = self._fileset_dir + "/local_dir"
+ self.assertTrue(fs.exists(dir_virtual_path))
+ fs.get_file(dir_virtual_path, local_path)
+ self.assertTrue(local_fs.exists(local_path))
# test get a file to a remote file
remote_path = "gvfs://" + fileset_virtual_location + "/test_file_2.par"
@@ -715,100 +818,65 @@ class TestLocalFilesystem(unittest.TestCase):
fs.get_file(file_virtual_path, remote_path)
def test_convert_actual_path(self, *mock_methods):
- # test convert actual hdfs path
- audit_dto = AuditDTO(
- _creator="test",
- _create_time="2022-01-01T00:00:00Z",
- _last_modifier="test",
- _last_modified_time="2024-04-05T10:10:35.218Z",
- )
- hdfs_fileset: FilesetDTO = FilesetDTO(
- _name="test_f1",
- _comment="",
- _type=FilesetDTO.Type.MANAGED,
- _storage_location="hdfs://localhost:8090/fileset/test_f1",
- _audit=audit_dto,
- _properties={},
- )
- mock_hdfs_context: FilesetContext = FilesetContext(
- name_identifier=NameIdentifier.of(
- "test_metalake", "test_catalog", "test_schema", "test_f1"
- ),
- storage_type=StorageType.HDFS,
- fileset=hdfs_fileset,
- actual_path=hdfs_fileset.storage_location() + "/actual_path",
- fs=LocalFileSystem(),
- )
-
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
+ )
+ storage_location = "hdfs://localhost:8090/fileset/test_f1"
+ virtual_location = fs._get_virtual_location(
+ NameIdentifier.of("test_metalake", "test_catalog", "test_schema",
"test_f1")
)
# test actual path not start with storage location
actual_path = "/not_start_with_storage/ttt"
with self.assertRaises(GravitinoRuntimeException):
- fs._convert_actual_path(actual_path, mock_hdfs_context)
+ fs._convert_actual_path(actual_path, storage_location, virtual_location)
# test actual path start with storage location
actual_path = "/fileset/test_f1/actual_path"
- virtual_path = fs._convert_actual_path(actual_path, mock_hdfs_context)
+ virtual_path = fs._convert_actual_path(
+ actual_path, storage_location, virtual_location
+ )
self.assertEqual(
"fileset/test_catalog/test_schema/test_f1/actual_path",
virtual_path
)
# test convert actual local path
- audit_dto = AuditDTO(
- _creator="test",
- _create_time="2022-01-01T00:00:00Z",
- _last_modifier="test",
- _last_modified_time="2024-04-05T10:10:35.218Z",
- )
- local_fileset: FilesetDTO = FilesetDTO(
- _name="test_f1",
- _comment="",
- _type=FilesetDTO.Type.MANAGED,
- _storage_location="file:/tmp/fileset/test_f1",
- _audit=audit_dto,
- _properties={},
- )
- mock_local_context: FilesetContext = FilesetContext(
- name_identifier=NameIdentifier.of(
- "test_metalake", "test_catalog", "test_schema", "test_f1"
- ),
- storage_type=StorageType.LOCAL,
- fileset=local_fileset,
- actual_path=local_fileset.storage_location() + "/actual_path",
- fs=LocalFileSystem(),
- )
-
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
+ )
+ storage_location = "file:/tmp/fileset/test_f1"
+ virtual_location = fs._get_virtual_location(
+ NameIdentifier.of("test_metalake", "test_catalog", "test_schema",
"test_f1")
)
# test actual path not start with storage location
actual_path = "/not_start_with_storage/ttt"
with self.assertRaises(GravitinoRuntimeException):
- fs._convert_actual_path(actual_path, mock_local_context)
+ fs._convert_actual_path(actual_path, storage_location, virtual_location)
# test actual path start with storage location
actual_path = "/tmp/fileset/test_f1/actual_path"
- virtual_path = fs._convert_actual_path(actual_path, mock_local_context)
+ virtual_path = fs._convert_actual_path(
+ actual_path, storage_location, virtual_location
+ )
self.assertEqual(
"fileset/test_catalog/test_schema/test_f1/actual_path",
virtual_path
)
# test storage location without "/"
actual_path = "/tmp/test_convert_actual_path/sub_dir/1.parquet"
- storage_location1 = "file:/tmp/test_convert_actual_path"
- mock_fileset1: Fileset = mock.Mock(spec=Fileset)
- mock_fileset1.storage_location.return_value = storage_location1
-
- mock_fileset_context1: FilesetContext = mock.Mock(spec=FilesetContext)
- mock_fileset_context1.get_storage_type.return_value = StorageType.LOCAL
- mock_fileset_context1.get_name_identifier.return_value = NameIdentifier.of(
- "test_metalake", "catalog", "schema", "test_convert_actual_path"
+ storage_location = "file:/tmp/test_convert_actual_path"
+ virtual_location = fs._get_virtual_location(
+ NameIdentifier.of(
+ "test_metalake", "catalog", "schema",
"test_convert_actual_path"
+ )
)
- mock_fileset_context1.get_fileset.return_value = mock_fileset1
- virtual_path = fs._convert_actual_path(actual_path, mock_fileset_context1)
+ virtual_path = fs._convert_actual_path(
+ actual_path, storage_location, virtual_location
+ )
self.assertEqual(
"fileset/catalog/schema/test_convert_actual_path/sub_dir/1.parquet",
virtual_path,
@@ -816,107 +884,90 @@ class TestLocalFilesystem(unittest.TestCase):
# test storage location with "/"
actual_path = "/tmp/test_convert_actual_path/sub_dir/1.parquet"
- storage_location2 = "file:/tmp/test_convert_actual_path/"
- mock_fileset2: Fileset = mock.Mock(spec=Fileset)
- mock_fileset2.storage_location.return_value = storage_location2
-
- mock_fileset_context2: FilesetContext = mock.Mock(spec=FilesetContext)
- mock_fileset_context2.get_storage_type.return_value = StorageType.LOCAL
- mock_fileset_context2.get_name_identifier.return_value = NameIdentifier.of(
- "test_metalake", "catalog", "schema", "test_convert_actual_path"
+ storage_location = "file:/tmp/test_convert_actual_path/"
+ virtual_location = fs._get_virtual_location(
+ NameIdentifier.of(
+ "test_metalake", "catalog", "schema",
"test_convert_actual_path"
+ )
)
- mock_fileset_context2.get_fileset.return_value = mock_fileset2
- virtual_path = fs._convert_actual_path(actual_path, mock_fileset_context2)
+ virtual_path = fs._convert_actual_path(
+ actual_path, storage_location, virtual_location
+ )
self.assertEqual(
"fileset/catalog/schema/test_convert_actual_path/sub_dir/1.parquet",
virtual_path,
)
- def test_convert_info(self, *mock_methods3):
- # test convert actual hdfs path
- audit_dto = AuditDTO(
- _creator="test",
- _create_time="2022-01-01T00:00:00Z",
- _last_modifier="test",
- _last_modified_time="2024-04-05T10:10:35.218Z",
- )
- hdfs_fileset: FilesetDTO = FilesetDTO(
- _name="test_f1",
- _comment="",
- _type=FilesetDTO.Type.MANAGED,
- _storage_location="hdfs://localhost:8090/fileset/test_f1",
- _audit=audit_dto,
- _properties={},
- )
- mock_hdfs_context: FilesetContext = FilesetContext(
- name_identifier=NameIdentifier.of(
- "test_metalake", "test_catalog", "test_schema", "test_f1"
- ),
- storage_type=StorageType.HDFS,
- fileset=hdfs_fileset,
- actual_path=hdfs_fileset.storage_location() + "/actual_path",
- fs=LocalFileSystem(),
- )
-
+ def test_convert_info(self, *mock_methods):
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
# test actual path not start with storage location
- actual_path = "/not_start_with_storage/ttt"
+ entry = {
+ "name": "/not_start_with_storage/ttt",
+ "size": 1,
+ "type": "file",
+ "mtime": datetime.now(),
+ }
+ storage_location = "hdfs://localhost:8090/fileset/test_f1"
+ virtual_location = fs._get_virtual_location(
+ NameIdentifier.of("test_metalake", "test_catalog", "test_schema",
"test_f1")
+ )
with self.assertRaises(GravitinoRuntimeException):
- fs._convert_actual_path(actual_path, mock_hdfs_context)
+ fs._convert_actual_info(entry, storage_location, virtual_location)
# test actual path start with storage location
- actual_path = "/fileset/test_f1/actual_path"
- virtual_path = fs._convert_actual_path(actual_path, mock_hdfs_context)
+ entry = {
+ "name": "/fileset/test_f1/actual_path",
+ "size": 1,
+ "type": "file",
+ "mtime": datetime.now(),
+ }
+ info = fs._convert_actual_info(entry, storage_location, virtual_location)
self.assertEqual(
- "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
+ "fileset/test_catalog/test_schema/test_f1/actual_path", info["name"]
)
# test convert actual local path
- audit_dto = AuditDTO(
- _creator="test",
- _create_time="2022-01-01T00:00:00Z",
- _last_modifier="test",
- _last_modified_time="2024-04-05T10:10:35.218Z",
- )
- local_fileset: FilesetDTO = FilesetDTO(
- _name="test_f1",
- _comment="",
- _type=FilesetDTO.Type.MANAGED,
- _storage_location="file:/tmp/fileset/test_f1",
- _audit=audit_dto,
- _properties={},
- )
- mock_local_context: FilesetContext = FilesetContext(
- name_identifier=NameIdentifier.of(
- "test_metalake", "test_catalog", "test_schema", "test_f1"
- ),
- storage_type=StorageType.LOCAL,
- fileset=local_fileset,
- actual_path=local_fileset.storage_location() + "/actual_path",
- fs=LocalFileSystem(),
- )
-
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
# test actual path not start with storage location
- actual_path = "/not_start_with_storage/ttt"
+ entry = {
+ "name": "/not_start_with_storage/ttt",
+ "size": 1,
+ "type": "file",
+ "mtime": datetime.now(),
+ }
+ storage_location = "file:/tmp/fileset/test_f1"
+ virtual_location = fs._get_virtual_location(
+ NameIdentifier.of("test_metalake", "test_catalog", "test_schema",
"test_f1")
+ )
with self.assertRaises(GravitinoRuntimeException):
- fs._convert_actual_path(actual_path, mock_local_context)
+ fs._convert_actual_info(entry, storage_location, virtual_location)
# test actual path start with storage location
- actual_path = "/tmp/fileset/test_f1/actual_path"
- virtual_path = fs._convert_actual_path(actual_path, mock_local_context)
+ entry = {
+ "name": "/tmp/fileset/test_f1/actual_path",
+ "size": 1,
+ "type": "file",
+ "mtime": datetime.now(),
+ }
+ info = fs._convert_actual_info(entry, storage_location, virtual_location)
self.assertEqual(
- "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
+ "fileset/test_catalog/test_schema/test_f1/actual_path", info["name"]
)
def test_extract_identifier(self, *mock_methods):
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
with self.assertRaises(GravitinoRuntimeException):
fs._extract_identifier(path=None)
@@ -932,152 +983,93 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertEqual("schema", identifier.namespace().level(2))
self.assertEqual("fileset", identifier.name())
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_pandas", f"{_fileset_dir}/test_pandas"
- ),
- )
def test_pandas(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_pandas"
+ fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pandas"
+ local_fs = LocalFileSystem()
local_fs.mkdir(fileset_storage_location)
- fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pandas"
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:8090", metalake_name="test_metalake"
- )
- # to parquet
- data.to_parquet(fileset_virtual_location + "/test.parquet",
filesystem=fs)
- self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.parquet"))
-
- # read parquet
- ds1 = pandas.read_parquet(
- path=fileset_virtual_location + "/test.parquet", filesystem=fs
- )
- self.assertTrue(data.equals(ds1))
+ server_uri="http://localhost:8090",
+ metalake_name="test_metalake",
+ skip_instance_cache=True,
+ )
+ actual_path = fileset_storage_location + "/test.parquet"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ # to parquet
+ data.to_parquet(fileset_virtual_location + "/test.parquet",
filesystem=fs)
+ self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.parquet"))
+
+ # read parquet
+ ds1 = pandas.read_parquet(
+ path=fileset_virtual_location + "/test.parquet", filesystem=fs
+ )
+ self.assertTrue(data.equals(ds1))
storage_options = {
"server_uri": "http://localhost:8090",
"metalake_name": "test_metalake",
}
- # to csv
- data.to_csv(
- fileset_virtual_location + "/test.csv",
- index=False,
- storage_options=storage_options,
- )
- self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.csv"))
- # read csv
- ds2 = pandas.read_csv(
- fileset_virtual_location + "/test.csv",
storage_options=storage_options
- )
- self.assertTrue(data.equals(ds2))
+ actual_path1 = fileset_storage_location
+ actual_path2 = fileset_storage_location + "/test.csv"
+
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[actual_path1, actual_path2, actual_path2],
+ ):
+ # to csv
+ data.to_csv(
+ fileset_virtual_location + "/test.csv",
+ index=False,
+ storage_options=storage_options,
+ )
+ self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.csv"))
+
+ # read csv
+ ds2 = pandas.read_csv(
+ fileset_virtual_location + "/test.csv",
storage_options=storage_options
+ )
+ self.assertTrue(data.equals(ds2))
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_pyarrow", f"{_fileset_dir}/test_pyarrow"
- ),
- )
def test_pyarrow(self, *mock_methods):
- local_fs = LocalFileSystem()
fileset_storage_location = f"{self._fileset_dir}/test_pyarrow"
- local_fs.mkdir(fileset_storage_location)
-
fileset_virtual_location =
"gvfs://fileset/fileset_catalog/tmp/test_pyarrow"
- data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
- fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:8090", metalake_name="test_metalake"
- )
-
- # to parquet
- data.to_parquet(fileset_virtual_location + "/test.parquet",
filesystem=fs)
- self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.parquet"))
-
- # read as arrow dataset
- arrow_dataset = dt.dataset(
- fileset_virtual_location + "/test.parquet", filesystem=fs
- )
- arrow_tb_1 = arrow_dataset.to_table()
-
- arrow_tb_2 = pa.Table.from_pandas(data)
- self.assertTrue(arrow_tb_1.equals(arrow_tb_2))
-
- # read as arrow parquet dataset
- arrow_tb_3 = pq.read_table(
- fileset_virtual_location + "/test.parquet", filesystem=fs
- )
- self.assertTrue(arrow_tb_3.equals(arrow_tb_2))
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_llama_index", f"{_fileset_dir}/test_llama_index"
- ),
- )
- def test_llama_index(self, *mock_methods):
local_fs = LocalFileSystem()
- fileset_storage_location = f"{self._fileset_dir}/test_llama_index"
local_fs.mkdir(fileset_storage_location)
-
- fileset_virtual_location =
"gvfs://fileset/fileset_catalog/tmp/test_llama_index"
data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:8090", metalake_name="test_metalake"
- )
-
- storage_options = {
- "server_uri": "http://localhost:8090",
- "metalake_name": "test_metalake",
- }
- # to csv
- data.to_csv(
- fileset_virtual_location + "/test.csv",
- index=False,
- storage_options=storage_options,
- )
- self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.csv"))
+ server_uri="http://localhost:8090",
+ metalake_name="test_metalake",
+ skip_instance_cache=True,
+ )
+ actual_path = fileset_storage_location + "/test.parquet"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ # to parquet
+ data.to_parquet(fileset_virtual_location + "/test.parquet",
filesystem=fs)
+ self.assertTrue(local_fs.exists(fileset_storage_location +
"/test.parquet"))
+
+ # read as arrow dataset
+ arrow_dataset = dt.dataset(
+ fileset_virtual_location + "/test.parquet", filesystem=fs
+ )
+ arrow_tb_1 = arrow_dataset.to_table()
+ arrow_tb_2 = pa.Table.from_pandas(data)
+ self.assertTrue(arrow_tb_1.equals(arrow_tb_2))
- data.to_csv(
- fileset_virtual_location + "/sub_dir/test1.csv",
- index=False,
- storage_options=storage_options,
- )
- self.assertTrue(
- local_fs.exists(fileset_storage_location + "/sub_dir/test1.csv")
- )
+ # read as arrow parquet dataset
+ arrow_tb_3 = pq.read_table(
+ fileset_virtual_location + "/test.parquet", filesystem=fs
+ )
+ self.assertTrue(arrow_tb_3.equals(arrow_tb_2))
- reader = SimpleDirectoryReader(
- input_dir="fileset/fileset_catalog/tmp/test_llama_index",
- fs=fs,
- recursive=True, # recursively searches all subdirectories
- )
- documents = reader.load_data()
- self.assertEqual(len(documents), 2)
- doc_1 = documents[0]
- result_1 = [line.strip().split(", ") for line in
doc_1.text.split("\n")]
- self.assertEqual(4, len(result_1))
- for row in result_1:
- if row[0] == "A":
- self.assertEqual(row[1], "20")
- elif row[0] == "B":
- self.assertEqual(row[1], "21")
- elif row[0] == "C":
- self.assertEqual(row[1], "19")
- elif row[0] == "D":
- self.assertEqual(row[1], "18")
-
- @patch(
- "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
- return_value=mock_base.mock_load_fileset(
- "test_location_with_tailing_slash",
- f"{_fileset_dir}/test_location_with_tailing_slash/",
- ),
- )
def test_location_with_tailing_slash(self, *mock_methods):
- local_fs = LocalFileSystem()
# storage location is ending with a "/"
fileset_storage_location = (
f"{self._fileset_dir}/test_location_with_tailing_slash/"
@@ -1085,6 +1077,7 @@ class TestLocalFilesystem(unittest.TestCase):
fileset_virtual_location = (
"fileset/fileset_catalog/tmp/test_location_with_tailing_slash"
)
+ local_fs = LocalFileSystem()
local_fs.mkdir(fileset_storage_location)
sub_dir_path = f"{fileset_storage_location}test_1"
local_fs.mkdir(sub_dir_path)
@@ -1093,82 +1086,45 @@ class TestLocalFilesystem(unittest.TestCase):
local_fs.touch(sub_file_path)
self.assertTrue(local_fs.exists(sub_file_path))
+ actual_path = fileset_storage_location
fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_cache=True,
)
- self.assertTrue(fs.exists(fileset_virtual_location))
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ self.assertTrue(fs.exists(fileset_virtual_location))
dir_virtual_path = fileset_virtual_location + "/test_1"
- dir_info = fs.info(dir_virtual_path)
- self.assertEqual(dir_info["name"], dir_virtual_path)
+ actual_path1 = fileset_storage_location + "test_1"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path1,
+ ):
+ dir_info = fs.info(dir_virtual_path)
+ self.assertEqual(dir_info["name"], dir_virtual_path)
file_virtual_path = fileset_virtual_location +
"/test_1/test_file_1.par"
- file_info = fs.info(file_virtual_path)
- self.assertEqual(file_info["name"], file_virtual_path)
-
- file_status = fs.ls(fileset_virtual_location, detail=True)
- for status in file_status:
- if status["name"].endswith("test_1"):
- self.assertEqual(status["name"], dir_virtual_path)
- elif status["name"].endswith("test_file_1.par"):
- self.assertEqual(status["name"], file_virtual_path)
- else:
- raise GravitinoRuntimeException("Unexpected file found")
-
- def test_get_actual_path_by_ident(self, *mock_methods):
- ident1 = NameIdentifier.of(
- "test_metalake", "catalog", "schema",
"test_get_actual_path_by_ident"
- )
- storage_type = gvfs.StorageType.LOCAL
- local_fs = LocalFileSystem()
-
- fs = gvfs.GravitinoVirtualFileSystem(
- server_uri="http://localhost:9090", metalake_name="metalake_demo"
- )
-
- # test storage location end with "/"
- storage_location_1 =
f"{self._fileset_dir}/test_get_actual_path_by_ident/"
- # virtual path end with "/"
- virtual_path1 = "fileset/catalog/schema/test_get_actual_path_by_ident/"
- local_fs.mkdir(storage_location_1)
- self.assertTrue(local_fs.exists(storage_location_1))
-
- mock_fileset1: Fileset = mock.Mock(spec=Fileset)
- mock_fileset1.storage_location.return_value = storage_location_1
-
- actual_path1 = fs._get_actual_path_by_ident(
- ident1, mock_fileset1, local_fs, storage_type, virtual_path1
- )
- self.assertEqual(actual_path1, storage_location_1)
-
- # virtual path end without "/"
- virtual_path2 = "fileset/catalog/schema/test_get_actual_path_by_ident"
- actual_path2 = fs._get_actual_path_by_ident(
- ident1, mock_fileset1, local_fs, storage_type, virtual_path2
- )
- self.assertEqual(actual_path2, storage_location_1)
-
- # test storage location end without "/"
- ident2 = NameIdentifier.of(
- "test_metalake", "catalog", "schema", "test_without_slash"
- )
- storage_location_2 = f"{self._fileset_dir}/test_without_slash"
- # virtual path end with "/"
- virtual_path3 = "fileset/catalog/schema/test_without_slash/"
- local_fs.mkdir(storage_location_2)
- self.assertTrue(local_fs.exists(storage_location_2))
-
- mock_fileset2: Fileset = mock.Mock(spec=Fileset)
- mock_fileset2.storage_location.return_value = storage_location_2
-
- actual_path3 = fs._get_actual_path_by_ident(
- ident2, mock_fileset2, local_fs, storage_type, virtual_path3
- )
- self.assertEqual(actual_path3, f"{storage_location_2}/")
-
- # virtual path end without "/"
- virtual_path4 = "fileset/catalog/schema/test_without_slash"
- actual_path4 = fs._get_actual_path_by_ident(
- ident2, mock_fileset2, local_fs, storage_type, virtual_path4
- )
- self.assertEqual(actual_path4, storage_location_2)
+ actual_path2 = fileset_storage_location + "test_1/test_file_1.par"
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path2,
+ ):
+ file_info = fs.info(file_virtual_path)
+ self.assertEqual(file_info["name"], file_virtual_path)
+
+ with patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=actual_path,
+ ):
+ file_status = fs.ls(fileset_virtual_location, detail=True)
+ for status in file_status:
+ if status["name"].endswith("test_1"):
+ self.assertEqual(status["name"], dir_virtual_path)
+ elif status["name"].endswith("test_file_1.par"):
+ self.assertEqual(status["name"], file_virtual_path)
+ else:
+ raise GravitinoRuntimeException("Unexpected file found")
diff --git
a/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java
b/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java
index b76d1f91b..88ac4d11b 100644
--- a/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java
+++ b/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java
@@ -25,6 +25,10 @@ public enum FilesetDataOperation {
CREATE,
/** Opens a file. */
OPEN,
+ /** Opens a file and writes to it. */
+ OPEN_AND_WRITE,
+ /** Opens a file and appends to it. */
+ OPEN_AND_APPEND,
/** Appends some content into a file. */
APPEND,
/** Renames a file or a directory. */