This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 309a354ee6 [python] Fix pyarrow6 compatibility issue with path parsing
and get_file_info (#7292)
309a354ee6 is described below
commit 309a354ee6fe845a4d4b7001e43fa1f60c2084dc
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 25 20:12:25 2026 +0800
[python] Fix pyarrow6 compatibility issue with path parsing and
get_file_info (#7292)
1. REST read and write fail with error:
`oss://bucket_name/xxxx/yyy/sample/bucket-0/test-0.parquet:
When listing objects under key 'yyy/sample/bucket-0' in bucket 'xxxx':
AWS Error [code 133]: The specified key does not exist.`
PR #7180 addressed OSS + PyArrow 6 by passing key-only (no netloc), but
PyArrow 6 still has an issue where it parses the first path segment as the
bucket.
**Behavior difference**:
- PyArrow 6: treats path `a/b/c` as `bucket=a, key=b/c`
- PyArrow 7+: uses bucket from connection and treats entire string as
key `a/b/c`
2. **get_file_info() inconsistency**:
- PyArrow 7+: returns `FileInfo` with `FileType.NotFound` for
non-existent paths
- PyArrow 6: throws `OSError` with message `AWS Error [code 133]: The
specified key does not exist`
---
.../pypaimon/filesystem/pyarrow_file_io.py | 48 ++++++++++------
paimon-python/pypaimon/tests/file_io_test.py | 66 ++++++++++++++++++----
2 files changed, 88 insertions(+), 26 deletions(-)
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index cb689baeaf..c02c5f62a8 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -17,6 +17,7 @@
################################################################################
import logging
import os
+import re
import subprocess
import uuid
from datetime import datetime, timezone
@@ -40,11 +41,15 @@ from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
+def _pyarrow_lt_7():
+ return parse(pyarrow.__version__) < parse("7.0.0")
+
+
class PyArrowFileIO(FileIO):
def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
- self._pyarrow_gte_7 = parse(pyarrow.__version__) >= parse("7.0.0")
+ self._pyarrow_gte_7 = not _pyarrow_lt_7()
self._pyarrow_gte_8 = parse(pyarrow.__version__) >= parse("8.0.0")
scheme, netloc, _ = self.parse_location(path)
self.uri_reader_factory = UriReaderFactory(catalog_options)
@@ -194,10 +199,21 @@ class PyArrowFileIO(FileIO):
return self.filesystem.open_output_stream(path_str)
+ def _get_file_info(self, path_str: str):
+ try:
+ file_infos = self.filesystem.get_file_info([path_str])
+ return file_infos[0]
+ except OSError as e:
+ # this is for compatible with pyarrow < 7
+ msg = str(e).lower()
+ if ("does not exist" in msg or "not exist" in msg or "nosuchkey"
in msg
+ or re.search(r'\b133\b', msg) or "notfound" in msg):
+ return pafs.FileInfo(path_str, pafs.FileType.NotFound)
+ raise
+
def get_file_status(self, path: str):
path_str = self.to_filesystem_path(path)
- file_infos = self.filesystem.get_file_info([path_str])
- file_info = file_infos[0]
+ file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.NotFound:
raise FileNotFoundError(f"File {path} (resolved as {path_str})
does not exist")
@@ -215,12 +231,11 @@ class PyArrowFileIO(FileIO):
def exists(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- return file_info.type != pafs.FileType.NotFound
+ return self._get_file_info(path_str).type != pafs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
+ file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.NotFound:
return False
@@ -242,8 +257,11 @@ class PyArrowFileIO(FileIO):
def mkdirs(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
+ file_info = self._get_file_info(path_str)
+ if file_info.type == pafs.FileType.NotFound:
+ self.filesystem.create_dir(path_str, recursive=True)
+ return True
if file_info.type == pafs.FileType.Directory:
return True
elif file_info.type == pafs.FileType.File:
@@ -264,7 +282,7 @@ class PyArrowFileIO(FileIO):
if hasattr(self.filesystem, 'rename'):
return self.filesystem.rename(src_str, dst_str)
- dst_file_info = self.filesystem.get_file_info([dst_str])[0]
+ dst_file_info = self._get_file_info(dst_str)
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
return False
@@ -272,7 +290,7 @@ class PyArrowFileIO(FileIO):
# dst=dst/srcFileName
src_name = Path(src_str).name
dst_str = str(Path(dst_str) / src_name)
- final_dst_info = self.filesystem.get_file_info([dst_str])[0]
+ final_dst_info = self._get_file_info(dst_str)
if final_dst_info.type != pafs.FileType.NotFound:
return False
@@ -310,7 +328,7 @@ class PyArrowFileIO(FileIO):
def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
+ file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.Directory:
return False
@@ -508,13 +526,11 @@ class PyArrowFileIO(FileIO):
if parsed.scheme:
if parsed.netloc:
path_part = normalized_path.lstrip('/')
+ # OSS+PyArrow<7: endpoint_override has bucket, pass key
only.
if self._is_oss and not self._pyarrow_gte_7:
- # For PyArrow 6.x + OSS, endpoint_override already
contains bucket,
- result = path_part if path_part else '.'
- return result
- else:
- result = f"{parsed.netloc}/{path_part}" if path_part
else parsed.netloc
- return result
+ return path_part if path_part else '.'
+ result = f"{parsed.netloc}/{path_part}" if path_part else
parsed.netloc
+ return result
else:
result = normalized_path.lstrip('/')
return result if result else '.'
diff --git a/paimon-python/pypaimon/tests/file_io_test.py
b/paimon-python/pypaimon/tests/file_io_test.py
index ae9abeff23..ba91f94a18 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -22,19 +22,18 @@ import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
-import pyarrow
import pyarrow.fs as pafs
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
from pypaimon.filesystem.local_file_io import LocalFileIO
-from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO, _pyarrow_lt_7
class FileIOTest(unittest.TestCase):
"""Test cases for FileIO.to_filesystem_path method."""
- def test_s3_filesystem_path_conversion(self):
+ def test_filesystem_path_conversion(self):
"""Test S3FileSystem path conversion with various formats."""
file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem)
@@ -66,18 +65,31 @@ class FileIOTest(unittest.TestCase):
parent_str = str(Path(converted_path).parent)
self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
- from packaging.version import parse as parse_version
+ lt7 = _pyarrow_lt_7()
oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
}))
- lt7 = parse_version(pyarrow.__version__) < parse_version("7.0.0")
got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt")
- expected_path = (
- "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
- self.assertEqual(got, expected_path)
+ self.assertEqual(got, "path/to/file.txt" if lt7 else
"test-bucket/path/to/file.txt")
+ if lt7:
+
self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"),
+ "db-xxx.db/tbl-xxx/data.parquet")
+ self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"),
"db-xxx.db/tbl-xxx")
+ manifest_uri =
"oss://test-bucket/warehouse/db.db/table/manifest/manifest-list-abc-0"
+ manifest_key = oss_io.to_filesystem_path(manifest_uri)
+ self.assertEqual(manifest_key,
"warehouse/db.db/table/manifest/manifest-list-abc-0",
+ "OSS+PyArrow6 must pass key only to PyArrow so
manifest is written to correct bucket")
+ self.assertFalse(manifest_key.startswith("test-bucket/"),
+ "path must not start with bucket name or PyArrow
6 writes to wrong bucket")
nf = MagicMock(type=pafs.FileType.NotFound)
+ get_file_info_calls = []
+
+ def record_get_file_info(paths):
+ get_file_info_calls.append(list(paths))
+ return [MagicMock(type=pafs.FileType.NotFound) for _ in paths]
+
mock_fs = MagicMock()
- mock_fs.get_file_info.side_effect = [[nf], [nf]]
+ mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else
[[nf], [nf]]
mock_fs.create_dir = MagicMock()
mock_fs.open_output_stream.return_value = MagicMock()
oss_io.filesystem = mock_fs
@@ -87,8 +99,42 @@ class FileIOTest(unittest.TestCase):
if lt7:
expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in
path_str else ''
else:
- expected_parent = str(Path(path_str).parent)
+ expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in
path_str else str(Path(path_str).parent)
self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent)
+ if lt7:
+ for call_paths in get_file_info_calls:
+ for p in call_paths:
+ self.assertFalse(
+ p.startswith("test-bucket/"),
+ "OSS+PyArrow<7 must pass key only to get_file_info,
not bucket/key. Got: %r" % (p,)
+ )
+
+ def test_exists(self):
+ lt7 = _pyarrow_lt_7()
+ with tempfile.TemporaryDirectory(prefix="file_io_nonexistent_") as
tmpdir:
+ file_io = LocalFileIO("file://" + tmpdir, Options({}))
+ missing_uri = "file://" + os.path.join(tmpdir, "nonexistent_xyz")
+ path_str = file_io.to_filesystem_path(missing_uri)
+ raised = None
+ infos = None
+ try:
+ infos = file_io.filesystem.get_file_info([path_str])
+ except OSError as e:
+ raised = e
+ if lt7:
+ if raised is not None:
+ err = str(raised).lower()
+ self.assertTrue("133" in err or "does not exist" in err or
"not exist" in err, str(raised))
+ else:
+ self.assertEqual(len(infos), 1)
+ self.assertEqual(infos[0].type, pafs.FileType.NotFound)
+ else:
+ self.assertIsNone(raised)
+ self.assertEqual(len(infos), 1)
+ self.assertEqual(infos[0].type, pafs.FileType.NotFound)
+ self.assertFalse(file_io.exists(missing_uri))
+ with self.assertRaises(FileNotFoundError):
+ file_io.get_file_status(missing_uri)
def test_local_filesystem_path_conversion(self):
file_io = LocalFileIO("file:///tmp/warehouse", Options({}))