This is an automated email from the ASF dual-hosted git repository.

jerry-024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cb3f6b14d [python] PVFS supports jindo backend for OSS reads/writes (#7902)
4cb3f6b14d is described below

commit 4cb3f6b14d18925d23c16aab853f79c1a4be838a
Author: shyjsarah <[email protected]>
AuthorDate: Wed May 20 19:35:36 2026 -0700

    [python] PVFS supports jindo backend for OSS reads/writes (#7902)
---
 .../filesystem/jindo_file_system_handler.py        | 101 +++++--
 paimon-python/pypaimon/filesystem/pvfs.py          | 143 ++++++++--
 .../pypaimon/tests/pvfs_oss_filesystem_test.py     | 315 +++++++++++++++++++++
 paimon-python/setup.py                             |   3 +
 4 files changed, 516 insertions(+), 46 deletions(-)

diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py 
b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
index 2ab9646ca2..efbfe7f189 100644
--- a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
+++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
@@ -22,6 +22,11 @@ from pyarrow import PythonFile
 from pyarrow._fs import FileSystemHandler
 from pyarrow.fs import FileInfo, FileSelector, FileType
 
+# `JindoFileSystemHandler` (the PyArrow FileIO path) only needs `pyjindo.fs`
+# and `pyjindo.util`. The PVFS jindo backend (`create_jindo_oss_filesystem`)
+# additionally needs `pyjindo.ossfs`. Track the two surfaces independently so
+# that a pyjindosdk build without `pyjindo.ossfs` does not silently disable
+# the previously-working PyArrow path.
 try:
     import pyjindo.fs as jfs
     import pyjindo.util as jutil
@@ -31,10 +36,83 @@ except ImportError:
     jfs = None
     jutil = None
 
+try:
+    import pyjindo.ossfs as jossfs
+    JINDO_OSSFS_AVAILABLE = True
+except ImportError:
+    jossfs = None
+    JINDO_OSSFS_AVAILABLE = False
+
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions
 
 
+def build_jindo_config(catalog_options: Options):
+    """Build a pyjindo ``Config`` from OSS catalog options.
+
+    Shared by ``JindoFileSystemHandler`` (the PyArrow FileIO path) and
+    ``create_jindo_oss_filesystem`` (the PVFS fsspec path) so both jindo entry
+    points consume exactly the same credential / endpoint options.
+    """
+    if not JINDO_AVAILABLE:
+        raise ImportError("Module pyjindo is not available. Please install pyjindosdk.")
+
+    config = jutil.Config()
+
+    access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
+    access_key_secret = catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
+    security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
+    endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
+    region = catalog_options.get(OssOptions.OSS_REGION)
+
+    if access_key_id:
+        config.set("fs.oss.accessKeyId", access_key_id)
+    if access_key_secret:
+        config.set("fs.oss.accessKeySecret", access_key_secret)
+    if security_token:
+        config.set("fs.oss.securityToken", security_token)
+    if endpoint:
+        endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
+        config.set("fs.oss.endpoint", endpoint_clean)
+    if region:
+        config.set("fs.oss.region", region)
+    config.set("fs.oss.user.agent.features", "pypaimon")
+    return config
+
+
+def create_jindo_oss_filesystem(root_uri: str, catalog_options: Options):
+    """Create an fsspec-compatible ``JindoOssFileSystem`` for an OSS bucket.
+
+    ``PaimonVirtualFileSystem`` uses this to back OSS reads/writes with the
+    native JindoSDK instead of ``ossfs``. JindoSDK writes objects via
+    PutObject / multipart upload, so it never issues OSS ``AppendObject`` --
+    the call that fails with ``PositionNotEqualToLength`` (409) on the OSS
+    data-acceleration endpoint when ``ossfs`` flushes a multi-chunk write.
+
+    ``root_uri`` is the bucket root, e.g. ``oss://my-bucket/``; it must carry
+    the bucket so ``JindoOssFileSystem`` can re-attach the ``oss://`` scheme to
+    the bucket-relative paths that ``PaimonVirtualFileSystem`` passes in.
+    """
+    if not (JINDO_AVAILABLE and JINDO_OSSFS_AVAILABLE):
+        raise ImportError(
+            "pyjindo.ossfs is not available. Please install pyjindosdk>=6.10.4."
+        )
+
+    return jossfs.JindoOssFileSystem(
+        uri=root_uri,
+        config=build_jindo_config(catalog_options),
+        # PaimonVirtualFileSystem owns directory semantics for the virtual FS;
+        # the backing object-store fs must not auto-create dir-marker objects.
+        auto_mkdir=False,
+        # Bypass fsspec's _Cached metaclass instance cache, so the only
+        # reference to this filesystem -- and to its underlying native jindo
+        # connection -- is the PaimonRealStorage cache in PVFS. On token
+        # refresh PVFS replaces that entry and the native resources can be
+        # released, instead of being pinned forever by fsspec's global cache.
+        skip_instance_cache=True,
+    )
+
+
 class JindoInputFile:
     def __init__(self, jindo_stream):
         self._stream = jindo_stream
@@ -129,28 +207,7 @@ class JindoFileSystemHandler(FileSystemHandler):
         self.root_path = root_path
         self.properties = catalog_options
 
-        # Build jindo config from catalog_options
-        config = jutil.Config()
-
-        access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
-        access_key_secret = 
catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
-        security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
-        endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
-        region = catalog_options.get(OssOptions.OSS_REGION)
-
-        if access_key_id:
-            config.set("fs.oss.accessKeyId", access_key_id)
-        if access_key_secret:
-            config.set("fs.oss.accessKeySecret", access_key_secret)
-        if security_token:
-            config.set("fs.oss.securityToken", security_token)
-        if endpoint:
-            endpoint_clean = endpoint.replace('http://', 
'').replace('https://', '')
-            config.set("fs.oss.endpoint", endpoint_clean)
-        if region:
-            config.set("fs.oss.region", region)
-        config.set("fs.oss.user.agent.features", "pypaimon")
-
+        config = build_jindo_config(catalog_options)
         self._jindo_fs = jfs.connect(self.root_path, "root", config)
 
     def __eq__(self, other):
diff --git a/paimon-python/pypaimon/filesystem/pvfs.py 
b/paimon-python/pypaimon/filesystem/pvfs.py
index 5128493165..236c00481c 100644
--- a/paimon-python/pypaimon/filesystem/pvfs.py
+++ b/paimon-python/pypaimon/filesystem/pvfs.py
@@ -17,6 +17,7 @@
 
 import datetime
 import importlib
+import logging
 import time
 from abc import ABC
 from dataclasses import dataclass
@@ -35,8 +36,15 @@ from pypaimon.api.rest_api import RESTApi
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import CatalogOptions, OssOptions, 
PVFSOptions
 from pypaimon.common.identifier import Identifier
+from pypaimon.filesystem.jindo_file_system_handler import (
+    JINDO_AVAILABLE,
+    JINDO_OSSFS_AVAILABLE,
+    create_jindo_oss_filesystem,
+)
 from pypaimon.schema.schema import Schema
 
+logger = logging.getLogger(__name__)
+
 PROTOCOL_NAME = "pvfs"
 
 
@@ -201,7 +209,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             storage_location = table_path
             actual_path = pvfs_identifier.get_actual_path(storage_location)
             virtual_location = pvfs_identifier.get_virtual_location()
-            fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
             entries = fs.ls(actual_path, detail=detail, **kwargs)
             if detail:
                 virtual_entities = [
@@ -233,7 +241,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             storage_location = table_path
             actual_path = pvfs_identifier.get_actual_path(storage_location)
             virtual_location = pvfs_identifier.get_virtual_location()
-            fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
             entry = fs.info(actual_path)
             return self._convert_actual_info(entry, storage_type, 
storage_location, virtual_location)
 
@@ -257,7 +265,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table_path)
                 storage_location = table_path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.exists(actual_path)
             except NoSuchResourceException:
                 return False
@@ -276,7 +284,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             storage_location = table_path
             source_actual_path = source.get_actual_path(storage_location)
             target_actual_path = target.get_actual_path(storage_location)
-            fs = self._get_filesystem(source, storage_type)
+            fs = self._get_filesystem(source, storage_type, storage_location)
             fs.cp_file(
                 self._strip_storage_protocol(storage_type, source_actual_path),
                 self._strip_storage_protocol(storage_type, target_actual_path),
@@ -302,7 +310,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_location = table_path
                 source_actual_path = source.get_actual_path(storage_location)
                 target_actual_path = target.get_actual_path(storage_location)
-                fs = self._get_filesystem(source, storage_type)
+                fs = self._get_filesystem(source, storage_type, 
storage_location)
                 if storage_type == StorageType.LOCAL:
                     fs.mv(
                         self._strip_storage_protocol(storage_type, 
source_actual_path),
@@ -343,7 +351,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             storage_type = self._get_storage_type(table_path)
             storage_location = table_path
             actual_path = pvfs_identifier.get_actual_path(storage_location)
-            fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
             return fs.rm(
                 self._strip_storage_protocol(storage_type, actual_path),
                 recursive,
@@ -362,7 +370,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table_path)
                 storage_location = table_path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.rm_file(
                     self._strip_storage_protocol(storage_type, actual_path),
                 )
@@ -387,7 +395,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table_path)
                 storage_location = table_path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.rmdir(
                     self._strip_storage_protocol(storage_type, actual_path)
                 )
@@ -420,7 +428,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table_path)
                 storage_location = table_path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.open(
                     self._strip_storage_protocol(storage_type, actual_path),
                     mode,
@@ -464,7 +472,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table_path)
                 storage_location = table_path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.mkdir(
                     self._strip_storage_protocol(storage_type, actual_path),
                     create_parents,
@@ -503,7 +511,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table_path)
                 storage_location = table_path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.makedirs(
                     self._strip_storage_protocol(storage_type, actual_path),
                     exist_ok
@@ -524,7 +532,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table.path)
                 storage_location = table.path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.created(
                     self._strip_storage_protocol(storage_type, actual_path)
                 )
@@ -545,7 +553,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table.path)
                 storage_location = table.path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.modified(
                     self._strip_storage_protocol(storage_type, actual_path)
                 )
@@ -571,7 +579,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table.path)
                 storage_location = table.path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.cat_file(
                     self._strip_storage_protocol(storage_type, actual_path),
                     start,
@@ -600,7 +608,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 storage_type = self._get_storage_type(table.path)
                 storage_location = table.path
                 actual_path = pvfs_identifier.get_actual_path(storage_location)
-                fs = self._get_filesystem(pvfs_identifier, storage_type)
+                fs = self._get_filesystem(pvfs_identifier, storage_type, 
storage_location)
                 return fs.get_file(
                     self._strip_storage_protocol(storage_type, actual_path),
                     lpath,
@@ -622,15 +630,19 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
     def __converse_ts_to_datatime(ts: int):
         return datetime.datetime.fromtimestamp(ts / 1000, 
tz=datetime.timezone.utc)
 
-    @staticmethod
-    def _strip_storage_protocol(storage_type: StorageType, path: str):
+    def _strip_storage_protocol(self, storage_type: StorageType, path: str):
         if storage_type == StorageType.LOCAL:
             return path[len("{}:".format(StorageType.LOCAL.value)):]
 
-        # OSS has different behavior than S3 and GCS, if we do not remove the
-        # protocol, it will always return an empty array.
+        # The two OSS backends want opposite path forms. The legacy ossfs
+        # backend mishandles an oss://-prefixed path (older versions return an
+        # empty array from ls), so the scheme is stripped for it. The jindo
+        # backend (pyjindo) requires the oss:// scheme and crashes without it,
+        # so the path is passed through unchanged.
         if storage_type == StorageType.OSS:
             if path.startswith("{}://".format(StorageType.OSS.value)):
+                if self._use_jindo_oss_backend():
+                    return path
                 return path[len("{}://".format(StorageType.OSS.value)):]
             return path
 
@@ -805,7 +817,8 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         finally:
             write_lock.release()
 
-    def _get_filesystem(self, pvfs_table_identifier: PVFSTableIdentifier, 
storage_type: StorageType) -> 'FileSystem':
+    def _get_filesystem(self, pvfs_table_identifier: PVFSTableIdentifier, 
storage_type: StorageType,
+                        storage_location: str) -> 'FileSystem':
         read_lock = self._fs_cache_lock.gen_rlock()
         try:
             read_lock.acquire()
@@ -817,6 +830,12 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
         finally:
             read_lock.release()
 
+        # The previous filesystem (if any) is closed after the write lock is
+        # released. For the jindo backend, fs.close() tears down a native
+        # JindoSDK connection, which can block on flushing buffers and
+        # terminating worker threads; holding the write lock through that
+        # would stall every other OSS filesystem rebuild.
+        stale_fs = None
         write_lock = self._table_cache_lock.gen_wlock()
         try:
             write_lock.acquire()
@@ -827,16 +846,19 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
                 fs = LocalFileSystem()
             elif storage_type == StorageType.OSS:
                 rest_api = self.__rest_api(pvfs_table_identifier)
-                load_token_response: GetTableTokenResponse = 
rest_api.load_table_token(
-                    Identifier.create(pvfs_table_identifier.database, 
pvfs_table_identifier.table))
+                table_identifier = Identifier.create(
+                    pvfs_table_identifier.database, 
pvfs_table_identifier.table)
+                load_token_response: GetTableTokenResponse = 
rest_api.load_table_token(table_identifier)
                 merged_token = 
self._merge_token_with_catalog_options(load_token_response.token)
-                fs = self._get_oss_filesystem(Options(merged_token))
+                fs = self._get_oss_filesystem(Options(merged_token), 
storage_location)
                 paimon_real_storage = PaimonRealStorage(
                     token=load_token_response.token,
                     expires_at_millis=load_token_response.expires_at_millis,
                     file_system=fs
                 )
                 self._fs_cache[pvfs_table_identifier] = paimon_real_storage
+                if cache_value is not None:
+                    stale_fs = cache_value.file_system
             else:
                 raise Exception(
                     "Storage type: `{}` doesn't support 
now.".format(storage_type)
@@ -844,6 +866,8 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             return fs
         finally:
             write_lock.release()
+            if stale_fs is not None:
+                PaimonVirtualFileSystem._close_filesystem_quietly(stale_fs)
 
     def _merge_token_with_catalog_options(self, token: dict) -> dict:
         """Merge token with catalog options, DLF OSS endpoint should override 
the standard OSS endpoint."""
@@ -863,8 +887,79 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             "Storage type doesn't support now. Path:{}".format(path)
         )
 
+    def _get_oss_filesystem(self, options: Options, storage_location: str) -> 
AbstractFileSystem:
+        """Build the fsspec filesystem backing OSS reads/writes.
+
+        Honors ``fs.oss.impl`` (the same option PyArrowFileIO uses): ``jindo``
+        backs OSS with the native JindoSDK (PutObject / multipart upload),
+        ``legacy`` backs it with ``ossfs``. ``jindo`` is the default; when
+        pyjindosdk is not installed it falls back to ``ossfs`` -- consistent
+        with PyArrowFileIO.
+
+        ``ossfs`` writes every object through OSS ``AppendObject``, which can
+        fail with ``PositionNotEqualToLength`` (409) on the OSS 
data-acceleration
+        endpoint for multi-chunk writes; the jindo backend avoids that path.
+        """
+        oss_impl = self.options.get(OssOptions.OSS_IMPL)
+        if oss_impl not in ("jindo", "legacy"):
+            raise Exception(
+                "Unsupported fs.oss.impl value: '{}'. "
+                "Supported values are 'jindo' and 'legacy'.".format(oss_impl)
+            )
+        if self._use_jindo_oss_backend():
+            bucket = 
PaimonVirtualFileSystem._extract_oss_bucket(storage_location)
+            return create_jindo_oss_filesystem("oss://{}/".format(bucket), 
options)
+        if oss_impl == "jindo":
+            logger.warning(
+                "fs.oss.impl is 'jindo' but pyjindosdk is not installed. "
+                "Falling back to the ossfs (OSS AppendObject) implementation. "
+                "Install pyjindosdk for native multipart upload: pip install pyjindosdk"
+            )
+        return PaimonVirtualFileSystem._get_ossfs_filesystem(options)
+
+    def _use_jindo_oss_backend(self) -> bool:
+        """Whether OSS access uses the native jindo backend rather than ossfs.
+
+        Decided per filesystem instance from ``fs.oss.impl`` plus the
+        availability of the two pyjindosdk surfaces this path requires
+        (``pyjindo.fs``/``pyjindo.util`` for credential config and
+        ``pyjindo.ossfs`` for the fsspec backend). This is the single
+        condition both ``_get_oss_filesystem`` and ``_strip_storage_protocol``
+        rely on, so the backend choice and the path form handed to it cannot
+        drift apart.
+        """
+        return (
+            self.options.get(OssOptions.OSS_IMPL) == "jindo"
+            and JINDO_AVAILABLE
+            and JINDO_OSSFS_AVAILABLE
+        )
+
+    @staticmethod
+    def _close_filesystem_quietly(fs) -> None:
+        """Best-effort release of a filesystem that is being evicted from 
cache."""
+        close = getattr(fs, "close", None)
+        if not callable(close):
+            return
+        try:
+            close()
+        except Exception:
+            logger.warning(
+                "ignoring error while closing stale OSS filesystem",
+                exc_info=True,
+            )
+
+    @staticmethod
+    def _extract_oss_bucket(oss_path: str) -> str:
+        scheme = "{}://".format(StorageType.OSS.value)
+        if not oss_path.startswith(scheme):
+            raise Exception("Invalid OSS path: {}".format(oss_path))
+        bucket = oss_path[len(scheme):].split("/", 1)[0]
+        if not bucket:
+            raise Exception("Invalid OSS path without bucket: {}".format(oss_path))
+        return bucket
+
     @staticmethod
-    def _get_oss_filesystem(options: Options) -> AbstractFileSystem:
+    def _get_ossfs_filesystem(options: Options) -> AbstractFileSystem:
         access_key_id = options.get(OssOptions.OSS_ACCESS_KEY_ID)
         if access_key_id is None:
             raise Exception(
diff --git a/paimon-python/pypaimon/tests/pvfs_oss_filesystem_test.py 
b/paimon-python/pypaimon/tests/pvfs_oss_filesystem_test.py
new file mode 100644
index 0000000000..d35a851299
--- /dev/null
+++ b/paimon-python/pypaimon/tests/pvfs_oss_filesystem_test.py
@@ -0,0 +1,315 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Unit tests for OSS backend selection in PaimonVirtualFileSystem.
+
+These run in CI with no external dependencies -- DLF, OSS and pyjindosdk are
+all stubbed. They cover the ``fs.oss.impl`` dispatch and the 
``_get_filesystem``
+wiring that routes OSS reads/writes through the jindo or ossfs backend. The
+end-to-end behavior of those backends is covered separately by the (DLF-gated)
+``pvfs_oss_backend_alignment_test``.
+"""
+
+import unittest
+from unittest import mock
+
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+from pypaimon.filesystem import pvfs as pvfs_module
+from pypaimon.filesystem.pvfs import (
+    PaimonRealStorage,
+    PaimonVirtualFileSystem,
+    PVFSTableIdentifier,
+    StorageType,
+)
+
+
+def _make_pvfs(extra_options=None):
+    options = {OssOptions.OSS_ACCESS_KEY_ID.key(): "ak"}
+    if extra_options:
+        options.update(extra_options)
+    # skip_instance_cache so each test gets a fresh PVFS (fsspec's _Cached
+    # metaclass would otherwise share _fs_cache state across tests).
+    return PaimonVirtualFileSystem(options, skip_instance_cache=True)
+
+
+class ExtractOssBucketTest(unittest.TestCase):
+
+    def test_extracts_bucket_from_oss_path(self):
+        self.assertEqual(
+            
PaimonVirtualFileSystem._extract_oss_bucket("oss://my-bucket/wh/db/tbl"),
+            "my-bucket")
+
+    def test_extracts_bucket_from_bucket_root(self):
+        self.assertEqual(
+            PaimonVirtualFileSystem._extract_oss_bucket("oss://my-bucket/"),
+            "my-bucket")
+
+    def test_rejects_non_oss_path(self):
+        with self.assertRaises(Exception):
+            PaimonVirtualFileSystem._extract_oss_bucket("s3://my-bucket/key")
+
+    def test_rejects_path_without_bucket(self):
+        with self.assertRaises(Exception):
+            PaimonVirtualFileSystem._extract_oss_bucket("oss:///key")
+
+
+class GetOssFilesystemDispatchTest(unittest.TestCase):
+    """fs.oss.impl selects the backend; jindo falls back to ossfs when 
absent."""
+
+    STORAGE_LOCATION = "oss://my-bucket/warehouse/db/tbl"
+
+    def setUp(self):
+        # Table-scoped OSS credentials handed to the backend builder.
+        self.token_options = Options({
+            OssOptions.OSS_ACCESS_KEY_ID.key(): "tk-ak",
+            OssOptions.OSS_ACCESS_KEY_SECRET.key(): "tk-sk",
+            OssOptions.OSS_ENDPOINT.key(): "oss-cn-hangzhou.aliyuncs.com",
+        })
+
+    def _dispatch(self, oss_impl, jindo_ossfs_available):
+        extra = {OssOptions.OSS_IMPL.key(): oss_impl} if oss_impl is not None 
else None
+        pvfs = _make_pvfs(extra)
+        ossfs_sentinel = object()
+        jindo_calls = []
+
+        def fake_create_jindo(root_uri, options):
+            jindo_calls.append((root_uri, options))
+            return "JINDO_FS"
+
+        # Patch both surfaces. JINDO_AVAILABLE is held True so the test
+        # behaves the same whether or not pyjindosdk is installed in the
+        # CI image; backend selection is then driven solely by
+        # JINDO_OSSFS_AVAILABLE -- the surface the PVFS jindo backend
+        # actually needs.
+        with mock.patch.object(pvfs_module, "JINDO_AVAILABLE", True), \
+             mock.patch.object(pvfs_module, "JINDO_OSSFS_AVAILABLE", 
jindo_ossfs_available), \
+             mock.patch.object(pvfs_module, "create_jindo_oss_filesystem", 
fake_create_jindo), \
+             mock.patch.object(PaimonVirtualFileSystem, 
"_get_ossfs_filesystem",
+                               staticmethod(lambda options: ossfs_sentinel)):
+            fs = pvfs._get_oss_filesystem(self.token_options, 
self.STORAGE_LOCATION)
+        return fs, ossfs_sentinel, jindo_calls
+
+    def test_legacy_uses_ossfs(self):
+        fs, ossfs_sentinel, jindo_calls = self._dispatch("legacy", 
jindo_ossfs_available=True)
+        self.assertIs(fs, ossfs_sentinel)
+        self.assertEqual(jindo_calls, [])
+
+    def test_jindo_uses_jindo_when_available(self):
+        fs, _, jindo_calls = self._dispatch("jindo", 
jindo_ossfs_available=True)
+        self.assertEqual(fs, "JINDO_FS")
+        self.assertEqual(len(jindo_calls), 1)
+        root_uri, options = jindo_calls[0]
+        self.assertEqual(root_uri, "oss://my-bucket/")
+        self.assertIs(options, self.token_options)
+
+    def test_default_impl_is_jindo(self):
+        # No fs.oss.impl set -> OssOptions.OSS_IMPL default value ("jindo").
+        fs, _, jindo_calls = self._dispatch(None, jindo_ossfs_available=True)
+        self.assertEqual(fs, "JINDO_FS")
+        self.assertEqual(len(jindo_calls), 1)
+
+    def test_jindo_falls_back_to_ossfs_when_pyjindo_ossfs_missing(self):
+        # fs.oss.impl=jindo but pyjindo.ossfs not importable (e.g. an older
+        # pyjindosdk build that ships only fs/util). PyArrow jindo path stays
+        # available; PVFS jindo backend falls back to ossfs.
+        fs, ossfs_sentinel, jindo_calls = self._dispatch("jindo", 
jindo_ossfs_available=False)
+        self.assertIs(fs, ossfs_sentinel)
+        self.assertEqual(jindo_calls, [])
+
+    def test_invalid_impl_raises(self):
+        with self.assertRaises(Exception) as ctx:
+            self._dispatch("garbage", jindo_ossfs_available=True)
+        self.assertIn("Unsupported fs.oss.impl", str(ctx.exception))
+
+
+class GetFilesystemOssWiringTest(unittest.TestCase):
+    """_get_filesystem must forward the caller-supplied storage_location into
+    the OSS backend factory. Callers always have the table path in hand (from
+    _get_table_store) by the time they reach _get_filesystem, so threading it
+    through avoids a redundant REST round-trip inside the write critical
+    section."""
+
+    def test_oss_branch_forwards_caller_storage_location(self):
+        pvfs = _make_pvfs()
+        identifier = PVFSTableIdentifier(
+            catalog="cat", endpoint="http://rest", database="db", table="tbl")
+
+        class FakeTokenResponse:
+            token = {
+                OssOptions.OSS_ACCESS_KEY_ID.key(): "tk-ak",
+                OssOptions.OSS_ACCESS_KEY_SECRET.key(): "tk-sk",
+                OssOptions.OSS_ENDPOINT.key(): "oss-cn-hangzhou.aliyuncs.com",
+            }
+            expires_at_millis = None
+
+        class FakeRestApi:
+            def load_table_token(self, ident):
+                return FakeTokenResponse()
+
+        captured = {}
+
+        def fake_get_oss_filesystem(self, options, storage_location):
+            captured["options"] = options
+            captured["storage_location"] = storage_location
+            return "FS"
+
+        with mock.patch.object(PaimonVirtualFileSystem,
+                               "_PaimonVirtualFileSystem__rest_api",
+                               lambda self, ident: FakeRestApi()), \
+             mock.patch.object(PaimonVirtualFileSystem, "_get_oss_filesystem",
+                               fake_get_oss_filesystem):
+            fs = pvfs._get_filesystem(
+                identifier, StorageType.OSS, "oss://wired-bucket/warehouse/db/tbl")
+
+        self.assertEqual(fs, "FS")
+        self.assertEqual(captured["storage_location"], "oss://wired-bucket/warehouse/db/tbl")
+        self.assertEqual(
+            PaimonVirtualFileSystem._extract_oss_bucket(captured["storage_location"]),
+            "wired-bucket")
+        self.assertEqual(captured["options"].get(OssOptions.OSS_ACCESS_KEY_ID), "tk-ak")
+
+
+class StripStorageProtocolTest(unittest.TestCase):
+    """OSS path form depends on the backend: jindo keeps oss://, ossfs strips it."""
+
+    def _strip(self, oss_impl, jindo_ossfs_available, path):
+        pvfs = _make_pvfs({OssOptions.OSS_IMPL.key(): oss_impl})
+        with mock.patch.object(pvfs_module, "JINDO_AVAILABLE", True), \
+             mock.patch.object(pvfs_module, "JINDO_OSSFS_AVAILABLE", jindo_ossfs_available):
+            return pvfs._strip_storage_protocol(StorageType.OSS, path)
+
+    def test_jindo_backend_keeps_oss_scheme(self):
+        self.assertEqual(
+            self._strip("jindo", True, "oss://b/db/tbl/f.bin"),
+            "oss://b/db/tbl/f.bin")
+
+    def test_legacy_backend_strips_oss_scheme(self):
+        self.assertEqual(
+            self._strip("legacy", True, "oss://b/db/tbl/f.bin"),
+            "b/db/tbl/f.bin")
+
+    def test_jindo_unavailable_falls_back_to_stripping(self):
+        # fs.oss.impl=jindo but pyjindo.ossfs missing -> ossfs backend -> strip.
+        self.assertEqual(
+            self._strip("jindo", False, "oss://b/db/tbl/f.bin"),
+            "b/db/tbl/f.bin")
+
+
+class CloseStaleFilesystemOnRefreshTest(unittest.TestCase):
+    """When _get_filesystem rebuilds a cached fs (token near expiry), the old
+    filesystem must be released. For the jindo backend this is the only way
+    the underlying native JindoSDK connection gets reclaimed. The close must
+    also happen outside _table_cache_lock -- JindoSDK close() can block on
+    native teardown, and holding the write lock through it would stall every
+    other OSS rebuild."""
+
+    STORAGE_LOCATION = "oss://b/wh/db/tbl"
+
+    @staticmethod
+    def _stub_oss_rebuild():
+        identifier = PVFSTableIdentifier(
+            catalog="cat", endpoint="http://rest", database="db", table="tbl")
+
+        class FakeTokenResponse:
+            token = {
+                OssOptions.OSS_ACCESS_KEY_ID.key(): "tk-ak",
+                OssOptions.OSS_ACCESS_KEY_SECRET.key(): "tk-sk",
+                OssOptions.OSS_ENDPOINT.key(): "oss-cn-hangzhou.aliyuncs.com",
+            }
+            expires_at_millis = None
+
+        class FakeRestApi:
+            def load_table_token(self, ident):
+                return FakeTokenResponse()
+
+        return identifier, mock.patch.object(
+            PaimonVirtualFileSystem,
+            "_PaimonVirtualFileSystem__rest_api",
+            lambda self, ident: FakeRestApi(),
+        ), mock.patch.object(
+            PaimonVirtualFileSystem,
+            "_get_oss_filesystem",
+            lambda self, opts, loc: "NEW_FS",
+        )
+
+    def test_old_filesystem_close_called_when_token_expires(self):
+        pvfs = _make_pvfs()
+        identifier, patch_rest, patch_build = self._stub_oss_rebuild()
+
+        close_log = []
+
+        class StaleFs:
+            def close(self_inner):
+                close_log.append("closed")
+
+        # expires_at_millis=0 -> need_refresh() is True -> _get_filesystem rebuilds.
+        pvfs._fs_cache[identifier] = PaimonRealStorage(
+            token={}, expires_at_millis=0, file_system=StaleFs())
+
+        with patch_rest, patch_build:
+            new_fs = pvfs._get_filesystem(identifier, StorageType.OSS, self.STORAGE_LOCATION)
+
+        self.assertEqual(new_fs, "NEW_FS")
+        self.assertEqual(close_log, ["closed"],
+                         "stale filesystem must be closed on token refresh")
+
+    def test_first_build_does_not_invoke_close(self):
+        pvfs = _make_pvfs()
+        identifier, patch_rest, patch_build = self._stub_oss_rebuild()
+
+        with patch_rest, patch_build:
+            new_fs = pvfs._get_filesystem(identifier, StorageType.OSS, self.STORAGE_LOCATION)
+
+        # No prior cache entry -> nothing to close, must not raise.
+        self.assertEqual(new_fs, "NEW_FS")
+
+    def test_close_runs_after_write_lock_released(self):
+        # Regression guard: close() of the stale fs runs after the write
+        # lock is released. If close were still inside the critical section,
+        # attempting to acquire the same non-reentrant write lock from
+        # within close would fail (blocking=False -> False).
+        pvfs = _make_pvfs()
+        identifier, patch_rest, patch_build = self._stub_oss_rebuild()
+        lock_was_free_during_close = []
+
+        class StaleFs:
+            def close(self_inner):
+                probe = pvfs._table_cache_lock.gen_wlock()
+                got = probe.acquire(blocking=False)
+                lock_was_free_during_close.append(got)
+                if got:
+                    probe.release()
+
+        pvfs._fs_cache[identifier] = PaimonRealStorage(
+            token={}, expires_at_millis=0, file_system=StaleFs())
+
+        with patch_rest, patch_build:
+            pvfs._get_filesystem(identifier, StorageType.OSS, self.STORAGE_LOCATION)
+
+        self.assertEqual(lock_was_free_during_close, [True],
+                         "stale fs close must run after _table_cache_lock is released")
+
+    def test_close_without_close_method_is_no_op(self):
+        # _close_filesystem_quietly must tolerate filesystems that don't
+        # implement close() (e.g. pyjindo 6.10.2's JindoOssFileSystem).
+        PaimonVirtualFileSystem._close_filesystem_quietly(object())
+        PaimonVirtualFileSystem._close_filesystem_quietly(None)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index a8c6766922..1f464b1988 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -164,6 +164,9 @@ setup(
             'ossfs>=2021.8; python_version<"3.8"',
             'ossfs>=2023; python_version>="3.8"'
         ],
+        'jindo': [
+            'pyjindosdk>=6.10.4',
+        ],
         'lance': [
             'pylance>=0.20,<1; python_version>="3.9"',
             'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'


Reply via email to