This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f3c18a85e9 [python] Introduce ResolvingFileIO for pypaimon mirroring Java implementation (#8165)
f3c18a85e9 is described below
commit f3c18a85e9a4ce1bb8fa2944ebefc1ca5a808bc5
Author: zhoulii <[email protected]>
AuthorDate: Mon Jun 8 19:41:19 2026 +0800
[python] Introduce ResolvingFileIO for pypaimon mirroring Java implementation (#8165)
---
paimon-python/pypaimon/common/file_io.py | 12 +-
paimon-python/pypaimon/common/options/config.py | 10 ++
.../pypaimon/filesystem/resolving_file_io.py | 143 ++++++++++++++++
.../pypaimon/tests/resolving_file_io_test.py | 180 +++++++++++++++++++++
4 files changed, 342 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index fabda00495..9f35140108 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -296,6 +296,13 @@ class FileIO(ABC):
import os as _os
from urllib.parse import urlparse
+ from pypaimon.common.options.config import CatalogOptions
+
+ opts = catalog_options or Options({})
+ if opts.get(CatalogOptions.RESOLVING_FILE_IO_ENABLED):
+ from pypaimon.filesystem.resolving_file_io import ResolvingFileIO
+ return ResolvingFileIO(opts)
+
uri = urlparse(path)
scheme = uri.scheme
@@ -303,8 +310,6 @@ class FileIO(ABC):
from pypaimon.filesystem.local_file_io import LocalFileIO
return LocalFileIO(path, catalog_options)
- opts = catalog_options or Options({})
-
if scheme in ("hdfs", "viewfs"):
from pypaimon.common.options.config import HdfsOptions
impl_source = "hdfs.client.impl option"
@@ -321,7 +326,8 @@ class FileIO(ABC):
impl = impl_value.lower()
if impl == "native":
try:
- from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ from pypaimon.filesystem.hdfs_native_file_io import \
+ HdfsNativeFileIO
return HdfsNativeFileIO(path, opts)
except (ImportError, RuntimeError) as e:
fallback = opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)
diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py
index 0004cb896a..604d62911a 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -108,6 +108,16 @@ class CatalogOptions:
"header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header")
SYNC_ALL_PROPERTIES = ConfigOptions.key("sync-all-properties").boolean_type().default_value(True).with_description(
"Sync all table properties to the catalog metastore")
+ RESOLVING_FILE_IO_ENABLED = (
+ ConfigOptions.key("resolving-file-io.enabled")
+ .boolean_type()
+ .default_value(False)
+ .with_description(
+ "Whether to enable resolving file IO. When enabled, Paimon dynamically "
+ "selects the appropriate FileIO based on the URI scheme of the given path, "
+ "allowing read/write to external storage paths such as OSS or S3."
+ )
+ )
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
diff --git a/paimon-python/pypaimon/filesystem/resolving_file_io.py b/paimon-python/pypaimon/filesystem/resolving_file_io.py
new file mode 100644
index 0000000000..576d160c81
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/resolving_file_io.py
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import defaultdict
+from typing import Dict, List
+from urllib.parse import urlparse
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import CatalogOptions
+
+
+class ResolvingFileIO(FileIO):
+ """A FileIO that dynamically selects the appropriate FileIO based on the
+ URI scheme of the given path. Caches FileIO instances by (scheme, authority)
+ to avoid repeated creation.
+
+ This is the Python equivalent of Java's ``ResolvingFileIO``.
+ """
+
+ def __init__(self, catalog_options: Options):
+ opts_map = dict(catalog_options.to_map())
+ opts_map.pop(CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(), None)
+ self._options = Options(opts_map)
+ self._fileio_cache: Dict[tuple, FileIO] = {}
+
+ def _cache_key(self, path: str) -> tuple:
+ uri = urlparse(path)
+ return (uri.scheme or 'file', uri.netloc or '')
+
+ def _get_fileio(self, path: str) -> FileIO:
+ cache_key = self._cache_key(path)
+ fileio = self._fileio_cache.get(cache_key)
+ if fileio is None:
+ fileio = FileIO.get(path, self._options)
+ self._fileio_cache[cache_key] = fileio
+ return fileio
+
+ @property
+ def properties(self):
+ return self._options
+
+ def is_object_store(self) -> bool:
+ """Check if the warehouse path points to an object store (not local or HDFS)."""
+ warehouse = self._options.to_map().get(CatalogOptions.WAREHOUSE.key())
+ if not warehouse:
+ return False
+ uri = urlparse(warehouse)
+ scheme = (uri.scheme or '').lower()
+ return scheme != '' and scheme != 'file' and scheme != 'hdfs'
+
+ def new_input_stream(self, path: str):
+ return self._get_fileio(path).new_input_stream(path)
+
+ def new_output_stream(self, path: str):
+ return self._get_fileio(path).new_output_stream(path)
+
+ def get_file_status(self, path: str):
+ return self._get_fileio(path).get_file_status(path)
+
+ def list_status(self, path: str):
+ return self._get_fileio(path).list_status(path)
+
+ def exists(self, path: str) -> bool:
+ return self._get_fileio(path).exists(path)
+
+ def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
+ groups: Dict[tuple, List[str]] = defaultdict(list)
+ for path in paths:
+ groups[self._cache_key(path)].append(path)
+ result = {}
+ for key, group_paths in groups.items():
+ fio = self._get_fileio(group_paths[0])
+ result.update(fio.exists_batch(group_paths))
+ return result
+
+ def delete(self, path: str, recursive: bool = False) -> bool:
+ return self._get_fileio(path).delete(path, recursive)
+
+ def mkdirs(self, path: str) -> bool:
+ return self._get_fileio(path).mkdirs(path)
+
+ def rename(self, src: str, dst: str) -> bool:
+ return self._get_fileio(src).rename(src, dst)
+
+ def get_file_size(self, path: str) -> int:
+ return self._get_fileio(path).get_file_size(path)
+
+ def is_dir(self, path: str) -> bool:
+ return self._get_fileio(path).is_dir(path)
+
+ def to_filesystem_path(self, path: str) -> str:
+ return self._get_fileio(path).to_filesystem_path(path)
+
+ def write_parquet(self, path: str, data, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ return self._get_fileio(path).write_parquet(path, data, compression,
+ zstd_level, **kwargs)
+
+ def write_orc(self, path: str, data, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ return self._get_fileio(path).write_orc(path, data, compression,
+ zstd_level, **kwargs)
+
+ def write_avro(self, path: str, data, avro_schema=None,
+ compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+ return self._get_fileio(path).write_avro(path, data, avro_schema,
+ compression, zstd_level, **kwargs)
+
+ def write_lance(self, path: str, data, **kwargs):
+ return self._get_fileio(path).write_lance(path, data, **kwargs)
+
+ def write_blob(self, path: str, data, **kwargs):
+ return self._get_fileio(path).write_blob(path, data, **kwargs)
+
+ def write_mosaic(self, path: str, data, **kwargs):
+ return self._get_fileio(path).write_mosaic(path, data, **kwargs)
+
+ def write_vortex(self, path: str, data, **kwargs):
+ return self._get_fileio(path).write_vortex(path, data, **kwargs)
+
+ def write_row(self, path: str, data, fields=None, zstd_level: int = 1, **kwargs):
+ return self._get_fileio(path).write_row(path, data, fields,
+ zstd_level, **kwargs)
+
+ def close(self):
+ for fileio in self._fileio_cache.values():
+ fileio.close()
+ self._fileio_cache.clear()
diff --git a/paimon-python/pypaimon/tests/resolving_file_io_test.py b/paimon-python/pypaimon/tests/resolving_file_io_test.py
new file mode 100644
index 0000000000..c3dec4ebe5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/resolving_file_io_test.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import CatalogOptions
+from pypaimon.filesystem.local_file_io import LocalFileIO
+from pypaimon.filesystem.resolving_file_io import ResolvingFileIO
+
+
+class ResolvingFileIOTest(unittest.TestCase):
+ """Tests for ResolvingFileIO."""
+
+ def test_fileio_get_returns_resolving_when_enabled(self):
+ opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'true'})
+ file_io = FileIO.get("/tmp/test", opts)
+ self.assertIsInstance(file_io, ResolvingFileIO)
+
+ def test_fileio_get_returns_local_when_disabled(self):
+ opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'false'})
+ file_io = FileIO.get("/tmp/test", opts)
+ self.assertIsInstance(file_io, LocalFileIO)
+
+ def test_fileio_get_returns_local_when_not_set(self):
+ file_io = FileIO.get("/tmp/test", Options({}))
+ self.assertIsInstance(file_io, LocalFileIO)
+
+ def test_no_recursion(self):
+ """ResolvingFileIO should not create another ResolvingFileIO internally."""
+ opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'true'})
+ resolving = ResolvingFileIO(opts)
+ inner = resolving._get_fileio("/tmp/test")
+ self.assertNotIsInstance(inner, ResolvingFileIO)
+ self.assertIsInstance(inner, LocalFileIO)
+
+ def test_cache_hit_same_scheme_authority(self):
+ resolving = ResolvingFileIO(Options({}))
+ fio1 = resolving._get_fileio("/tmp/test/a.txt")
+ fio2 = resolving._get_fileio("/tmp/test/b.txt")
+ self.assertIs(fio1, fio2)
+
+ def test_cache_key_uses_scheme_and_authority(self):
+ resolving = ResolvingFileIO(Options({}))
+ fio_local = resolving._get_fileio("/tmp/test")
+ fio_file = resolving._get_fileio("file:///tmp/test2")
+ self.assertIsInstance(fio_local, LocalFileIO)
+ self.assertIsInstance(fio_file, LocalFileIO)
+
+ def test_is_object_store_with_oss_warehouse(self):
+ opts = Options({CatalogOptions.WAREHOUSE.key(): 'oss://bucket/warehouse'})
+ resolving = ResolvingFileIO(opts)
+ self.assertTrue(resolving.is_object_store())
+
+ def test_is_object_store_with_s3_warehouse(self):
+ opts = Options({CatalogOptions.WAREHOUSE.key(): 's3://bucket/warehouse'})
+ resolving = ResolvingFileIO(opts)
+ self.assertTrue(resolving.is_object_store())
+
+ def test_is_object_store_with_hdfs_warehouse(self):
+ opts = Options({CatalogOptions.WAREHOUSE.key(): 'hdfs://cluster/warehouse'})
+ resolving = ResolvingFileIO(opts)
+ self.assertFalse(resolving.is_object_store())
+
+ def test_is_object_store_with_local_warehouse(self):
+ opts = Options({CatalogOptions.WAREHOUSE.key(): 'file:///tmp/warehouse'})
+ resolving = ResolvingFileIO(opts)
+ self.assertFalse(resolving.is_object_store())
+
+
+class ResolvingFileIOReadWriteTest(unittest.TestCase):
+ """End-to-end read/write tests using ResolvingFileIO with local filesystem."""
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp()
+ self.resolving = ResolvingFileIO(Options({}))
+
+ def tearDown(self):
+ self.resolving.close()
+ shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+ def test_write_and_read(self):
+ path = os.path.join(self.temp_dir, "test.txt")
+ content = "hello resolving fileio"
+ self.resolving.write_file(path, content)
+ result = self.resolving.read_file_utf8(path)
+ self.assertEqual(result, content)
+
+ def test_exists(self):
+ path = os.path.join(self.temp_dir, "test_exists.txt")
+ self.assertFalse(self.resolving.exists(path))
+ self.resolving.write_file(path, "data")
+ self.assertTrue(self.resolving.exists(path))
+
+ def test_delete(self):
+ path = os.path.join(self.temp_dir, "test_delete.txt")
+ self.resolving.write_file(path, "data")
+ self.assertTrue(self.resolving.exists(path))
+ self.assertTrue(self.resolving.delete(path))
+ self.assertFalse(self.resolving.exists(path))
+
+ def test_mkdirs(self):
+ dir_path = os.path.join(self.temp_dir, "a", "b", "c")
+ self.assertFalse(os.path.exists(dir_path))
+ self.resolving.mkdirs(dir_path)
+ self.assertTrue(os.path.isdir(dir_path))
+
+ def test_rename(self):
+ src = os.path.join(self.temp_dir, "src.txt")
+ dst = os.path.join(self.temp_dir, "dst.txt")
+ self.resolving.write_file(src, "move me")
+ self.assertTrue(self.resolving.rename(src, dst))
+ self.assertFalse(self.resolving.exists(src))
+ self.assertEqual(self.resolving.read_file_utf8(dst), "move me")
+
+ def test_list_status(self):
+ for name in ["a.txt", "b.txt"]:
+ self.resolving.write_file(os.path.join(self.temp_dir, name), name)
+ entries = self.resolving.list_status(self.temp_dir)
+ names = sorted([os.path.basename(e.path) for e in entries])
+ self.assertEqual(names, ["a.txt", "b.txt"])
+
+ def test_get_file_status(self):
+ path = os.path.join(self.temp_dir, "status.txt")
+ self.resolving.write_file(path, "12345")
+ status = self.resolving.get_file_status(path)
+ self.assertEqual(status.size, 5)
+
+ def test_exists_batch(self):
+ p1 = os.path.join(self.temp_dir, "batch_a.txt")
+ p2 = os.path.join(self.temp_dir, "batch_b.txt")
+ p3 = os.path.join(self.temp_dir, "batch_no.txt")
+ self.resolving.write_file(p1, "a")
+ self.resolving.write_file(p2, "b")
+ result = self.resolving.exists_batch([p1, p2, p3])
+ self.assertTrue(result[p1])
+ self.assertTrue(result[p2])
+ self.assertFalse(result[p3])
+
+ def test_close_clears_cache(self):
+ self.resolving._get_fileio("/tmp/test")
+ self.assertTrue(len(self.resolving._fileio_cache) > 0)
+ self.resolving.close()
+ self.assertEqual(len(self.resolving._fileio_cache), 0)
+
+ def test_cross_path_delegation(self):
+ """Different local paths should use the same cached FileIO."""
+ dir_a = os.path.join(self.temp_dir, "a")
+ dir_b = os.path.join(self.temp_dir, "b")
+ self.resolving.mkdirs(dir_a)
+ self.resolving.mkdirs(dir_b)
+ path_a = os.path.join(dir_a, "file.txt")
+ path_b = os.path.join(dir_b, "file.txt")
+ self.resolving.write_file(path_a, "content_a")
+ self.resolving.write_file(path_b, "content_b")
+ self.assertEqual(self.resolving.read_file_utf8(path_a), "content_a")
+ self.assertEqual(self.resolving.read_file_utf8(path_b), "content_b")
+ self.assertEqual(len(self.resolving._fileio_cache), 1)
+
+
+if __name__ == '__main__':
+ unittest.main()