discivigour commented on code in PR #7001:
URL: https://github.com/apache/paimon/pull/7001#discussion_r2681318849
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type ==
pyarrow.fs.FileType.Directory]
def exists(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- result = file_info.type != pyarrow.fs.FileType.NotFound
- return result
- except Exception:
- return False
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to delete {path}: {e}")
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.NotFound:
return False
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if not recursive:
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False,
allow_not_found=True)
+ dir_contents = self.filesystem.get_file_info(selector)
+ if len(dir_contents) > 0:
+ raise OSError(f"Directory {path_str} is not empty")
+ if recursive:
+ self.filesystem.delete_dir_contents(path_str)
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_dir(path_str)
+ else:
+ self.filesystem.delete_file(path_str)
+ return True
def mkdirs(self, path: str) -> bool:
- try:
- path_str = self.to_filesystem_path(path)
- self.filesystem.create_dir(path_str, recursive=True)
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+
+ if file_info.type == pyarrow.fs.FileType.Directory:
return True
- except Exception as e:
- self.logger.warning(f"Failed to create directory {path}: {e}")
- return False
+ elif file_info.type == pyarrow.fs.FileType.File:
+ raise FileExistsError(f"Path exists but is not a directory: {path_str}")
+
+ self.filesystem.create_dir(path_str, recursive=True)
+ return True
def rename(self, src: str, dst: str) -> bool:
- try:
- dst_str = self.to_filesystem_path(dst)
- dst_parent = Path(dst_str).parent
- if str(dst_parent) and not self.exists(str(dst_parent)):
- self.mkdirs(str(dst_parent))
+ src_str = self.to_filesystem_path(src)
+ dst_str = self.to_filesystem_path(dst)
+ dst_parent = Path(dst_str).parent
+
+ if str(dst_parent) and not self.exists(str(dst_parent)):
+ self.mkdirs(str(dst_parent))
- src_str = self.to_filesystem_path(src)
- self.filesystem.move(src_str, dst_str)
- return True
- except Exception as e:
- self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+ try:
+ with self._RENAME_LOCK:
Review Comment:
Is rename non-atomic on all filesystems here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]