Copilot commented on code in PR #7001:
URL: https://github.com/apache/paimon/pull/7001#discussion_r2681323603
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type ==
pyarrow.fs.FileType.Directory]
def exists(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- result = file_info.type != pyarrow.fs.FileType.NotFound
- return result
- except Exception:
- return False
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to delete {path}: {e}")
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
return False
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if not recursive:
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False,
allow_not_found=True)
+ dir_contents = self.filesystem.get_file_info(selector)
+ if len(dir_contents) > 0:
+ raise OSError(f"Directory {path_str} is not empty")
+ if recursive:
+ self.filesystem.delete_dir_contents(path_str)
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_file(path_str)
+ return True
def mkdirs(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- self.filesystem.create_dir(path_str, recursive=True)
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
return True
- except Exception as e:
- self.logger.warning(f"Failed to create directory {path}: {e}")
- return False
+ elif file_info.type == pyarrow.fs.FileType.File:
+ raise FileExistsError(f"Path exists but is not a directory:
{path_str}")
+
+ self.filesystem.create_dir(path_str, recursive=True)
+ return True
def rename(self, src: str, dst: str) -> bool:
- try:
- dst_str = self.to_filesystem_path(dst)
- dst_parent = Path(dst_str).parent
- if str(dst_parent) and not self.exists(str(dst_parent)):
- self.mkdirs(str(dst_parent))
+ src_str = self.to_filesystem_path(src)
+ dst_str = self.to_filesystem_path(dst)
+ dst_parent = Path(dst_str).parent
+
+ if str(dst_parent) and not self.exists(str(dst_parent)):
+ self.mkdirs(str(dst_parent))
- src_str = self.to_filesystem_path(src)
- self.filesystem.move(src_str, dst_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+ try:
+ with self._RENAME_LOCK:
+ dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+ if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+ if dst_file_info.type == pyarrow.fs.FileType.File:
+ return False
+ src_name = Path(src_str).name
+ dst_str = f"{dst_str.rstrip('/')}/{src_name}"
+ final_dst_info =
self.filesystem.get_file_info([dst_str])[0]
+ if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+ return False
+
+ self.filesystem.move(src_str, dst_str)
+ return True
+ except (FileNotFoundError, PermissionError, OSError):
return False
Review Comment:
The rename operation has a race condition. The lock only protects the
check-and-move within a single process, but does not provide atomicity across
multiple processes or in distributed environments. Between checking if the
destination exists (lines 270-278) and performing the move (line 280), another
process could create the destination file, leading to either overwrite or
failure depending on the underlying filesystem behavior. Consider using the
filesystem's atomic move operation if available, or document this limitation
clearly.
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -329,8 +353,13 @@ def copy_file(self, source_path: str, target_path: str,
overwrite: bool = False)
if not overwrite and self.exists(target_path):
raise FileExistsError(f"Target file {target_path} already exists
and overwrite=False")
- source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
+ target_parent = Path(target_str).parent
+
+ if str(target_parent) and not self.exists(str(target_parent)):
+ self.mkdirs(str(target_parent))
+
+ source_str = self.to_filesystem_path(source_path)
Review Comment:
The source path is converted to filesystem path (line 362) after the target
parent directory is potentially created (lines 356-360). If the source path is
invalid or to_filesystem_path raises an exception, the parent directory for the
target will have already been created unnecessarily. Consider converting both
source and target paths first before any filesystem operations.
```suggestion
# Convert both source and target paths before performing any
# filesystem modifications
source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent
if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))
```
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type ==
pyarrow.fs.FileType.Directory]
def exists(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- result = file_info.type != pyarrow.fs.FileType.NotFound
- return result
- except Exception:
- return False
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to delete {path}: {e}")
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
return False
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if not recursive:
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False,
allow_not_found=True)
+ dir_contents = self.filesystem.get_file_info(selector)
+ if len(dir_contents) > 0:
+ raise OSError(f"Directory {path_str} is not empty")
+ if recursive:
+ self.filesystem.delete_dir_contents(path_str)
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_file(path_str)
+ return True
def mkdirs(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- self.filesystem.create_dir(path_str, recursive=True)
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
return True
- except Exception as e:
- self.logger.warning(f"Failed to create directory {path}: {e}")
- return False
+ elif file_info.type == pyarrow.fs.FileType.File:
+ raise FileExistsError(f"Path exists but is not a directory:
{path_str}")
Review Comment:
The error message uses path_str (filesystem path) instead of the original
path parameter. For consistency with FileNotFoundError at line 208 which uses
the original path, consider using the original path here too.
```suggestion
raise FileExistsError(f"Path exists but is not a directory: {path}")
```
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -39,6 +40,8 @@
class FileIO:
+ _RENAME_LOCK = threading.Lock()
+
def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
Review Comment:
The class-level lock _RENAME_LOCK is shared across all FileIO instances.
This means that rename operations on different FileIO instances (e.g.,
different warehouses or filesystems) will block each other unnecessarily.
Consider using an instance-level lock instead, or document this limitation.
```suggestion
# Class-level sentinel; actual lock is per-instance (see __init__).
_RENAME_LOCK = None
def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
# Use an instance-level lock to avoid cross-instance contention on
# rename.
self._RENAME_LOCK = threading.Lock()
```
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type ==
pyarrow.fs.FileType.Directory]
def exists(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- result = file_info.type != pyarrow.fs.FileType.NotFound
- return result
- except Exception:
- return False
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to delete {path}: {e}")
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
return False
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if not recursive:
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False,
allow_not_found=True)
+ dir_contents = self.filesystem.get_file_info(selector)
+ if len(dir_contents) > 0:
+ raise OSError(f"Directory {path_str} is not empty")
+ if recursive:
+ self.filesystem.delete_dir_contents(path_str)
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_file(path_str)
+ return True
def mkdirs(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- self.filesystem.create_dir(path_str, recursive=True)
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
return True
- except Exception as e:
- self.logger.warning(f"Failed to create directory {path}: {e}")
- return False
+ elif file_info.type == pyarrow.fs.FileType.File:
+ raise FileExistsError(f"Path exists but is not a directory:
{path_str}")
+
+ self.filesystem.create_dir(path_str, recursive=True)
+ return True
def rename(self, src: str, dst: str) -> bool:
- try:
- dst_str = self.to_filesystem_path(dst)
- dst_parent = Path(dst_str).parent
- if str(dst_parent) and not self.exists(str(dst_parent)):
- self.mkdirs(str(dst_parent))
+ src_str = self.to_filesystem_path(src)
+ dst_str = self.to_filesystem_path(dst)
+ dst_parent = Path(dst_str).parent
+
+ if str(dst_parent) and not self.exists(str(dst_parent)):
+ self.mkdirs(str(dst_parent))
- src_str = self.to_filesystem_path(src)
- self.filesystem.move(src_str, dst_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+ try:
+ with self._RENAME_LOCK:
+ dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+ if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+ if dst_file_info.type == pyarrow.fs.FileType.File:
+ return False
+ src_name = Path(src_str).name
+ dst_str = f"{dst_str.rstrip('/')}/{src_name}"
+ final_dst_info =
self.filesystem.get_file_info([dst_str])[0]
+ if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+ return False
+
+ self.filesystem.move(src_str, dst_str)
+ return True
+ except (FileNotFoundError, PermissionError, OSError):
return False
Review Comment:
The rename method catches OSError and returns False (lines 282-283), but
OSErrors from the exists() and mkdirs() calls (lines 265-266) that occur before
the try block are not caught. This creates inconsistent error handling - some
OSErrors are propagated while others are swallowed and return False. Either
move the exists/mkdirs calls inside the try block or allow all OSErrors to
propagate for consistency.
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type ==
pyarrow.fs.FileType.Directory]
def exists(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- result = file_info.type != pyarrow.fs.FileType.NotFound
- return result
- except Exception:
- return False
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to delete {path}: {e}")
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
return False
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if not recursive:
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False,
allow_not_found=True)
+ dir_contents = self.filesystem.get_file_info(selector)
+ if len(dir_contents) > 0:
+ raise OSError(f"Directory {path_str} is not empty")
+ if recursive:
+ self.filesystem.delete_dir_contents(path_str)
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_file(path_str)
+ return True
def mkdirs(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- self.filesystem.create_dir(path_str, recursive=True)
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
return True
- except Exception as e:
- self.logger.warning(f"Failed to create directory {path}: {e}")
- return False
+ elif file_info.type == pyarrow.fs.FileType.File:
+ raise FileExistsError(f"Path exists but is not a directory:
{path_str}")
+
+ self.filesystem.create_dir(path_str, recursive=True)
+ return True
def rename(self, src: str, dst: str) -> bool:
- try:
- dst_str = self.to_filesystem_path(dst)
- dst_parent = Path(dst_str).parent
- if str(dst_parent) and not self.exists(str(dst_parent)):
- self.mkdirs(str(dst_parent))
+ src_str = self.to_filesystem_path(src)
+ dst_str = self.to_filesystem_path(dst)
+ dst_parent = Path(dst_str).parent
+
+ if str(dst_parent) and not self.exists(str(dst_parent)):
+ self.mkdirs(str(dst_parent))
- src_str = self.to_filesystem_path(src)
- self.filesystem.move(src_str, dst_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+ try:
+ with self._RENAME_LOCK:
+ dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+ if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+ if dst_file_info.type == pyarrow.fs.FileType.File:
+ return False
+ src_name = Path(src_str).name
+ dst_str = f"{dst_str.rstrip('/')}/{src_name}"
Review Comment:
When destination is a directory, the code constructs a new destination path
by appending the source filename (lines 274-275). However, this logic assumes
Unix-style path separators and doesn't use Path.joinpath or os.path.join. For
cross-platform compatibility, consider using Path(dst_str) / src_name or
ensuring consistent path handling.
```suggestion
dst_path = Path(dst_str) / src_name
dst_str = str(dst_path)
```
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -329,8 +353,13 @@ def copy_file(self, source_path: str, target_path: str,
overwrite: bool = False)
if not overwrite and self.exists(target_path):
raise FileExistsError(f"Target file {target_path} already exists
and overwrite=False")
Review Comment:
The copy_file method checks exists(target_path) which can now raise OSError
(e.g., permission errors) instead of returning False. This means that when
overwrite=False and there's a permission error checking the target path, it
will raise OSError instead of the expected FileExistsError. Consider catching
OSError from exists() and re-raising as a more appropriate exception, or
document this behavior change.
```suggestion
if not overwrite:
try:
target_exists = self.exists(target_path)
except OSError as e:
# Normalize filesystem errors during existence check to
# FileExistsError
# so callers see a consistent error when overwrite=False.
raise FileExistsError(
f"Target file {target_path} already exists and overwrite=False"
) from e
if target_exists:
raise FileExistsError(f"Target file {target_path} already exists and overwrite=False")
```
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type ==
pyarrow.fs.FileType.Directory]
def exists(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- result = file_info.type != pyarrow.fs.FileType.NotFound
- return result
- except Exception:
- return False
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to delete {path}: {e}")
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
return False
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if not recursive:
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False,
allow_not_found=True)
+ dir_contents = self.filesystem.get_file_info(selector)
+ if len(dir_contents) > 0:
+ raise OSError(f"Directory {path_str} is not empty")
Review Comment:
The error message uses path_str (filesystem path) instead of the original
path parameter. For consistency with the FileNotFoundError message at line 208
which uses the original path, consider using the original path here too, or
update both to use path_str consistently.
```suggestion
raise OSError(f"Directory {path} is not empty")
```
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -199,7 +202,12 @@ def new_output_stream(self, path: str):
def get_file_status(self, path: str):
path_str = self.to_filesystem_path(path)
file_infos = self.filesystem.get_file_info([path_str])
- return file_infos[0]
+ file_info = file_infos[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
+ raise FileNotFoundError(f"File {path} does not exist")
Review Comment:
The error message uses the original path parameter instead of the
filesystem-transformed path_str. This could be confusing when debugging issues
with path transformations, especially for S3 or other non-local filesystems
where the internal path representation differs from the user-provided path.
Consider including path_str in the error message for better diagnostics.
```suggestion
raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org