This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 8cea42e126 [#8752] feat(gvfs-python): add fallback for Python gvfs
(#8869)
8cea42e126 is described below
commit 8cea42e126836d9e7a254b33758daa3bbfc0e361
Author: Junda Yang <[email protected]>
AuthorDate: Thu Oct 23 04:27:54 2025 -0700
[#8752] feat(gvfs-python): add fallback for Python gvfs (#8869)
### What changes were proposed in this pull request?
Add fallback support in Python gvfs hooks
### Why are the changes needed?
This is a last-resort fallback in case anything goes wrong with the
Gravitino server or gvfs. The change in the Python GVFS is consistent
with the Java GVFS change in this PR -
https://github.com/apache/gravitino/pull/8753
Fix: #8752
### Does this PR introduce _any_ user-facing change?
By default, there is no behavior change. However, users can now
customize the fallback behavior.
### How was this patch tested?
Unit tests added and updated
---
clients/client-python/gravitino/filesystem/gvfs.py | 378 ++++++++++++---------
.../gravitino/filesystem/gvfs_hook.py | 273 +++++++++++++++
.../tests/unittests/test_gvfs_with_hook.py | 190 +++++++++++
3 files changed, 680 insertions(+), 161 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index 5f0b13aa88..59b772efc4 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -133,12 +133,16 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return If details is true, returns a list of file info dicts, else
returns a list of file paths
"""
- new_path = self._hook.pre_ls(path, detail, **kwargs)
- decorated_ls = self._with_exception_translation(
- FilesetDataOperation.LIST_STATUS
- )(self._operations.ls)
- result = decorated_ls(new_path, detail, **kwargs)
- return self._hook.post_ls(detail, result, **kwargs)
+ try:
+ new_path = self._hook.pre_ls(path, detail, **kwargs)
+ decorated_ls = self._with_exception_translation(
+ FilesetDataOperation.LIST_STATUS
+ )(self._operations.ls)
+ result = decorated_ls(new_path, detail, **kwargs)
+ return self._hook.post_ls(detail, result, **kwargs)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_ls_failure(path, e, **kwargs)
def info(self, path, **kwargs):
"""Get file info.
@@ -146,15 +150,19 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return A file info dict
"""
- new_path = self._hook.pre_info(path, **kwargs)
- decorated_info = self._with_exception_translation(
- FilesetDataOperation.GET_FILE_STATUS
- )(self._operations.info)
- result = decorated_info(new_path, **kwargs)
- return self._hook.post_info(
- result,
- **kwargs,
- )
+ try:
+ new_path = self._hook.pre_info(path, **kwargs)
+ decorated_info = self._with_exception_translation(
+ FilesetDataOperation.GET_FILE_STATUS
+ )(self._operations.info)
+ result = decorated_info(new_path, **kwargs)
+ return self._hook.post_info(
+ result,
+ **kwargs,
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_info_failure(path, e, **kwargs)
def exists(self, path, **kwargs):
"""Check if a file or a directory exists.
@@ -162,20 +170,24 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return If a file or directory exists, it returns True, otherwise False
"""
- new_path = self._hook.pre_exists(path, **kwargs)
- decorated_exists = self._with_exception_translation(
- FilesetDataOperation.EXISTS
- )(self._operations.exists)
try:
- result = decorated_exists(new_path, **kwargs)
- except FileNotFoundError:
- return False
-
- return self._hook.post_exists(
- new_path,
- result,
- **kwargs,
- )
+ new_path = self._hook.pre_exists(path, **kwargs)
+ decorated_exists = self._with_exception_translation(
+ FilesetDataOperation.EXISTS
+ )(self._operations.exists)
+ try:
+ result = decorated_exists(new_path, **kwargs)
+ except FileNotFoundError:
+ return False
+
+ return self._hook.post_exists(
+ new_path,
+ result,
+ **kwargs,
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_exists_failure(path, e, **kwargs)
def cp_file(self, path1, path2, **kwargs):
"""Copy a file.
@@ -183,12 +195,15 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path2: Virtual dst fileset path, should be consistent with the
src path fileset identifier
:param kwargs: Extra args
"""
- new_path1, new_path2 = self._hook.pre_cp_file(path1, path2, **kwargs)
- decorated_cp_file = self._with_exception_translation(
- FilesetDataOperation.COPY_FILE
- )(self._operations.cp_file)
- decorated_cp_file(new_path1, new_path2, **kwargs)
- self._hook.post_cp_file(new_path1, new_path2, **kwargs)
+ try:
+ new_path1, new_path2 = self._hook.pre_cp_file(path1, path2,
**kwargs)
+ decorated_cp_file = self._with_exception_translation(
+ FilesetDataOperation.COPY_FILE
+ )(self._operations.cp_file)
+ decorated_cp_file(new_path1, new_path2, **kwargs)
+ self._hook.post_cp_file(new_path1, new_path2, **kwargs)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_cp_file_failure(path1, path2, e, **kwargs)
def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
"""Move a file to another directory.
@@ -200,14 +215,17 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param maxdepth: Maximum depth of recursive move
:param kwargs: Extra args
"""
- new_path1, new_path2 = self._hook.pre_mv(
- path1, path2, recursive, maxdepth, **kwargs
- )
- decorated_mv =
self._with_exception_translation(FilesetDataOperation.RENAME)(
- self._operations.mv
- )
- decorated_mv(new_path1, new_path2, recursive, maxdepth, **kwargs)
- self._hook.post_mv(new_path1, new_path2, recursive, maxdepth, **kwargs)
+ try:
+ new_path1, new_path2 = self._hook.pre_mv(
+ path1, path2, recursive, maxdepth, **kwargs
+ )
+ decorated_mv = self._with_exception_translation(
+ FilesetDataOperation.RENAME
+ )(self._operations.mv)
+ decorated_mv(new_path1, new_path2, recursive, maxdepth, **kwargs)
+ self._hook.post_mv(new_path1, new_path2, recursive, maxdepth,
**kwargs)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_mv_failure(path1, path2, recursive, maxdepth, e,
**kwargs)
def _rm(self, path):
raise GravitinoRuntimeException(
@@ -221,23 +239,29 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
When removing a directory, this parameter should be True.
:param maxdepth: The maximum depth to remove the directory recursively.
"""
- new_path = self._hook.pre_rm(path, recursive, maxdepth)
- decorated_rm =
self._with_exception_translation(FilesetDataOperation.DELETE)(
- self._operations.rm
- )
- decorated_rm(new_path, recursive, maxdepth)
- self._hook.post_rm(new_path, recursive, maxdepth)
+ try:
+ new_path = self._hook.pre_rm(path, recursive, maxdepth)
+ decorated_rm = self._with_exception_translation(
+ FilesetDataOperation.DELETE
+ )(self._operations.rm)
+ decorated_rm(new_path, recursive, maxdepth)
+ self._hook.post_rm(new_path, recursive, maxdepth)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_rm_failure(path, recursive, maxdepth, e)
def rm_file(self, path):
"""Remove a file.
:param path: Virtual fileset path
"""
- new_path = self._hook.pre_rm_file(path)
- decorated_rm_file = self._with_exception_translation(
- FilesetDataOperation.DELETE
- )(self._operations.rm_file)
- decorated_rm_file(new_path)
- self._hook.post_rm_file(new_path)
+ try:
+ new_path = self._hook.pre_rm_file(path)
+ decorated_rm_file = self._with_exception_translation(
+ FilesetDataOperation.DELETE
+ )(self._operations.rm_file)
+ decorated_rm_file(new_path)
+ self._hook.post_rm_file(new_path)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_rm_file_failure(path, e)
def rmdir(self, path):
"""Remove a directory.
@@ -245,12 +269,15 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
And it will throw an exception if delete a directory which is
non-empty for LocalFileSystem.
:param path: Virtual fileset path
"""
- new_path = self._hook.pre_rmdir(path)
- decorated_rmdir =
self._with_exception_translation(FilesetDataOperation.DELETE)(
- self._operations.rmdir
- )
- decorated_rmdir(new_path)
- self._hook.post_rmdir(new_path)
+ try:
+ new_path = self._hook.pre_rmdir(path)
+ decorated_rmdir = self._with_exception_translation(
+ FilesetDataOperation.DELETE
+ )(self._operations.rmdir)
+ decorated_rmdir(new_path)
+ self._hook.post_rmdir(new_path)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_rmdir_failure(path, e)
def open(
self,
@@ -270,45 +297,51 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return A file-like object from the filesystem
"""
- new_path = self._hook.pre_open(
- path, mode, block_size, cache_options, compression, **kwargs
- )
- if mode in ("w", "wb"):
- data_operation = FilesetDataOperation.OPEN_AND_WRITE
- elif mode in ("a", "ab"):
- data_operation = FilesetDataOperation.OPEN_AND_APPEND
- else:
- data_operation = FilesetDataOperation.OPEN
-
- decorated_open = self._with_exception_translation(data_operation)(
- self._operations.open
- )
try:
- result = decorated_open(
+ new_path = self._hook.pre_open(
+ path, mode, block_size, cache_options, compression, **kwargs
+ )
+ if mode in ("w", "wb"):
+ data_operation = FilesetDataOperation.OPEN_AND_WRITE
+ elif mode in ("a", "ab"):
+ data_operation = FilesetDataOperation.OPEN_AND_APPEND
+ else:
+ data_operation = FilesetDataOperation.OPEN
+
+ decorated_open = self._with_exception_translation(data_operation)(
+ self._operations.open
+ )
+ try:
+ result = decorated_open(
+ new_path,
+ mode,
+ block_size,
+ cache_options,
+ compression,
+ **kwargs,
+ )
+ except FileNotFoundError as e:
+ if mode in ("w", "wb", "x", "xb", "a", "ab"):
+ raise OSError(
+ f"Fileset is not found for path: {new_path} for
operation OPEN. This "
+ f"may be caused by fileset related metadata not found
or not in use "
+ f"in Gravitino,"
+ ) from e
+ raise
+ return self._hook.post_open(
new_path,
mode,
block_size,
cache_options,
compression,
+ result,
**kwargs,
)
- except FileNotFoundError as e:
- if mode in ("w", "wb", "x", "xb", "a", "ab"):
- raise OSError(
- f"Fileset is not found for path: {new_path} for operation
OPEN. This "
- f"may be caused by fileset related metadata not found or
not in use "
- f"in Gravitino,"
- ) from e
- raise
- return self._hook.post_open(
- new_path,
- mode,
- block_size,
- cache_options,
- compression,
- result,
- **kwargs,
- )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_open_failure(
+ path, mode, block_size, cache_options, compression, e, **kwargs
+ )
def mkdir(self, path, create_parents=True, **kwargs):
"""Make a directory.
@@ -318,42 +351,48 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param create_parents: Create parent directories if missing when set
to True
:param kwargs: Extra args
"""
- new_path = self._hook.pre_mkdir(path, create_parents, **kwargs)
try:
- self._operations.mkdir(new_path, create_parents, **kwargs)
- except (
- NoSuchCatalogException,
- CatalogNotInUseException,
- NoSuchFilesetException,
- NoSuchLocationNameException,
- ) as e:
- raise OSError(
- f"Fileset is not found for path: {new_path} for operation
MKDIRS. This "
- f"may be caused by fileset related metadata not found or not
in use "
- f"in Gravitino,"
- ) from e
- self._hook.post_mkdir(new_path, create_parents, **kwargs)
+ new_path = self._hook.pre_mkdir(path, create_parents, **kwargs)
+ try:
+ self._operations.mkdir(new_path, create_parents, **kwargs)
+ except (
+ NoSuchCatalogException,
+ CatalogNotInUseException,
+ NoSuchFilesetException,
+ NoSuchLocationNameException,
+ ) as e:
+ raise OSError(
+ f"Fileset is not found for path: {new_path} for operation
MKDIRS. This "
+ f"may be caused by fileset related metadata not found or
not in use "
+ f"in Gravitino,"
+ ) from e
+ self._hook.post_mkdir(new_path, create_parents, **kwargs)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_mkdir_failure(path, create_parents, e, **kwargs)
def makedirs(self, path, exist_ok=True):
"""Make a directory recursively.
:param path: Virtual fileset path
:param exist_ok: Continue if a directory already exists
"""
- new_path = self._hook.pre_makedirs(path, exist_ok)
try:
- self._operations.makedirs(new_path, exist_ok)
- except (
- NoSuchCatalogException,
- CatalogNotInUseException,
- NoSuchFilesetException,
- NoSuchLocationNameException,
- ) as e:
- raise OSError(
- f"Fileset is not found for path: {new_path} for operation
MKDIRS. This "
- f"may be caused by fileset related metadata not found or not
in use "
- f"in Gravitino,"
- ) from e
- self._hook.post_makedirs(new_path, exist_ok)
+ new_path = self._hook.pre_makedirs(path, exist_ok)
+ try:
+ self._operations.makedirs(new_path, exist_ok)
+ except (
+ NoSuchCatalogException,
+ CatalogNotInUseException,
+ NoSuchFilesetException,
+ NoSuchLocationNameException,
+ ) as e:
+ raise OSError(
+ f"Fileset is not found for path: {new_path} for operation
MKDIRS. This "
+ f"may be caused by fileset related metadata not found or
not in use "
+ f"in Gravitino,"
+ ) from e
+ self._hook.post_makedirs(new_path, exist_ok)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_makedirs_failure(path, exist_ok, e)
def created(self, path):
"""Return the created timestamp of a file as a datetime.datetime
@@ -361,30 +400,38 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: Virtual fileset path
:return Created time(datetime.datetime)
"""
- new_path = self._hook.pre_created(path)
- decorated_created = self._with_exception_translation(
- FilesetDataOperation.CREATED_TIME
- )(self._operations.created)
- result = decorated_created(new_path)
- return self._hook.post_created(
- new_path,
- result,
- )
+ try:
+ new_path = self._hook.pre_created(path)
+ decorated_created = self._with_exception_translation(
+ FilesetDataOperation.CREATED_TIME
+ )(self._operations.created)
+ result = decorated_created(new_path)
+ return self._hook.post_created(
+ new_path,
+ result,
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_created_failure(path, e)
def modified(self, path):
"""Returns the modified time of the path file if it exists.
:param path: Virtual fileset path
:return Modified time(datetime.datetime)
"""
- new_path = self._hook.pre_modified(path)
- decorated_modified = self._with_exception_translation(
- FilesetDataOperation.MODIFIED_TIME
- )(self._operations.modified)
- result = decorated_modified(new_path)
- return self._hook.post_modified(
- new_path,
- result,
- )
+ try:
+ new_path = self._hook.pre_modified(path)
+ decorated_modified = self._with_exception_translation(
+ FilesetDataOperation.MODIFIED_TIME
+ )(self._operations.modified)
+ result = decorated_modified(new_path)
+ return self._hook.post_modified(
+ new_path,
+ result,
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_modified_failure(path, e)
def cat_file(self, path, start=None, end=None, **kwargs):
"""Get the content of a file.
@@ -394,18 +441,22 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return File content
"""
- new_path = self._hook.pre_cat_file(path, start, end, **kwargs)
- decorated_cat_file = self._with_exception_translation(
- FilesetDataOperation.CAT_FILE
- )(self._operations.cat_file)
- result = decorated_cat_file(new_path, start, end, **kwargs)
- return self._hook.post_cat_file(
- new_path,
- start,
- end,
- result,
- **kwargs,
- )
+ try:
+ new_path = self._hook.pre_cat_file(path, start, end, **kwargs)
+ decorated_cat_file = self._with_exception_translation(
+ FilesetDataOperation.CAT_FILE
+ )(self._operations.cat_file)
+ result = decorated_cat_file(new_path, start, end, **kwargs)
+ return self._hook.post_cat_file(
+ new_path,
+ start,
+ end,
+ result,
+ **kwargs,
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ # Hook will either provide fallback value or re-raise the exception
+ return self._hook.on_cat_file_failure(path, start, end, e,
**kwargs)
def get_file(self, rpath, lpath, callback=None, outfile=None, **kwargs):
"""Copy single remote file to local.
@@ -415,18 +466,23 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param outfile: The output file path
:param kwargs: Extra args
"""
- new_rpath = self._hook.pre_get_file(rpath, lpath, callback, outfile,
**kwargs)
- decorated_get_file = self._with_exception_translation(
- FilesetDataOperation.GET_FILE_STATUS
- )(self._operations.get_file)
- decorated_get_file(
- new_rpath,
- lpath,
- callback,
- outfile,
- **kwargs,
- )
- self._hook.post_get_file(new_rpath, lpath, outfile, **kwargs)
+ try:
+ new_rpath = self._hook.pre_get_file(
+ rpath, lpath, callback, outfile, **kwargs
+ )
+ decorated_get_file = self._with_exception_translation(
+ FilesetDataOperation.GET_FILE_STATUS
+ )(self._operations.get_file)
+ decorated_get_file(
+ new_rpath,
+ lpath,
+ callback,
+ outfile,
+ **kwargs,
+ )
+ self._hook.post_get_file(new_rpath, lpath, outfile, **kwargs)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._hook.on_get_file_failure(rpath, lpath, callback, outfile, e,
**kwargs)
def _get_hook_class(
self, options: Optional[Dict[str, str]]
diff --git a/clients/client-python/gravitino/filesystem/gvfs_hook.py
b/clients/client-python/gravitino/filesystem/gvfs_hook.py
index c8ac00249e..f98410c875 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_hook.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_hook.py
@@ -525,6 +525,279 @@ class GravitinoVirtualFileSystemHook(ABC):
The modification time to get.
"""
+ def on_ls_failure(self, path: str, exception: Exception, **kwargs):
+ """
+ Called when listing directory contents fails.
+
+ Args:
+ path: The path that failed to list contents for.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The fallback entries list if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_info_failure(self, path: str, exception: Exception, **kwargs):
+ """
+ Called when getting file info fails.
+
+ Args:
+ path: The path that failed to get info for.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The fallback file info dict if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_exists_failure(self, path: str, exception: Exception, **kwargs):
+ """
+ Called when checking file existence fails.
+
+ Args:
+ path: The path that failed to check existence for.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The fallback boolean value if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_open_failure(
+ self,
+ path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ exception: Exception,
+ **kwargs
+ ):
+ """
+ Called when opening a file fails.
+
+ Args:
+ path: The path of the file that failed to open.
+ mode: The mode to open the file.
+ block_size: The block size of the file.
+ cache_options: The cache options of the file.
+ compression: The compression of the file.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The fallback file object if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_cp_file_failure(self, src: str, dst: str, exception: Exception,
**kwargs):
+ """
+ Called when copying a file fails.
+
+ Args:
+ src: The source path that failed to copy.
+ dst: The destination path for the copy operation.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Raises:
+ Exception: Can re-raise the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_mv_failure(
+ self,
+ src: str,
+ dst: str,
+ recursive: bool,
+ maxdepth: int,
+ exception: Exception,
+ **kwargs
+ ):
+ """
+ Called when moving a file or directory fails.
+
+ Args:
+ src: The source path that failed to be moved.
+ dst: The destination path for the move operation.
+ recursive: Whether the move was requested to be recursive.
+ maxdepth: The maximum depth to move.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Raises:
+ Exception: Can re-raise the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_rm_failure(
+ self, path: str, recursive: bool, maxdepth: int, exception: Exception
+ ):
+ """
+ Called when deleting a file or directory fails.
+
+ Args:
+ path: The path that failed to be deleted.
+ recursive: Whether the deletion was requested to be recursive.
+ maxdepth: The maximum depth to delete.
+ exception: The exception that caused the failure.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_rm_file_failure(self, path: str, exception: Exception):
+ """
+ Called when deleting a file fails.
+
+ Args:
+ path: The path that failed to be deleted.
+ exception: The exception that caused the failure.
+
+ Raises:
+ Exception: Can re-raise the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_rmdir_failure(self, path: str, exception: Exception):
+ """
+ Called when deleting a directory fails.
+
+ Args:
+ path: The path that failed to be deleted.
+ exception: The exception that caused the failure.
+
+ Raises:
+ Exception: Can re-raise the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_mkdir_failure(
+ self, path: str, create_parents: bool, exception: Exception, **kwargs
+ ):
+ """
+ Called when creating a directory fails.
+
+ Args:
+ path: The path that failed to create directory for.
+ create_parents: Whether to create parent directories.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_makedirs_failure(self, path: str, exist_ok: bool, exception:
Exception):
+ """
+ Called when creating directories fails.
+
+ Args:
+ path: The path that failed to create directories for.
+ exist_ok: Whether it's okay if the directory already exists.
+ exception: The exception that caused the failure.
+
+ Raises:
+ Exception: Can re-raise the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_cat_file_failure(
+ self, path: str, start: int, end: int, exception: Exception, **kwargs
+ ):
+ """
+ Called when reading file contents fails.
+
+ Args:
+ path: The path of the file that failed to read.
+ start: The start position to read.
+ end: The end position to read.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The fallback content if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_get_file_failure(
+ self,
+ rpath: str,
+ lpath: str,
+ callback: Callback,
+ outfile: str,
+ exception: Exception,
+ **kwargs
+ ):
+ """
+ Called when downloading a file fails.
+
+ Args:
+ rpath: The remote path that failed to download.
+ lpath: The local path for the file.
+ callback: The callback to call.
+ outfile: The output file.
+ exception: The exception that caused the failure.
+ **kwargs: Additional arguments.
+
+ Raises:
+ Exception: Can re-raise the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_created_failure(self, path: str, exception: Exception):
+ """
+ Called when getting file creation time fails.
+
+ Args:
+ path: The path that failed to get creation time for.
+ exception: The exception that caused the failure.
+
+ Returns:
+ The fallback creation time if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
+ def on_modified_failure(self, path: str, exception: Exception):
+ """
+ Called when getting file modification time fails.
+
+ Args:
+ path: The path that failed to get modification time for.
+ exception: The exception that caused the failure.
+
+ Returns:
+ The fallback modification time if available.
+
+ Raises:
+ Exception: Re-raises the exception if no fallback is available.
+ """
+ raise exception
+
class NoOpHook(GravitinoVirtualFileSystemHook):
"""
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_hook.py
b/clients/client-python/tests/unittests/test_gvfs_with_hook.py
index 150b6ce07a..900c5ac065 100644
--- a/clients/client-python/tests/unittests/test_gvfs_with_hook.py
+++ b/clients/client-python/tests/unittests/test_gvfs_with_hook.py
@@ -66,6 +66,22 @@ class MockGVFSHook(GravitinoVirtualFileSystemHook):
self.post_get_file_called = False
self.post_created_called = False
self.post_modified_called = False
+ # Failure callback tracking
+ self.on_ls_failure_called = False
+ self.on_info_failure_called = False
+ self.on_exists_failure_called = False
+ self.on_open_failure_called = False
+ self.on_cp_file_failure_called = False
+ self.on_mv_failure_called = False
+ self.on_rm_failure_called = False
+ self.on_rm_file_failure_called = False
+ self.on_rmdir_failure_called = False
+ self.on_mkdir_failure_called = False
+ self.on_makedirs_failure_called = False
+ self.on_cat_file_failure_called = False
+ self.on_get_file_failure_called = False
+ self.on_created_failure_called = False
+ self.on_modified_failure_called = False
def set_operations_context(self, operations):
self.set_operations_context_called = True
@@ -216,6 +232,102 @@ class MockGVFSHook(GravitinoVirtualFileSystemHook):
self.post_created_called = True
return created
+ # Failure callback implementations
+ def on_ls_failure(self, path: str, exception: Exception, **kwargs):
+ self.on_ls_failure_called = True
+ super().on_ls_failure(path, exception, **kwargs)
+
+ def on_info_failure(self, path: str, exception: Exception, **kwargs):
+ self.on_info_failure_called = True
+ super().on_info_failure(path, exception, **kwargs)
+
+ def on_exists_failure(self, path: str, exception: Exception, **kwargs):
+ self.on_exists_failure_called = True
+ super().on_exists_failure(path, exception, **kwargs)
+
+ def on_open_failure(
+ self,
+ path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ exception: Exception,
+ **kwargs,
+ ):
+ self.on_open_failure_called = True
+ super().on_open_failure(
+ path, mode, block_size, cache_options, compression, exception,
**kwargs
+ )
+
+ def on_cp_file_failure(self, src: str, dst: str, exception: Exception,
**kwargs):
+ self.on_cp_file_failure_called = True
+ super().on_cp_file_failure(src, dst, exception, **kwargs)
+
+ def on_mv_failure(
+ self,
+ src: str,
+ dst: str,
+ recursive: bool,
+ maxdepth: int,
+ exception: Exception,
+ **kwargs,
+ ):
+ self.on_mv_failure_called = True
+ super().on_mv_failure(src, dst, recursive, maxdepth, exception,
**kwargs)
+
+ def on_rm_failure(
+ self, path: str, recursive: bool, maxdepth: int, exception: Exception
+ ):
+ self.on_rm_failure_called = True
+ super().on_rm_failure(path, recursive, maxdepth, exception)
+
+ def on_rm_file_failure(self, path: str, exception: Exception):
+ self.on_rm_file_failure_called = True
+ super().on_rm_file_failure(path, exception)
+
+ def on_rmdir_failure(self, path: str, exception: Exception):
+ self.on_rmdir_failure_called = True
+ super().on_rmdir_failure(path, exception)
+
+ def on_mkdir_failure(
+ self, path: str, create_parents: bool, exception: Exception, **kwargs
+ ):
+ self.on_mkdir_failure_called = True
+ super().on_mkdir_failure(path, create_parents, exception, **kwargs)
+
+ def on_makedirs_failure(self, path: str, exist_ok: bool, exception:
Exception):
+ self.on_makedirs_failure_called = True
+ super().on_makedirs_failure(path, exist_ok, exception)
+
+ def on_cat_file_failure(
+ self, path: str, start: int, end: int, exception: Exception, **kwargs
+ ):
+ self.on_cat_file_failure_called = True
+ super().on_cat_file_failure(path, start, end, exception, **kwargs)
+
+ def on_get_file_failure(
+ self,
+ rpath: str,
+ lpath: str,
+ callback: Callback,
+ outfile: str,
+ exception: Exception,
+ **kwargs,
+ ):
+ self.on_get_file_failure_called = True
+ super().on_get_file_failure(
+ rpath, lpath, callback, outfile, exception, **kwargs
+ )
+
+ def on_created_failure(self, path: str, exception: Exception):
+ self.on_created_failure_called = True
+ super().on_created_failure(path, exception)
+
+ def on_modified_failure(self, path: str, exception: Exception):
+ self.on_modified_failure_called = True
+ super().on_modified_failure(path, exception)
+
@patch(
"gravitino.client.generic_fileset.GenericFileset.get_credentials",
@@ -395,3 +507,81 @@ class TestGVFSHook(unittest.TestCase):
fs.rmdir(f"{fileset_virtual_path}/test_dir")
self.assertTrue(fs.hook.rmdir_called)
self.assertTrue(fs.hook.post_rmdir_called)
+
+ def test_failure_callbacks(self, *mock_method):
+ """Test that failure callbacks are invoked when operations fail."""
+ fileset_storage_location = f"{self._fileset_dir}/test_failure_location"
+ fileset_virtual_path =
"fileset/fileset_catalog/tmp/test_failure_location"
+
+ with patch.multiple(
+ "gravitino.client.fileset_catalog.FilesetCatalog",
+ load_fileset=mock.MagicMock(
+ return_value=mock_base.mock_load_fileset(
+ "fileset", fileset_storage_location
+ )
+ ),
+
get_file_location=mock.MagicMock(return_value=fileset_storage_location),
+ ):
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_catch=True,
+ options={
+ GVFSConfig.GVFS_FILESYSTEM_HOOK:
"tests.unittests.test_gvfs_with_hook.MockGVFSHook"
+ },
+ )
+
+ # Test on_ls_failure
+ with patch.object(
+ fs.operations, "ls", side_effect=RuntimeError("ls failed")
+ ):
+ with self.assertRaises(RuntimeError):
+ fs.ls(fileset_virtual_path)
+ self.assertTrue(
+ fs.hook.on_ls_failure_called,
+ "on_ls_failure should be called when ls operation fails",
+ )
+
+ # Test on_info_failure
+ with patch.object(
+ fs.operations, "info", side_effect=RuntimeError("info failed")
+ ):
+ with self.assertRaises(RuntimeError):
+ fs.info(fileset_virtual_path)
+ self.assertTrue(
+ fs.hook.on_info_failure_called,
+ "on_info_failure should be called when info operation
fails",
+ )
+
+ # Test on_open_failure
+ with patch.object(
+ fs.operations, "open", side_effect=RuntimeError("open failed")
+ ):
+ with self.assertRaises(RuntimeError):
+ fs.open(fileset_virtual_path, mode="rb")
+ self.assertTrue(
+ fs.hook.on_open_failure_called,
+ "on_open_failure should be called when open operation
fails",
+ )
+
+ # Test on_mkdir_failure
+ with patch.object(
+ fs.operations, "mkdir", side_effect=RuntimeError("mkdir
failed")
+ ):
+ with self.assertRaises(RuntimeError):
+ fs.mkdir(fileset_virtual_path)
+ self.assertTrue(
+ fs.hook.on_mkdir_failure_called,
+ "on_mkdir_failure should be called when mkdir operation
fails",
+ )
+
+ # Test on_rm_file_failure
+ with patch.object(
+ fs.operations, "rm_file", side_effect=RuntimeError("rm_file
failed")
+ ):
+ with self.assertRaises(RuntimeError):
+ fs.rm_file(fileset_virtual_path)
+ self.assertTrue(
+ fs.hook.on_rm_file_failure_called,
+ "on_rm_file_failure should be called when rm_file
operation fails",
+ )