This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 309a354ee6 [python] Fix pyarrow6 compatibility issue with path parsing 
and get_file_info (#7292)
309a354ee6 is described below

commit 309a354ee6fe845a4d4b7001e43fa1f60c2084dc
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 25 20:12:25 2026 +0800

    [python] Fix pyarrow6 compatibility issue with path parsing and 
get_file_info (#7292)
    
    1. REST read and write fail with error:
    `oss://bucket_name/xxxx/yyy/sample/bucket-0/test-0.parquet:
    When listing objects under key 'yyy/sample/bucket-0' in bucket 'xxxx':
    AWS Error [code 133]: The specified key does not exist.`
    
    PR #7180 addressed OSS + PyArrow 6 by passing key-only (no netloc), but
    PyArrow 6 still has an issue with parsing the first path segment as the
    bucket.
    
       **Behavior difference**:
       - PyArrow 6: treats path `a/b/c` as `bucket=a, key=b/c`
    - PyArrow 7+: uses bucket from connection and treats entire string as
    key `a/b/c`
    
    2. **get_file_info() inconsistency**:
    - PyArrow 7+: returns `FileInfo` with `FileType.NotFound` for
    non-existent paths
    - PyArrow 6: throws `OSError` with message `AWS Error [code 133]: The
    specified key does not exist`
---
 .../pypaimon/filesystem/pyarrow_file_io.py         | 48 ++++++++++------
 paimon-python/pypaimon/tests/file_io_test.py       | 66 ++++++++++++++++++----
 2 files changed, 88 insertions(+), 26 deletions(-)

diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index cb689baeaf..c02c5f62a8 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -17,6 +17,7 @@
 
################################################################################
 import logging
 import os
+import re
 import subprocess
 import uuid
 from datetime import datetime, timezone
@@ -40,11 +41,15 @@ from pypaimon.table.row.row_kind import RowKind
 from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
+def _pyarrow_lt_7():
+    return parse(pyarrow.__version__) < parse("7.0.0")
+
+
 class PyArrowFileIO(FileIO):
     def __init__(self, path: str, catalog_options: Options):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
-        self._pyarrow_gte_7 = parse(pyarrow.__version__) >= parse("7.0.0")
+        self._pyarrow_gte_7 = not _pyarrow_lt_7()
         self._pyarrow_gte_8 = parse(pyarrow.__version__) >= parse("8.0.0")
         scheme, netloc, _ = self.parse_location(path)
         self.uri_reader_factory = UriReaderFactory(catalog_options)
@@ -194,10 +199,21 @@ class PyArrowFileIO(FileIO):
 
         return self.filesystem.open_output_stream(path_str)
 
+    def _get_file_info(self, path_str: str):
+        try:
+            file_infos = self.filesystem.get_file_info([path_str])
+            return file_infos[0]
+        except OSError as e:
+            # this is for compatibility with pyarrow < 7
+            msg = str(e).lower()
+            if ("does not exist" in msg or "not exist" in msg or "nosuchkey" 
in msg
+                    or re.search(r'\b133\b', msg) or "notfound" in msg):
+                return pafs.FileInfo(path_str, pafs.FileType.NotFound)
+            raise
+
     def get_file_status(self, path: str):
         path_str = self.to_filesystem_path(path)
-        file_infos = self.filesystem.get_file_info([path_str])
-        file_info = file_infos[0]
+        file_info = self._get_file_info(path_str)
         
         if file_info.type == pafs.FileType.NotFound:
             raise FileNotFoundError(f"File {path} (resolved as {path_str}) 
does not exist")
@@ -215,12 +231,11 @@ class PyArrowFileIO(FileIO):
 
     def exists(self, path: str) -> bool:
         path_str = self.to_filesystem_path(path)
-        file_info = self.filesystem.get_file_info([path_str])[0]
-        return file_info.type != pafs.FileType.NotFound
+        return self._get_file_info(path_str).type != pafs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
         path_str = self.to_filesystem_path(path)
-        file_info = self.filesystem.get_file_info([path_str])[0]
+        file_info = self._get_file_info(path_str)
         
         if file_info.type == pafs.FileType.NotFound:
             return False
@@ -242,8 +257,11 @@ class PyArrowFileIO(FileIO):
 
     def mkdirs(self, path: str) -> bool:
         path_str = self.to_filesystem_path(path)
-        file_info = self.filesystem.get_file_info([path_str])[0]
+        file_info = self._get_file_info(path_str)
         
+        if file_info.type == pafs.FileType.NotFound:
+            self.filesystem.create_dir(path_str, recursive=True)
+            return True
         if file_info.type == pafs.FileType.Directory:
             return True
         elif file_info.type == pafs.FileType.File:
@@ -264,7 +282,7 @@ class PyArrowFileIO(FileIO):
             if hasattr(self.filesystem, 'rename'):
                 return self.filesystem.rename(src_str, dst_str)
             
-            dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+            dst_file_info = self._get_file_info(dst_str)
             if dst_file_info.type != pafs.FileType.NotFound:
                 if dst_file_info.type == pafs.FileType.File:
                     return False
@@ -272,7 +290,7 @@ class PyArrowFileIO(FileIO):
                 # dst=dst/srcFileName
                 src_name = Path(src_str).name
                 dst_str = str(Path(dst_str) / src_name)
-                final_dst_info = self.filesystem.get_file_info([dst_str])[0]
+                final_dst_info = self._get_file_info(dst_str)
                 if final_dst_info.type != pafs.FileType.NotFound:
                     return False
             
@@ -310,7 +328,7 @@ class PyArrowFileIO(FileIO):
     def try_to_write_atomic(self, path: str, content: str) -> bool:
         if self.exists(path):
             path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
+            file_info = self._get_file_info(path_str)
             if file_info.type == pafs.FileType.Directory:
                 return False
         
@@ -508,13 +526,11 @@ class PyArrowFileIO(FileIO):
             if parsed.scheme:
                 if parsed.netloc:
                     path_part = normalized_path.lstrip('/')
+                    # OSS+PyArrow<7: endpoint_override has bucket, pass key 
only.
                     if self._is_oss and not self._pyarrow_gte_7:
-                        # For PyArrow 6.x + OSS, endpoint_override already 
contains bucket,
-                        result = path_part if path_part else '.'
-                        return result
-                    else:
-                        result = f"{parsed.netloc}/{path_part}" if path_part 
else parsed.netloc
-                        return result
+                        return path_part if path_part else '.'
+                    result = f"{parsed.netloc}/{path_part}" if path_part else 
parsed.netloc
+                    return result
                 else:
                     result = normalized_path.lstrip('/')
                     return result if result else '.'
diff --git a/paimon-python/pypaimon/tests/file_io_test.py 
b/paimon-python/pypaimon/tests/file_io_test.py
index ae9abeff23..ba91f94a18 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -22,19 +22,18 @@ import unittest
 from pathlib import Path
 from unittest.mock import MagicMock, patch
 
-import pyarrow
 import pyarrow.fs as pafs
 
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions
 from pypaimon.filesystem.local_file_io import LocalFileIO
-from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO, _pyarrow_lt_7
 
 
 class FileIOTest(unittest.TestCase):
     """Test cases for FileIO.to_filesystem_path method."""
 
-    def test_s3_filesystem_path_conversion(self):
+    def test_filesystem_path_conversion(self):
         """Test S3FileSystem path conversion with various formats."""
         file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
         self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem)
@@ -66,18 +65,31 @@ class FileIOTest(unittest.TestCase):
         parent_str = str(Path(converted_path).parent)
         self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
 
-        from packaging.version import parse as parse_version
+        lt7 = _pyarrow_lt_7()
         oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
             OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
         }))
