XiaoHongbo-Hope commented on code in PR #7001:
URL: https://github.com/apache/paimon/pull/7001#discussion_r2687138089


##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
         return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            result = file_info.type != pyarrow.fs.FileType.NotFound
-            return result
-        except Exception:
-            return False
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        return file_info.type != pyarrow.fs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
-                if recursive:
-                    self.filesystem.delete_dir_contents(path_str)
-                else:
-                    self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_file(path_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to delete {path}: {e}")
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
             return False
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
+            if not recursive:
+                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
+                dir_contents = self.filesystem.get_file_info(selector)
+                if len(dir_contents) > 0:
+                    raise OSError(f"Directory {path_str} is not empty")
+            if recursive:
+                self.filesystem.delete_dir_contents(path_str)
+                self.filesystem.delete_dir(path_str)
+            else:
+                self.filesystem.delete_dir(path_str)
+        else:
+            self.filesystem.delete_file(path_str)
+        return True
 
     def mkdirs(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            self.filesystem.create_dir(path_str, recursive=True)
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
             return True
-        except Exception as e:
-            self.logger.warning(f"Failed to create directory {path}: {e}")
-            return False
+        elif file_info.type == pyarrow.fs.FileType.File:
+            raise FileExistsError(f"Path exists but is not a directory: 
{path_str}")
+        
+        self.filesystem.create_dir(path_str, recursive=True)
+        return True
 
     def rename(self, src: str, dst: str) -> bool:
-        try:
-            dst_str = self.to_filesystem_path(dst)
-            dst_parent = Path(dst_str).parent
-            if str(dst_parent) and not self.exists(str(dst_parent)):
-                self.mkdirs(str(dst_parent))
+        src_str = self.to_filesystem_path(src)
+        dst_str = self.to_filesystem_path(dst)
+        dst_parent = Path(dst_str).parent
+        
+        if str(dst_parent) and not self.exists(str(dst_parent)):
+            self.mkdirs(str(dst_parent))
 
-            src_str = self.to_filesystem_path(src)
-            self.filesystem.move(src_str, dst_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+        try:
+            with self._RENAME_LOCK:
+                dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+                if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+                    if dst_file_info.type == pyarrow.fs.FileType.File:
+                        return False
+                    src_name = Path(src_str).name
+                    dst_str = f"{dst_str.rstrip('/')}/{src_name}"
+                    final_dst_info = 
self.filesystem.get_file_info([dst_str])[0]
+                    if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+                        return False
+                
+                self.filesystem.move(src_str, dst_str)
+                return True
+        except (FileNotFoundError, PermissionError, OSError):
             return False

Review Comment:
   > The rename operation has a race condition. The lock only protects the 
check-and-move within a single process, but does not provide atomicity across 
multiple processes or in distributed environments. Between checking if the 
destination exists (lines 270-278) and performing the move (line 280), another 
process could create the destination file, leading to either overwrite or 
failure depending on the underlying filesystem behavior. Consider using the 
filesystem's atomic move operation if available, or document this limitation 
clearly.
   
   For local filesystems, atomicity is provided by the OS atomic move operation
   For object stores (OSS/S3), we rely on the underlying filesystem's 
atomicity, similar with java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to