This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c18a85e9 [python] Introduce ResolvingFileIO for pypaimon mirroring Java implementation (#8165)
f3c18a85e9 is described below

commit f3c18a85e9a4ce1bb8fa2944ebefc1ca5a808bc5
Author: zhoulii <[email protected]>
AuthorDate: Mon Jun 8 19:41:19 2026 +0800

    [python] Introduce ResolvingFileIO for pypaimon mirroring Java implementation (#8165)
---
 paimon-python/pypaimon/common/file_io.py           |  12 +-
 paimon-python/pypaimon/common/options/config.py    |  10 ++
 .../pypaimon/filesystem/resolving_file_io.py       | 143 ++++++++++++++++
 .../pypaimon/tests/resolving_file_io_test.py       | 180 +++++++++++++++++++++
 4 files changed, 342 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index fabda00495..9f35140108 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -296,6 +296,13 @@ class FileIO(ABC):
         import os as _os
         from urllib.parse import urlparse
 
+        from pypaimon.common.options.config import CatalogOptions
+
+        opts = catalog_options or Options({})
+        if opts.get(CatalogOptions.RESOLVING_FILE_IO_ENABLED):
+            from pypaimon.filesystem.resolving_file_io import ResolvingFileIO
+            return ResolvingFileIO(opts)
+
         uri = urlparse(path)
         scheme = uri.scheme
 
@@ -303,8 +310,6 @@ class FileIO(ABC):
             from pypaimon.filesystem.local_file_io import LocalFileIO
             return LocalFileIO(path, catalog_options)
 
-        opts = catalog_options or Options({})
-
         if scheme in ("hdfs", "viewfs"):
             from pypaimon.common.options.config import HdfsOptions
             impl_source = "hdfs.client.impl option"
@@ -321,7 +326,8 @@ class FileIO(ABC):
             impl = impl_value.lower()
             if impl == "native":
                 try:
-                    from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+                    from pypaimon.filesystem.hdfs_native_file_io import \
+                        HdfsNativeFileIO
                     return HdfsNativeFileIO(path, opts)
                 except (ImportError, RuntimeError) as e:
                     fallback = opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)
diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py
index 0004cb896a..604d62911a 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -108,6 +108,16 @@ class CatalogOptions:
         "header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header")
     SYNC_ALL_PROPERTIES = ConfigOptions.key("sync-all-properties").boolean_type().default_value(True).with_description(
         "Sync all table properties to the catalog metastore")
+    RESOLVING_FILE_IO_ENABLED = (
+        ConfigOptions.key("resolving-file-io.enabled")
+        .boolean_type()
+        .default_value(False)
+        .with_description(
+            "Whether to enable resolving file IO. When enabled, Paimon dynamically "
+            "selects the appropriate FileIO based on the URI scheme of the given path, "
+            "allowing read/write to external storage paths such as OSS or S3."
+        )
+    )
     BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
 
 