-        lt7 = parse_version(pyarrow.__version__) < parse_version("7.0.0")
         got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt")
-        expected_path = (
-            "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
-        self.assertEqual(got, expected_path)
+        self.assertEqual(got, "path/to/file.txt" if lt7 else 
"test-bucket/path/to/file.txt")
+        if lt7:
+            
self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"),
+                             "db-xxx.db/tbl-xxx/data.parquet")
+            self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"), 
"db-xxx.db/tbl-xxx")
+            manifest_uri = 
"oss://test-bucket/warehouse/db.db/table/manifest/manifest-list-abc-0"
+            manifest_key = oss_io.to_filesystem_path(manifest_uri)
+            self.assertEqual(manifest_key, 
"warehouse/db.db/table/manifest/manifest-list-abc-0",
+                             "OSS+PyArrow6 must pass key only to PyArrow so 
manifest is written to correct bucket")
+            self.assertFalse(manifest_key.startswith("test-bucket/"),
+                             "path must not start with bucket name or PyArrow 
6 writes to wrong bucket")
         nf = MagicMock(type=pafs.FileType.NotFound)
+        get_file_info_calls = []
+
+        def record_get_file_info(paths):
+            get_file_info_calls.append(list(paths))
+            return [MagicMock(type=pafs.FileType.NotFound) for _ in paths]
+
         mock_fs = MagicMock()
-        mock_fs.get_file_info.side_effect = [[nf], [nf]]
+        mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else 
[[nf], [nf]]
         mock_fs.create_dir = MagicMock()
         mock_fs.open_output_stream.return_value = MagicMock()
         oss_io.filesystem = mock_fs
@@ -87,8 +99,42 @@ class FileIOTest(unittest.TestCase):
         if lt7:
             expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in 
path_str else ''
         else:
-            expected_parent = str(Path(path_str).parent)
+            expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in 
path_str else str(Path(path_str).parent)
         self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent)
+        if lt7:
+            for call_paths in get_file_info_calls:
+                for p in call_paths:
+                    self.assertFalse(
+                        p.startswith("test-bucket/"),
+                        "OSS+PyArrow<7 must pass key only to get_file_info, 
not bucket/key. Got: %r" % (p,)
+                    )
+
+    def test_exists(self):
+        lt7 = _pyarrow_lt_7()
+        with tempfile.TemporaryDirectory(prefix="file_io_nonexistent_") as 
tmpdir:
+            file_io = LocalFileIO("file://" + tmpdir, Options({}))
+            missing_uri = "file://" + os.path.join(tmpdir, "nonexistent_xyz")
+            path_str = file_io.to_filesystem_path(missing_uri)
+            raised = None
+            infos = None
+            try:
+                infos = file_io.filesystem.get_file_info([path_str])
+            except OSError as e:
+                raised = e
+            if lt7:
+                if raised is not None:
+                    err = str(raised).lower()
+                    self.assertTrue("133" in err or "does not exist" in err or 
"not exist" in err, str(raised))
+                else:
+                    self.assertEqual(len(infos), 1)
+                    self.assertEqual(infos[0].type, pafs.FileType.NotFound)
+            else:
+                self.assertIsNone(raised)
+                self.assertEqual(len(infos), 1)
+                self.assertEqual(infos[0].type, pafs.FileType.NotFound)
+            self.assertFalse(file_io.exists(missing_uri))
+            with self.assertRaises(FileNotFoundError):
+                file_io.get_file_status(missing_uri)
 
     def test_local_filesystem_path_conversion(self):
         file_io = LocalFileIO("file:///tmp/warehouse", Options({}))

Reply via email to