Copilot commented on code in PR #7001:
URL: https://github.com/apache/paimon/pull/7001#discussion_r2681323603


##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
         return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            result = file_info.type != pyarrow.fs.FileType.NotFound
-            return result
-        except Exception:
-            return False
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        return file_info.type != pyarrow.fs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
-                if recursive:
-                    self.filesystem.delete_dir_contents(path_str)
-                else:
-                    self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_file(path_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to delete {path}: {e}")
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
             return False
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
+            if not recursive:
+                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
+                dir_contents = self.filesystem.get_file_info(selector)
+                if len(dir_contents) > 0:
+                    raise OSError(f"Directory {path_str} is not empty")
+            if recursive:
+                self.filesystem.delete_dir_contents(path_str)
+                self.filesystem.delete_dir(path_str)
+            else:
+                self.filesystem.delete_dir(path_str)
+        else:
+            self.filesystem.delete_file(path_str)
+        return True
 
     def mkdirs(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            self.filesystem.create_dir(path_str, recursive=True)
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
             return True
-        except Exception as e:
-            self.logger.warning(f"Failed to create directory {path}: {e}")
-            return False
+        elif file_info.type == pyarrow.fs.FileType.File:
+            raise FileExistsError(f"Path exists but is not a directory: 
{path_str}")
+        
+        self.filesystem.create_dir(path_str, recursive=True)
+        return True
 
     def rename(self, src: str, dst: str) -> bool:
-        try:
-            dst_str = self.to_filesystem_path(dst)
-            dst_parent = Path(dst_str).parent
-            if str(dst_parent) and not self.exists(str(dst_parent)):
-                self.mkdirs(str(dst_parent))
+        src_str = self.to_filesystem_path(src)
+        dst_str = self.to_filesystem_path(dst)
+        dst_parent = Path(dst_str).parent
+        
+        if str(dst_parent) and not self.exists(str(dst_parent)):
+            self.mkdirs(str(dst_parent))
 
-            src_str = self.to_filesystem_path(src)
-            self.filesystem.move(src_str, dst_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+        try:
+            with self._RENAME_LOCK:
+                dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+                if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+                    if dst_file_info.type == pyarrow.fs.FileType.File:
+                        return False
+                    src_name = Path(src_str).name
+                    dst_str = f"{dst_str.rstrip('/')}/{src_name}"
+                    final_dst_info = 
self.filesystem.get_file_info([dst_str])[0]
+                    if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+                        return False
+                
+                self.filesystem.move(src_str, dst_str)
+                return True
+        except (FileNotFoundError, PermissionError, OSError):
             return False

Review Comment:
   The rename operation has a race condition. The lock only protects the 
check-and-move within a single process, but does not provide atomicity across 
multiple processes or in distributed environments. Between checking if the 
destination exists (lines 270-278) and performing the move (line 280), another 
process could create the destination file, leading to either overwrite or 
failure depending on the underlying filesystem behavior. Consider using the 
filesystem's atomic move operation if available, or document this limitation 
clearly.



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -329,8 +353,13 @@ def copy_file(self, source_path: str, target_path: str, 
overwrite: bool = False)
         if not overwrite and self.exists(target_path):
             raise FileExistsError(f"Target file {target_path} already exists 
and overwrite=False")
 
-        source_str = self.to_filesystem_path(source_path)
         target_str = self.to_filesystem_path(target_path)
+        target_parent = Path(target_str).parent
+        
+        if str(target_parent) and not self.exists(str(target_parent)):
+            self.mkdirs(str(target_parent))
+
+        source_str = self.to_filesystem_path(source_path)

Review Comment:
   The source path is converted to filesystem path (line 362) after the target 
parent directory is potentially created (lines 356-360). If the source path is 
invalid or to_filesystem_path raises an exception, the parent directory for the 
target will have already been created unnecessarily. Consider converting both 
source and target paths first before any filesystem operations.
   ```suggestion
           # Convert both source and target paths before performing any 
filesystem modifications
           source_str = self.to_filesystem_path(source_path)
           target_str = self.to_filesystem_path(target_path)
           target_parent = Path(target_str).parent
   
           if str(target_parent) and not self.exists(str(target_parent)):
               self.mkdirs(str(target_parent))
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
         return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            result = file_info.type != pyarrow.fs.FileType.NotFound
-            return result
-        except Exception:
-            return False
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        return file_info.type != pyarrow.fs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
-                if recursive:
-                    self.filesystem.delete_dir_contents(path_str)
-                else:
-                    self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_file(path_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to delete {path}: {e}")
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
             return False
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
+            if not recursive:
+                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
+                dir_contents = self.filesystem.get_file_info(selector)
+                if len(dir_contents) > 0:
+                    raise OSError(f"Directory {path_str} is not empty")
+            if recursive:
+                self.filesystem.delete_dir_contents(path_str)
+                self.filesystem.delete_dir(path_str)
+            else:
+                self.filesystem.delete_dir(path_str)
+        else:
+            self.filesystem.delete_file(path_str)
+        return True
 
     def mkdirs(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            self.filesystem.create_dir(path_str, recursive=True)
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
             return True
-        except Exception as e:
-            self.logger.warning(f"Failed to create directory {path}: {e}")
-            return False
+        elif file_info.type == pyarrow.fs.FileType.File:
+            raise FileExistsError(f"Path exists but is not a directory: 
{path_str}")

Review Comment:
   The error message uses path_str (filesystem path) instead of the original 
path parameter. For consistency with FileNotFoundError at line 208 which uses 
the original path, consider using the original path here too.
   ```suggestion
               raise FileExistsError(f"Path exists but is not a directory: 
{path}")
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -39,6 +40,8 @@
 
 
 class FileIO:
+    _RENAME_LOCK = threading.Lock()
+
     def __init__(self, path: str, catalog_options: Options):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)

Review Comment:
   The class-level lock _RENAME_LOCK is shared across all FileIO instances. 
This means that rename operations on different FileIO instances (e.g., 
different warehouses or filesystems) will block each other unnecessarily. 
Consider using an instance-level lock instead, or document this limitation.
   ```suggestion
       # Class-level sentinel; actual lock is per-instance (see __init__).
       _RENAME_LOCK = None
   
       def __init__(self, path: str, catalog_options: Options):
           self.properties = catalog_options
           self.logger = logging.getLogger(__name__)
           # Use an instance-level lock to avoid cross-instance contention on 
rename.
           self._RENAME_LOCK = threading.Lock()
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
         return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            result = file_info.type != pyarrow.fs.FileType.NotFound
-            return result
-        except Exception:
-            return False
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        return file_info.type != pyarrow.fs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
-                if recursive:
-                    self.filesystem.delete_dir_contents(path_str)
-                else:
-                    self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_file(path_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to delete {path}: {e}")
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
             return False
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
+            if not recursive:
+                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
+                dir_contents = self.filesystem.get_file_info(selector)
+                if len(dir_contents) > 0:
+                    raise OSError(f"Directory {path_str} is not empty")
+            if recursive:
+                self.filesystem.delete_dir_contents(path_str)
+                self.filesystem.delete_dir(path_str)
+            else:
+                self.filesystem.delete_dir(path_str)
+        else:
+            self.filesystem.delete_file(path_str)
+        return True
 
     def mkdirs(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            self.filesystem.create_dir(path_str, recursive=True)
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
             return True
-        except Exception as e:
-            self.logger.warning(f"Failed to create directory {path}: {e}")
-            return False
+        elif file_info.type == pyarrow.fs.FileType.File:
+            raise FileExistsError(f"Path exists but is not a directory: 
{path_str}")
+        
+        self.filesystem.create_dir(path_str, recursive=True)
+        return True
 
     def rename(self, src: str, dst: str) -> bool:
-        try:
-            dst_str = self.to_filesystem_path(dst)
-            dst_parent = Path(dst_str).parent
-            if str(dst_parent) and not self.exists(str(dst_parent)):
-                self.mkdirs(str(dst_parent))
+        src_str = self.to_filesystem_path(src)
+        dst_str = self.to_filesystem_path(dst)
+        dst_parent = Path(dst_str).parent
+        
+        if str(dst_parent) and not self.exists(str(dst_parent)):
+            self.mkdirs(str(dst_parent))
 
-            src_str = self.to_filesystem_path(src)
-            self.filesystem.move(src_str, dst_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+        try:
+            with self._RENAME_LOCK:
+                dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+                if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+                    if dst_file_info.type == pyarrow.fs.FileType.File:
+                        return False
+                    src_name = Path(src_str).name
+                    dst_str = f"{dst_str.rstrip('/')}/{src_name}"
+                    final_dst_info = 
self.filesystem.get_file_info([dst_str])[0]
+                    if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+                        return False
+                
+                self.filesystem.move(src_str, dst_str)
+                return True
+        except (FileNotFoundError, PermissionError, OSError):
             return False

Review Comment:
   The rename method catches OSError and returns False (line 282-283), but 
OSErrors from the exists() and mkdirs() calls (lines 265-266) that occur before 
the try block are not caught. This creates inconsistent error handling - some 
OSErrors are propagated while others are swallowed and return False. Either 
move the exists/mkdirs calls inside the try block or allow all OSErrors to 
propagate for consistency.



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
         return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            result = file_info.type != pyarrow.fs.FileType.NotFound
-            return result
-        except Exception:
-            return False
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        return file_info.type != pyarrow.fs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
-                if recursive:
-                    self.filesystem.delete_dir_contents(path_str)
-                else:
-                    self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_file(path_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to delete {path}: {e}")
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
             return False
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
+            if not recursive:
+                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
+                dir_contents = self.filesystem.get_file_info(selector)
+                if len(dir_contents) > 0:
+                    raise OSError(f"Directory {path_str} is not empty")
+            if recursive:
+                self.filesystem.delete_dir_contents(path_str)
+                self.filesystem.delete_dir(path_str)
+            else:
+                self.filesystem.delete_dir(path_str)
+        else:
+            self.filesystem.delete_file(path_str)
+        return True
 
     def mkdirs(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            self.filesystem.create_dir(path_str, recursive=True)
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
             return True
-        except Exception as e:
-            self.logger.warning(f"Failed to create directory {path}: {e}")
-            return False
+        elif file_info.type == pyarrow.fs.FileType.File:
+            raise FileExistsError(f"Path exists but is not a directory: 
{path_str}")
+        
+        self.filesystem.create_dir(path_str, recursive=True)
+        return True
 
     def rename(self, src: str, dst: str) -> bool:
-        try:
-            dst_str = self.to_filesystem_path(dst)
-            dst_parent = Path(dst_str).parent
-            if str(dst_parent) and not self.exists(str(dst_parent)):
-                self.mkdirs(str(dst_parent))
+        src_str = self.to_filesystem_path(src)
+        dst_str = self.to_filesystem_path(dst)
+        dst_parent = Path(dst_str).parent
+        
+        if str(dst_parent) and not self.exists(str(dst_parent)):
+            self.mkdirs(str(dst_parent))
 
-            src_str = self.to_filesystem_path(src)
-            self.filesystem.move(src_str, dst_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+        try:
+            with self._RENAME_LOCK:
+                dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+                if dst_file_info.type != pyarrow.fs.FileType.NotFound:
+                    if dst_file_info.type == pyarrow.fs.FileType.File:
+                        return False
+                    src_name = Path(src_str).name
+                    dst_str = f"{dst_str.rstrip('/')}/{src_name}"

Review Comment:
   When destination is a directory, the code constructs a new destination path 
by appending the source filename (lines 274-275). However, this logic assumes 
Unix-style path separators and doesn't use Path.joinpath or os.path.join. For 
cross-platform compatibility, consider using Path(dst_str) / src_name or 
ensuring consistent path handling.
   ```suggestion
                       dst_path = Path(dst_str) / src_name
                       dst_str = str(dst_path)
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -329,8 +353,13 @@ def copy_file(self, source_path: str, target_path: str, 
overwrite: bool = False)
         if not overwrite and self.exists(target_path):
             raise FileExistsError(f"Target file {target_path} already exists 
and overwrite=False")
 

Review Comment:
   The copy_file method checks exists(target_path) which can now raise OSError 
(e.g., permission errors) instead of returning False. This means that when 
overwrite=False and there's a permission error checking the target path, it 
will raise OSError instead of the expected FileExistsError. Consider catching 
OSError from exists() and re-raising as a more appropriate exception, or 
document this behavior change.
   ```suggestion
           if not overwrite:
               try:
                   target_exists = self.exists(target_path)
               except OSError as e:
                   # Normalize filesystem errors during existence check to 
FileExistsError
                   # so callers see a consistent error when overwrite=False.
                   raise FileExistsError(
                       f"Target file {target_path} already exists and 
overwrite=False"
                   ) from e
   
               if target_exists:
                   raise FileExistsError(f"Target file {target_path} already 
exists and overwrite=False")
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -211,51 +219,67 @@ def list_directories(self, path: str):
         return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            result = file_info.type != pyarrow.fs.FileType.NotFound
-            return result
-        except Exception:
-            return False
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        return file_info.type != pyarrow.fs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
-        try:
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
-                if recursive:
-                    self.filesystem.delete_dir_contents(path_str)
-                else:
-                    self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_file(path_str)
-            return True
-        except Exception as e:
-            self.logger.warning(f"Failed to delete {path}: {e}")
+        path_str = self.to_filesystem_path(path)
+        file_info = self.filesystem.get_file_info([path_str])[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
             return False
+        
+        if file_info.type == pyarrow.fs.FileType.Directory:
+            if not recursive:
+                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
+                dir_contents = self.filesystem.get_file_info(selector)
+                if len(dir_contents) > 0:
+                    raise OSError(f"Directory {path_str} is not empty")

Review Comment:
   The error message uses path_str (filesystem path) instead of the original 
path parameter. For consistency with the FileNotFoundError message at line 208 
which uses the original path, consider using the original path here too, or 
update both to use path_str consistently.
   ```suggestion
                       raise OSError(f"Directory {path} is not empty")
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -199,7 +202,12 @@ def new_output_stream(self, path: str):
     def get_file_status(self, path: str):
         path_str = self.to_filesystem_path(path)
         file_infos = self.filesystem.get_file_info([path_str])
-        return file_infos[0]
+        file_info = file_infos[0]
+        
+        if file_info.type == pyarrow.fs.FileType.NotFound:
+            raise FileNotFoundError(f"File {path} does not exist")

Review Comment:
   The error message uses the original path parameter instead of the 
filesystem-transformed path_str. This could be confusing when debugging issues 
with path transformations, especially for S3 or other non-local filesystems 
where the internal path representation differs from the user-provided path. 
Consider including path_str in the error message for better diagnostics.
   ```suggestion
               raise FileNotFoundError(f"File {path} (resolved as {path_str}) 
does not exist")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to