diff --git a/paimon-python/pypaimon/filesystem/resolving_file_io.py b/paimon-python/pypaimon/filesystem/resolving_file_io.py
new file mode 100644
index 0000000000..576d160c81
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/resolving_file_io.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import defaultdict
+from typing import Dict, List
+from urllib.parse import urlparse
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import CatalogOptions
+
+
+class ResolvingFileIO(FileIO):
+    """A FileIO that dynamically selects the appropriate FileIO based on the
+    URI scheme of the given path. Caches FileIO instances by (scheme, authority)
+    to avoid repeated creation.
+
+    This is the Python equivalent of Java's ``ResolvingFileIO``.
+    """
+
+    def __init__(self, catalog_options: Options):
+        opts_map = dict(catalog_options.to_map())
+        # Drop the enabling flag so that FileIO.get() called from
+        # _get_fileio() builds a concrete FileIO instead of another
+        # ResolvingFileIO, which would just delegate again.
+        opts_map.pop(CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(), None)
+        self._options = Options(opts_map)
+        # Delegate FileIO instances, keyed by (scheme, authority).
+        self._fileio_cache: Dict[tuple, FileIO] = {}
+
+    def _cache_key(self, path: str) -> tuple:
+        """Return the (scheme, authority) pair that selects a delegate.
+
+        A path without a scheme is treated as a local ``file`` path.
+        """
+        uri = urlparse(path)
+        return (uri.scheme or 'file', uri.netloc or '')
+
+    def _get_fileio(self, path: str) -> FileIO:
+        """Return the delegate FileIO for ``path``, creating it if needed."""
+        cache_key = self._cache_key(path)
+        fileio = self._fileio_cache.get(cache_key)
+        if fileio is None:
+            fileio = FileIO.get(path, self._options)
+            self._fileio_cache[cache_key] = fileio
+        return fileio
+
+    @property
+    def properties(self):
+        """Options handed to every delegate (resolving flag removed)."""
+        return self._options
+
+    def is_object_store(self) -> bool:
+        """Check if the warehouse path points to an object store (not local or HDFS)."""
+        warehouse = self._options.to_map().get(CatalogOptions.WAREHOUSE.key())
+        if not warehouse:
+            return False
+        uri = urlparse(warehouse)
+        scheme = (uri.scheme or '').lower()
+        return scheme != '' and scheme != 'file' and scheme != 'hdfs'
+
+    # Single-path operations are forwarded to the delegate for that path.
+    def new_input_stream(self, path: str):
+        return self._get_fileio(path).new_input_stream(path)
+
+    def new_output_stream(self, path: str):
+        return self._get_fileio(path).new_output_stream(path)
+
+    def get_file_status(self, path: str):
+        return self._get_fileio(path).get_file_status(path)
+
+    def list_status(self, path: str):
+        return self._get_fileio(path).list_status(path)
+
+    def exists(self, path: str) -> bool:
+        return self._get_fileio(path).exists(path)
+
+    def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
+        """Check many paths with one exists_batch call per delegate."""
+        groups: Dict[tuple, List[str]] = defaultdict(list)
+        for path in paths:
+            groups[self._cache_key(path)].append(path)
+        result = {}
+        for group_paths in groups.values():
+            fio = self._get_fileio(group_paths[0])
+            result.update(fio.exists_batch(group_paths))
+        return result
+
+    def delete(self, path: str, recursive: bool = False) -> bool:
+        return self._get_fileio(path).delete(path, recursive)
+
+    def mkdirs(self, path: str) -> bool:
+        return self._get_fileio(path).mkdirs(path)
+
+    def rename(self, src: str, dst: str) -> bool:
+        # Only ``src`` selects the delegate; the scheme of ``dst`` is not
+        # consulted, so both paths are expected to share a filesystem.
+        return self._get_fileio(src).rename(src, dst)
+
+    def get_file_size(self, path: str) -> int:
+        return self._get_fileio(path).get_file_size(path)
+
+    def is_dir(self, path: str) -> bool:
+        return self._get_fileio(path).is_dir(path)
+
+    def to_filesystem_path(self, path: str) -> str:
+        return self._get_fileio(path).to_filesystem_path(path)
+
+    def write_parquet(self, path: str, data, compression: str = 'zstd',
+                      zstd_level: int = 1, **kwargs):
+        return self._get_fileio(path).write_parquet(path, data, compression,
+                                                    zstd_level, **kwargs)
+
+    def write_orc(self, path: str, data, compression: str = 'zstd',
+                  zstd_level: int = 1, **kwargs):
+        return self._get_fileio(path).write_orc(path, data, compression,
+                                                zstd_level, **kwargs)
+
+    def write_avro(self, path: str, data, avro_schema=None,
+                   compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        return self._get_fileio(path).write_avro(path, data, avro_schema,
+                                                 compression, zstd_level, **kwargs)
+
+    def write_lance(self, path: str, data, **kwargs):
+        return self._get_fileio(path).write_lance(path, data, **kwargs)
+
+    def write_blob(self, path: str, data, **kwargs):
+        return self._get_fileio(path).write_blob(path, data, **kwargs)
+
+    def write_mosaic(self, path: str, data, **kwargs):
+        return self._get_fileio(path).write_mosaic(path, data, **kwargs)
+
+    def write_vortex(self, path: str, data, **kwargs):
+        return self._get_fileio(path).write_vortex(path, data, **kwargs)
+
+    def write_row(self, path: str, data, fields=None, zstd_level: int = 1, **kwargs):
+        return self._get_fileio(path).write_row(path, data, fields,
+                                                zstd_level, **kwargs)
+
+    def close(self):
+        """Close every cached delegate FileIO and empty the cache.
+
+        All delegates are closed even if one of them raises; the first
+        error is re-raised once the cache has been cleared.
+        """
+        delegates = list(self._fileio_cache.values())
+        self._fileio_cache.clear()
+        first_error = None
+        for fileio in delegates:
+            try:
+                fileio.close()
+            except Exception as e:  # keep closing the remaining delegates
+                if first_error is None:
+                    first_error = e
+        if first_error is not None:
+            raise first_error
diff --git a/paimon-python/pypaimon/tests/resolving_file_io_test.py b/paimon-python/pypaimon/tests/resolving_file_io_test.py
new file mode 100644
index 0000000000..c3dec4ebe5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/resolving_file_io_test.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import CatalogOptions
+from pypaimon.filesystem.local_file_io import LocalFileIO
+from pypaimon.filesystem.resolving_file_io import ResolvingFileIO
+
+
+class ResolvingFileIOTest(unittest.TestCase):
+    """Tests for ResolvingFileIO."""
+
+    def test_fileio_get_returns_resolving_when_enabled(self):
+        opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'true'})
+        file_io = FileIO.get("/tmp/test", opts)
+        self.assertIsInstance(file_io, ResolvingFileIO)
+
+    def test_fileio_get_returns_local_when_disabled(self):
+        opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'false'})
+        file_io = FileIO.get("/tmp/test", opts)
+        self.assertIsInstance(file_io, LocalFileIO)
+
+    def test_fileio_get_returns_local_when_not_set(self):
+        file_io = FileIO.get("/tmp/test", Options({}))
+        self.assertIsInstance(file_io, LocalFileIO)
+
+    def test_no_recursion(self):
+        """ResolvingFileIO should not create another ResolvingFileIO internally."""
+        opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'true'})
+        resolving = ResolvingFileIO(opts)
+        inner = resolving._get_fileio("/tmp/test")
+        self.assertNotIsInstance(inner, ResolvingFileIO)
+        self.assertIsInstance(inner, LocalFileIO)
+
+    def test_cache_hit_same_scheme_authority(self):
+        resolving = ResolvingFileIO(Options({}))
+        fio1 = resolving._get_fileio("/tmp/test/a.txt")
+        fio2 = resolving._get_fileio("/tmp/test/b.txt")
+        self.assertIs(fio1, fio2)
+
+    def test_cache_key_uses_scheme_and_authority(self):
+        resolving = ResolvingFileIO(Options({}))
+        fio_local = resolving._get_fileio("/tmp/test")
+        fio_file = resolving._get_fileio("file:///tmp/test2")
+        self.assertIsInstance(fio_local, LocalFileIO)
+        self.assertIs(fio_local, fio_file)  # both map to ('file', '')
+
+    def test_is_object_store_with_oss_warehouse(self):
+        opts = Options({CatalogOptions.WAREHOUSE.key(): 'oss://bucket/warehouse'})
+        resolving = ResolvingFileIO(opts)
+        self.assertTrue(resolving.is_object_store())
+
+    def test_is_object_store_with_s3_warehouse(self):
+        opts = Options({CatalogOptions.WAREHOUSE.key(): 's3://bucket/warehouse'})
+        resolving = ResolvingFileIO(opts)
+        self.assertTrue(resolving.is_object_store())
+
+    def test_is_object_store_with_hdfs_warehouse(self):
+        opts = Options({CatalogOptions.WAREHOUSE.key(): 'hdfs://cluster/warehouse'})
+        resolving = ResolvingFileIO(opts)
+        self.assertFalse(resolving.is_object_store())
+
+    def test_is_object_store_with_local_warehouse(self):
+        opts = Options({CatalogOptions.WAREHOUSE.key(): 'file:///tmp/warehouse'})
+        resolving = ResolvingFileIO(opts)
+        self.assertFalse(resolving.is_object_store())
+
+class ResolvingFileIOReadWriteTest(unittest.TestCase):
+    """End-to-end read/write tests using ResolvingFileIO with local filesystem."""
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp()
+        self.resolving = ResolvingFileIO(Options({}))
+
+    def tearDown(self):
+        self.resolving.close()
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def test_write_and_read(self):
+        path = os.path.join(self.temp_dir, "test.txt")
+        content = "hello resolving fileio"
+        self.resolving.write_file(path, content)
+        result = self.resolving.read_file_utf8(path)
+        self.assertEqual(result, content)
+
+    def test_exists(self):
+        path = os.path.join(self.temp_dir, "test_exists.txt")
+        self.assertFalse(self.resolving.exists(path))
+        self.resolving.write_file(path, "data")
+        self.assertTrue(self.resolving.exists(path))
+
+    def test_delete(self):
+        path = os.path.join(self.temp_dir, "test_delete.txt")
+        self.resolving.write_file(path, "data")
+        self.assertTrue(self.resolving.exists(path))
+        self.assertTrue(self.resolving.delete(path))
+        self.assertFalse(self.resolving.exists(path))
+
+    def test_mkdirs(self):
+        dir_path = os.path.join(self.temp_dir, "a", "b", "c")
+        self.assertFalse(os.path.exists(dir_path))
+        self.resolving.mkdirs(dir_path)
+        self.assertTrue(os.path.isdir(dir_path))
+
+    def test_rename(self):
+        src = os.path.join(self.temp_dir, "src.txt")
+        dst = os.path.join(self.temp_dir, "dst.txt")
+        self.resolving.write_file(src, "move me")
+        self.assertTrue(self.resolving.rename(src, dst))
+        self.assertFalse(self.resolving.exists(src))
+        self.assertEqual(self.resolving.read_file_utf8(dst), "move me")
+
+    def test_list_status(self):
+        for name in ["a.txt", "b.txt"]:
+            self.resolving.write_file(os.path.join(self.temp_dir, name), name)
+        entries = self.resolving.list_status(self.temp_dir)
+        names = sorted([os.path.basename(e.path) for e in entries])
+        self.assertEqual(names, ["a.txt", "b.txt"])
+
+    def test_get_file_status(self):
+        path = os.path.join(self.temp_dir, "status.txt")
+        self.resolving.write_file(path, "12345")
+        status = self.resolving.get_file_status(path)
+        self.assertEqual(status.size, 5)
+
+    def test_exists_batch(self):
+        p1 = os.path.join(self.temp_dir, "batch_a.txt")
+        p2 = os.path.join(self.temp_dir, "batch_b.txt")
+        p3 = os.path.join(self.temp_dir, "batch_no.txt")
+        self.resolving.write_file(p1, "a")
+        self.resolving.write_file(p2, "b")
+        result = self.resolving.exists_batch([p1, p2, p3])
+        self.assertTrue(result[p1])
+        self.assertTrue(result[p2])
+        self.assertFalse(result[p3])
+
+    def test_close_clears_cache(self):
+        self.resolving._get_fileio("/tmp/test")
+        self.assertGreater(len(self.resolving._fileio_cache), 0)
+        self.resolving.close()
+        self.assertEqual(len(self.resolving._fileio_cache), 0)
+
+    def test_cross_path_delegation(self):
+        """Different local paths should use the same cached FileIO."""
+        dir_a = os.path.join(self.temp_dir, "a")
+        dir_b = os.path.join(self.temp_dir, "b")
+        self.resolving.mkdirs(dir_a)
+        self.resolving.mkdirs(dir_b)
+        path_a = os.path.join(dir_a, "file.txt")
+        path_b = os.path.join(dir_b, "file.txt")
+        self.resolving.write_file(path_a, "content_a")
+        self.resolving.write_file(path_b, "content_b")
+        self.assertEqual(self.resolving.read_file_utf8(path_a), "content_a")
+        self.assertEqual(self.resolving.read_file_utf8(path_b), "content_b")
+        self.assertEqual(len(self.resolving._fileio_cache), 1)
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to