This is an automated email from the ASF dual-hosted git repository.
jerry-024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4cb3f6b14d [python] PVFS supports jindo backend for OSS reads/writes (#7902)
4cb3f6b14d is described below
commit 4cb3f6b14d18925d23c16aab853f79c1a4be838a
Author: shyjsarah <[email protected]>
AuthorDate: Wed May 20 19:35:36 2026 -0700
[python] PVFS supports jindo backend for OSS reads/writes (#7902)
---
.../filesystem/jindo_file_system_handler.py | 101 +++++--
paimon-python/pypaimon/filesystem/pvfs.py | 143 ++++++++--
.../pypaimon/tests/pvfs_oss_filesystem_test.py | 315 +++++++++++++++++++++
paimon-python/setup.py | 3 +
4 files changed, 516 insertions(+), 46 deletions(-)
diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
index 2ab9646ca2..efbfe7f189 100644
--- a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
+++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
@@ -22,6 +22,11 @@ from pyarrow import PythonFile
from pyarrow._fs import FileSystemHandler
from pyarrow.fs import FileInfo, FileSelector, FileType
+# `JindoFileSystemHandler` (the PyArrow FileIO path) only needs `pyjindo.fs`
+# and `pyjindo.util`. The PVFS jindo backend (`create_jindo_oss_filesystem`)
+# additionally needs `pyjindo.ossfs`. Track the two surfaces independently so
+# that a pyjindosdk build without `pyjindo.ossfs` does not silently disable
+# the previously-working PyArrow path.
try:
import pyjindo.fs as jfs
import pyjindo.util as jutil
@@ -31,10 +36,83 @@ except ImportError:
jfs = None
jutil = None
+try:
+ import pyjindo.ossfs as jossfs
+ JINDO_OSSFS_AVAILABLE = True
+except ImportError:
+ jossfs = None
+ JINDO_OSSFS_AVAILABLE = False
+
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
+def build_jindo_config(catalog_options: Options):
+ """Build a pyjindo ``Config`` from OSS catalog options.
+
+ Shared by ``JindoFileSystemHandler`` (the PyArrow FileIO path) and
+ ``create_jindo_oss_filesystem`` (the PVFS fsspec path) so both jindo entry
+ points consume exactly the same credential / endpoint options.
+ """
+ if not JINDO_AVAILABLE:
+        raise ImportError("Module pyjindo is not available. Please install pyjindosdk.")
+
+ config = jutil.Config()
+
+ access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
+ access_key_secret = catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
+ security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
+ endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
+ region = catalog_options.get(OssOptions.OSS_REGION)
+
+ if access_key_id:
+ config.set("fs.oss.accessKeyId", access_key_id)
+ if access_key_secret:
+ config.set("fs.oss.accessKeySecret", access_key_secret)
+ if security_token:
+ config.set("fs.oss.securityToken", security_token)
+ if endpoint:
+        endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
+ config.set("fs.oss.endpoint", endpoint_clean)
+ if region:
+ config.set("fs.oss.region", region)
+ config.set("fs.oss.user.agent.features", "pypaimon")
+ return config
+
+
+def create_jindo_oss_filesystem(root_uri: str, catalog_options: Options):
+ """Create an fsspec-compatible ``JindoOssFileSystem`` for an OSS bucket.
+
+ ``PaimonVirtualFileSystem`` uses this to back OSS reads/writes with the
+ native JindoSDK instead of ``ossfs``. JindoSDK writes objects via
+ PutObject / multipart upload, so it never issues OSS ``AppendObject`` --
+ the call that fails with ``PositionNotEqualToLength`` (409) on the OSS
+ data-acceleration endpoint when ``ossfs`` flushes a multi-chunk write.
+
+ ``root_uri`` is the bucket root, e.g. ``oss://my-bucket/``; it must carry
+ the bucket so ``JindoOssFileSystem`` can re-attach the ``oss://`` scheme to
+ the bucket-relative paths that ``PaimonVirtualFileSystem`` passes in.
+ """
+ if not (JINDO_AVAILABLE and JINDO_OSSFS_AVAILABLE):
+        raise ImportError(
+            "pyjindo.ossfs is not available. Please install pyjindosdk>=6.10.4."
+ )
+
+ return jossfs.JindoOssFileSystem(
+ uri=root_uri,
+ config=build_jindo_config(catalog_options),
+ # PaimonVirtualFileSystem owns directory semantics for the virtual FS;
+ # the backing object-store fs must not auto-create dir-marker objects.
+ auto_mkdir=False,
+ # Bypass fsspec's _Cached metaclass instance cache, so the only
+ # reference to this filesystem -- and to its underlying native jindo
+ # connection -- is the PaimonRealStorage cache in PVFS. On token
+ # refresh PVFS replaces that entry and the native resources can be
+ # released, instead of being pinned forever by fsspec's global cache.
+ skip_instance_cache=True,
+ )
+
+
class JindoInputFile:
def __init__(self, jindo_stream):
self._stream = jindo_stream
@@ -129,28 +207,7 @@ class JindoFileSystemHandler(FileSystemHandler):
self.root_path = root_path
self.properties = catalog_options
- # Build jindo config from catalog_options
- config = jutil.Config()
-
- access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
-        access_key_secret = catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
- security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
- endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
- region = catalog_options.get(OssOptions.OSS_REGION)
-
- if access_key_id:
- config.set("fs.oss.accessKeyId", access_key_id)
- if access_key_secret:
- config.set("fs.oss.accessKeySecret", access_key_secret)
- if security_token:
- config.set("fs.oss.securityToken", security_token)
- if endpoint:
-            endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
- config.set("fs.oss.endpoint", endpoint_clean)
- if region:
- config.set("fs.oss.region", region)
- config.set("fs.oss.user.agent.features", "pypaimon")
-
+ config = build_jindo_config(catalog_options)
self._jindo_fs = jfs.connect(self.root_path, "root", config)
def __eq__(self, other):
diff --git a/paimon-python/pypaimon/filesystem/pvfs.py b/paimon-python/pypaimon/filesystem/pvfs.py
index 5128493165..236c00481c 100644
--- a/paimon-python/pypaimon/filesystem/pvfs.py
+++ b/paimon-python/pypaimon/filesystem/pvfs.py
@@ -17,6 +17,7 @@
import datetime
import importlib
+import logging
import time
from abc import ABC
from dataclasses import dataclass
@@ -35,8 +36,15 @@ from pypaimon.api.rest_api import RESTApi
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions, OssOptions, PVFSOptions
from pypaimon.common.identifier import Identifier
+from pypaimon.filesystem.jindo_file_system_handler import (
+ JINDO_AVAILABLE,
+ JINDO_OSSFS_AVAILABLE,
+ create_jindo_oss_filesystem,
+)
from pypaimon.schema.schema import Schema
+logger = logging.getLogger(__name__)
+
PROTOCOL_NAME = "pvfs"
@@ -201,7 +209,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
virtual_location = pvfs_identifier.get_virtual_location()
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+        fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
entries = fs.ls(actual_path, detail=detail, **kwargs)
if detail:
virtual_entities = [
@@ -233,7 +241,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
virtual_location = pvfs_identifier.get_virtual_location()
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+        fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
entry = fs.info(actual_path)
return self._convert_actual_info(entry, storage_type,
storage_location, virtual_location)
@@ -257,7 +265,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.exists(actual_path)
except NoSuchResourceException:
return False
@@ -276,7 +284,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_location = table_path
source_actual_path = source.get_actual_path(storage_location)
target_actual_path = target.get_actual_path(storage_location)
- fs = self._get_filesystem(source, storage_type)
+ fs = self._get_filesystem(source, storage_type, storage_location)
fs.cp_file(
self._strip_storage_protocol(storage_type, source_actual_path),
self._strip_storage_protocol(storage_type, target_actual_path),
@@ -302,7 +310,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_location = table_path
source_actual_path = source.get_actual_path(storage_location)
target_actual_path = target.get_actual_path(storage_location)
- fs = self._get_filesystem(source, storage_type)
+            fs = self._get_filesystem(source, storage_type, storage_location)
if storage_type == StorageType.LOCAL:
fs.mv(
self._strip_storage_protocol(storage_type,
source_actual_path),
@@ -343,7 +351,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.rm(
self._strip_storage_protocol(storage_type, actual_path),
recursive,
@@ -362,7 +370,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.rm_file(
self._strip_storage_protocol(storage_type, actual_path),
)
@@ -387,7 +395,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.rmdir(
self._strip_storage_protocol(storage_type, actual_path)
)
@@ -420,7 +428,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.open(
self._strip_storage_protocol(storage_type, actual_path),
mode,
@@ -464,7 +472,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.mkdir(
self._strip_storage_protocol(storage_type, actual_path),
create_parents,
@@ -503,7 +511,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.makedirs(
self._strip_storage_protocol(storage_type, actual_path),
exist_ok
@@ -524,7 +532,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.created(
self._strip_storage_protocol(storage_type, actual_path)
)
@@ -545,7 +553,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.modified(
self._strip_storage_protocol(storage_type, actual_path)
)
@@ -571,7 +579,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.cat_file(
self._strip_storage_protocol(storage_type, actual_path),
start,
@@ -600,7 +608,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
- fs = self._get_filesystem(pvfs_identifier, storage_type)
+            fs = self._get_filesystem(pvfs_identifier, storage_type, storage_location)
return fs.get_file(
self._strip_storage_protocol(storage_type, actual_path),
lpath,
@@ -622,15 +630,19 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def __converse_ts_to_datatime(ts: int):
return datetime.datetime.fromtimestamp(ts / 1000,
tz=datetime.timezone.utc)
- @staticmethod
- def _strip_storage_protocol(storage_type: StorageType, path: str):
+ def _strip_storage_protocol(self, storage_type: StorageType, path: str):
if storage_type == StorageType.LOCAL:
return path[len("{}:".format(StorageType.LOCAL.value)):]
- # OSS has different behavior than S3 and GCS, if we do not remove the
- # protocol, it will always return an empty array.
+ # The two OSS backends want opposite path forms. The legacy ossfs
+ # backend mishandles an oss://-prefixed path (older versions return an
+ # empty array from ls), so the scheme is stripped for it. The jindo
+ # backend (pyjindo) requires the oss:// scheme and crashes without it,
+ # so the path is passed through unchanged.
if storage_type == StorageType.OSS:
if path.startswith("{}://".format(StorageType.OSS.value)):
+ if self._use_jindo_oss_backend():
+ return path
return path[len("{}://".format(StorageType.OSS.value)):]
return path
@@ -805,7 +817,8 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
finally:
write_lock.release()
-    def _get_filesystem(self, pvfs_table_identifier: PVFSTableIdentifier, storage_type: StorageType) -> 'FileSystem':
+    def _get_filesystem(self, pvfs_table_identifier: PVFSTableIdentifier, storage_type: StorageType,
+ storage_location: str) -> 'FileSystem':
read_lock = self._fs_cache_lock.gen_rlock()
try:
read_lock.acquire()
@@ -817,6 +830,12 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
finally:
read_lock.release()
+ # The previous filesystem (if any) is closed after the write lock is
+ # released. For the jindo backend, fs.close() tears down a native
+ # JindoSDK connection, which can block on flushing buffers and
+ # terminating worker threads; holding the write lock through that
+ # would stall every other OSS filesystem rebuild.
+ stale_fs = None
write_lock = self._table_cache_lock.gen_wlock()
try:
write_lock.acquire()
@@ -827,16 +846,19 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
fs = LocalFileSystem()
elif storage_type == StorageType.OSS:
rest_api = self.__rest_api(pvfs_table_identifier)
-                load_token_response: GetTableTokenResponse = rest_api.load_table_token(
-                    Identifier.create(pvfs_table_identifier.database, pvfs_table_identifier.table))
+ table_identifier = Identifier.create(
+                    pvfs_table_identifier.database, pvfs_table_identifier.table)
+                load_token_response: GetTableTokenResponse = rest_api.load_table_token(table_identifier)
merged_token = self._merge_token_with_catalog_options(load_token_response.token)
- fs = self._get_oss_filesystem(Options(merged_token))
+                fs = self._get_oss_filesystem(Options(merged_token), storage_location)
paimon_real_storage = PaimonRealStorage(
token=load_token_response.token,
expires_at_millis=load_token_response.expires_at_millis,
file_system=fs
)
self._fs_cache[pvfs_table_identifier] = paimon_real_storage
+ if cache_value is not None:
+ stale_fs = cache_value.file_system
else:
raise Exception(
"Storage type: `{}` doesn't support now.".format(storage_type)
@@ -844,6 +866,8 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
return fs
finally:
write_lock.release()
+ if stale_fs is not None:
+ PaimonVirtualFileSystem._close_filesystem_quietly(stale_fs)
def _merge_token_with_catalog_options(self, token: dict) -> dict:
"""Merge token with catalog options, DLF OSS endpoint should override
the standard OSS endpoint."""
@@ -863,8 +887,79 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
"Storage type doesn't support now. Path:{}".format(path)
)
+    def _get_oss_filesystem(self, options: Options, storage_location: str) -> AbstractFileSystem:
+ """Build the fsspec filesystem backing OSS reads/writes.
+
+ Honors ``fs.oss.impl`` (the same option PyArrowFileIO uses): ``jindo``
+ backs OSS with the native JindoSDK (PutObject / multipart upload),
+ ``legacy`` backs it with ``ossfs``. ``jindo`` is the default; when
+ pyjindosdk is not installed it falls back to ``ossfs`` -- consistent
+ with PyArrowFileIO.
+
+ ``ossfs`` writes every object through OSS ``AppendObject``, which can
+        fail with ``PositionNotEqualToLength`` (409) on the OSS data-acceleration
+ endpoint for multi-chunk writes; the jindo backend avoids that path.
+ """
+ oss_impl = self.options.get(OssOptions.OSS_IMPL)
+ if oss_impl not in ("jindo", "legacy"):
+ raise Exception(
+ "Unsupported fs.oss.impl value: '{}'. "
+ "Supported values are 'jindo' and 'legacy'.".format(oss_impl)
+ )
+ if self._use_jindo_oss_backend():
+            bucket = PaimonVirtualFileSystem._extract_oss_bucket(storage_location)
+            return create_jindo_oss_filesystem("oss://{}/".format(bucket), options)
+ if oss_impl == "jindo":
+            logger.warning(
+                "fs.oss.impl is 'jindo' but pyjindosdk is not installed. "
+                "Falling back to the ossfs (OSS AppendObject) implementation. "
+                "Install pyjindosdk for native multipart upload: pip install pyjindosdk"
+ )
+ return PaimonVirtualFileSystem._get_ossfs_filesystem(options)
+
+ def _use_jindo_oss_backend(self) -> bool:
+ """Whether OSS access uses the native jindo backend rather than ossfs.
+
+ Decided per filesystem instance from ``fs.oss.impl`` plus the
+ availability of the two pyjindosdk surfaces this path requires
+ (``pyjindo.fs``/``pyjindo.util`` for credential config and
+ ``pyjindo.ossfs`` for the fsspec backend). This is the single
+ condition both ``_get_oss_filesystem`` and ``_strip_storage_protocol``
+ rely on, so the backend choice and the path form handed to it cannot
+ drift apart.
+ """
+ return (
+ self.options.get(OssOptions.OSS_IMPL) == "jindo"
+ and JINDO_AVAILABLE
+ and JINDO_OSSFS_AVAILABLE
+ )
+
+ @staticmethod
+    def _close_filesystem_quietly(fs) -> None:
+        """Best-effort release of a filesystem that is being evicted from cache."""
+ close = getattr(fs, "close", None)
+ if not callable(close):
+ return
+ try:
+ close()
+ except Exception:
+ logger.warning(
+ "ignoring error while closing stale OSS filesystem",
+ exc_info=True,
+ )
+
+ @staticmethod
+ def _extract_oss_bucket(oss_path: str) -> str:
+ scheme = "{}://".format(StorageType.OSS.value)
+ if not oss_path.startswith(scheme):
+ raise Exception("Invalid OSS path: {}".format(oss_path))
+ bucket = oss_path[len(scheme):].split("/", 1)[0]
+ if not bucket:
+            raise Exception("Invalid OSS path without bucket: {}".format(oss_path))
+ return bucket
+
@staticmethod
- def _get_oss_filesystem(options: Options) -> AbstractFileSystem:
+ def _get_ossfs_filesystem(options: Options) -> AbstractFileSystem:
access_key_id = options.get(OssOptions.OSS_ACCESS_KEY_ID)
if access_key_id is None:
raise Exception(
diff --git a/paimon-python/pypaimon/tests/pvfs_oss_filesystem_test.py b/paimon-python/pypaimon/tests/pvfs_oss_filesystem_test.py
new file mode 100644
index 0000000000..d35a851299
--- /dev/null
+++ b/paimon-python/pypaimon/tests/pvfs_oss_filesystem_test.py
@@ -0,0 +1,315 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Unit tests for OSS backend selection in PaimonVirtualFileSystem.
+
+These run in CI with no external dependencies -- DLF, OSS and pyjindosdk are
+all stubbed. They cover the ``fs.oss.impl`` dispatch and the ``_get_filesystem``
+wiring that routes OSS reads/writes through the jindo or ossfs backend. The
+end-to-end behavior of those backends is covered separately by the (DLF-gated)
+``pvfs_oss_backend_alignment_test``.
+"""
+
+import unittest
+from unittest import mock
+
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+from pypaimon.filesystem import pvfs as pvfs_module
+from pypaimon.filesystem.pvfs import (
+ PaimonRealStorage,
+ PaimonVirtualFileSystem,
+ PVFSTableIdentifier,
+ StorageType,
+)
+
+
+def _make_pvfs(extra_options=None):
+ options = {OssOptions.OSS_ACCESS_KEY_ID.key(): "ak"}
+ if extra_options:
+ options.update(extra_options)
+ # skip_instance_cache so each test gets a fresh PVFS (fsspec's _Cached
+ # metaclass would otherwise share _fs_cache state across tests).
+ return PaimonVirtualFileSystem(options, skip_instance_cache=True)
+
+
+class ExtractOssBucketTest(unittest.TestCase):
+
+ def test_extracts_bucket_from_oss_path(self):
+ self.assertEqual(
+            PaimonVirtualFileSystem._extract_oss_bucket("oss://my-bucket/wh/db/tbl"),
+ "my-bucket")
+
+ def test_extracts_bucket_from_bucket_root(self):
+ self.assertEqual(
+ PaimonVirtualFileSystem._extract_oss_bucket("oss://my-bucket/"),
+ "my-bucket")
+
+ def test_rejects_non_oss_path(self):
+ with self.assertRaises(Exception):
+ PaimonVirtualFileSystem._extract_oss_bucket("s3://my-bucket/key")
+
+ def test_rejects_path_without_bucket(self):
+ with self.assertRaises(Exception):
+ PaimonVirtualFileSystem._extract_oss_bucket("oss:///key")
+
+
+class GetOssFilesystemDispatchTest(unittest.TestCase):
+    """fs.oss.impl selects the backend; jindo falls back to ossfs when absent."""
+
+ STORAGE_LOCATION = "oss://my-bucket/warehouse/db/tbl"
+
+ def setUp(self):
+ # Table-scoped OSS credentials handed to the backend builder.
+ self.token_options = Options({
+ OssOptions.OSS_ACCESS_KEY_ID.key(): "tk-ak",
+ OssOptions.OSS_ACCESS_KEY_SECRET.key(): "tk-sk",
+ OssOptions.OSS_ENDPOINT.key(): "oss-cn-hangzhou.aliyuncs.com",
+ })
+
+ def _dispatch(self, oss_impl, jindo_ossfs_available):
+        extra = {OssOptions.OSS_IMPL.key(): oss_impl} if oss_impl is not None else None
+ pvfs = _make_pvfs(extra)
+ ossfs_sentinel = object()
+ jindo_calls = []
+
+ def fake_create_jindo(root_uri, options):
+ jindo_calls.append((root_uri, options))
+ return "JINDO_FS"
+
+ # Patch both surfaces. JINDO_AVAILABLE is held True so the test
+ # behaves the same whether or not pyjindosdk is installed in the
+ # CI image; backend selection is then driven solely by
+ # JINDO_OSSFS_AVAILABLE -- the surface the PVFS jindo backend
+ # actually needs.
+ with mock.patch.object(pvfs_module, "JINDO_AVAILABLE", True), \
+                mock.patch.object(pvfs_module, "JINDO_OSSFS_AVAILABLE", jindo_ossfs_available), \
+                mock.patch.object(pvfs_module, "create_jindo_oss_filesystem", fake_create_jindo), \
+                mock.patch.object(PaimonVirtualFileSystem, "_get_ossfs_filesystem",
+ staticmethod(lambda options: ossfs_sentinel)):
+            fs = pvfs._get_oss_filesystem(self.token_options, self.STORAGE_LOCATION)
+ return fs, ossfs_sentinel, jindo_calls
+
+ def test_legacy_uses_ossfs(self):
+        fs, ossfs_sentinel, jindo_calls = self._dispatch("legacy", jindo_ossfs_available=True)
+ self.assertIs(fs, ossfs_sentinel)
+ self.assertEqual(jindo_calls, [])
+
+ def test_jindo_uses_jindo_when_available(self):
+        fs, _, jindo_calls = self._dispatch("jindo", jindo_ossfs_available=True)
+ self.assertEqual(fs, "JINDO_FS")
+ self.assertEqual(len(jindo_calls), 1)
+ root_uri, options = jindo_calls[0]
+ self.assertEqual(root_uri, "oss://my-bucket/")
+ self.assertIs(options, self.token_options)
+
+ def test_default_impl_is_jindo(self):
+ # No fs.oss.impl set -> OssOptions.OSS_IMPL default value ("jindo").
+ fs, _, jindo_calls = self._dispatch(None, jindo_ossfs_available=True)
+ self.assertEqual(fs, "JINDO_FS")
+ self.assertEqual(len(jindo_calls), 1)
+
+ def test_jindo_falls_back_to_ossfs_when_pyjindo_ossfs_missing(self):
+ # fs.oss.impl=jindo but pyjindo.ossfs not importable (e.g. an older
+ # pyjindosdk build that ships only fs/util). PyArrow jindo path stays
+ # available; PVFS jindo backend falls back to ossfs.
+        fs, ossfs_sentinel, jindo_calls = self._dispatch("jindo", jindo_ossfs_available=False)
+ self.assertIs(fs, ossfs_sentinel)
+ self.assertEqual(jindo_calls, [])
+
+ def test_invalid_impl_raises(self):
+ with self.assertRaises(Exception) as ctx:
+ self._dispatch("garbage", jindo_ossfs_available=True)
+ self.assertIn("Unsupported fs.oss.impl", str(ctx.exception))
+
+
+class GetFilesystemOssWiringTest(unittest.TestCase):
+ """_get_filesystem must forward the caller-supplied storage_location into
+ the OSS backend factory. Callers always have the table path in hand (from
+ _get_table_store) by the time they reach _get_filesystem, so threading it
+ through avoids a redundant REST round-trip inside the write critical
+ section."""
+
+ def test_oss_branch_forwards_caller_storage_location(self):
+ pvfs = _make_pvfs()
+ identifier = PVFSTableIdentifier(
+ catalog="cat", endpoint="http://rest", database="db", table="tbl")
+
+ class FakeTokenResponse:
+ token = {
+ OssOptions.OSS_ACCESS_KEY_ID.key(): "tk-ak",
+ OssOptions.OSS_ACCESS_KEY_SECRET.key(): "tk-sk",
+ OssOptions.OSS_ENDPOINT.key(): "oss-cn-hangzhou.aliyuncs.com",
+ }
+ expires_at_millis = None
+
+ class FakeRestApi:
+ def load_table_token(self, ident):
+ return FakeTokenResponse()
+
+ captured = {}
+
+ def fake_get_oss_filesystem(self, options, storage_location):
+ captured["options"] = options
+ captured["storage_location"] = storage_location
+ return "FS"
+
+ with mock.patch.object(PaimonVirtualFileSystem,
+ "_PaimonVirtualFileSystem__rest_api",
+ lambda self, ident: FakeRestApi()), \
+ mock.patch.object(PaimonVirtualFileSystem, "_get_oss_filesystem",
+ fake_get_oss_filesystem):
+ fs = pvfs._get_filesystem(
+                identifier, StorageType.OSS, "oss://wired-bucket/warehouse/db/tbl")
+
+ self.assertEqual(fs, "FS")
+        self.assertEqual(captured["storage_location"], "oss://wired-bucket/warehouse/db/tbl")
+ self.assertEqual(
+            PaimonVirtualFileSystem._extract_oss_bucket(captured["storage_location"]),
+ "wired-bucket")
+        self.assertEqual(captured["options"].get(OssOptions.OSS_ACCESS_KEY_ID), "tk-ak")
+
+
+class StripStorageProtocolTest(unittest.TestCase):
+    """OSS path form depends on the backend: jindo keeps oss://, ossfs strips it."""
+
+ def _strip(self, oss_impl, jindo_ossfs_available, path):
+ pvfs = _make_pvfs({OssOptions.OSS_IMPL.key(): oss_impl})
+ with mock.patch.object(pvfs_module, "JINDO_AVAILABLE", True), \
+                mock.patch.object(pvfs_module, "JINDO_OSSFS_AVAILABLE", jindo_ossfs_available):
+ return pvfs._strip_storage_protocol(StorageType.OSS, path)
+
+ def test_jindo_backend_keeps_oss_scheme(self):
+ self.assertEqual(
+ self._strip("jindo", True, "oss://b/db/tbl/f.bin"),
+ "oss://b/db/tbl/f.bin")
+
+ def test_legacy_backend_strips_oss_scheme(self):
+ self.assertEqual(
+ self._strip("legacy", True, "oss://b/db/tbl/f.bin"),
+ "b/db/tbl/f.bin")
+
+ def test_jindo_unavailable_falls_back_to_stripping(self):
+        # fs.oss.impl=jindo but pyjindo.ossfs missing -> ossfs backend -> strip.
+ self.assertEqual(
+ self._strip("jindo", False, "oss://b/db/tbl/f.bin"),
+ "b/db/tbl/f.bin")
+
+
+class CloseStaleFilesystemOnRefreshTest(unittest.TestCase):
+ """When _get_filesystem rebuilds a cached fs (token near expiry), the old
+ filesystem must be released. For the jindo backend this is the only way
+ the underlying native JindoSDK connection gets reclaimed. The close must
+ also happen outside _table_cache_lock -- JindoSDK close() can block on
+ native teardown, and holding the write lock through it would stall every
+ other OSS rebuild."""
+
+ STORAGE_LOCATION = "oss://b/wh/db/tbl"
+
+ @staticmethod
+ def _stub_oss_rebuild():
+ identifier = PVFSTableIdentifier(
+ catalog="cat", endpoint="http://rest", database="db", table="tbl")
+
+ class FakeTokenResponse:
+ token = {
+ OssOptions.OSS_ACCESS_KEY_ID.key(): "tk-ak",
+ OssOptions.OSS_ACCESS_KEY_SECRET.key(): "tk-sk",
+ OssOptions.OSS_ENDPOINT.key(): "oss-cn-hangzhou.aliyuncs.com",
+ }
+ expires_at_millis = None
+
+ class FakeRestApi:
+ def load_table_token(self, ident):
+ return FakeTokenResponse()
+
+ return identifier, mock.patch.object(
+ PaimonVirtualFileSystem,
+ "_PaimonVirtualFileSystem__rest_api",
+ lambda self, ident: FakeRestApi(),
+ ), mock.patch.object(
+ PaimonVirtualFileSystem,
+ "_get_oss_filesystem",
+ lambda self, opts, loc: "NEW_FS",
+ )
+
+ def test_old_filesystem_close_called_when_token_expires(self):
+ pvfs = _make_pvfs()
+ identifier, patch_rest, patch_build = self._stub_oss_rebuild()
+
+ close_log = []
+
+ class StaleFs:
+ def close(self_inner):
+ close_log.append("closed")
+
+        # expires_at_millis=0 -> need_refresh() is True -> _get_filesystem rebuilds.
+ pvfs._fs_cache[identifier] = PaimonRealStorage(
+ token={}, expires_at_millis=0, file_system=StaleFs())
+
+ with patch_rest, patch_build:
+            new_fs = pvfs._get_filesystem(identifier, StorageType.OSS, self.STORAGE_LOCATION)
+
+ self.assertEqual(new_fs, "NEW_FS")
+ self.assertEqual(close_log, ["closed"],
+ "stale filesystem must be closed on token refresh")
+
+ def test_first_build_does_not_invoke_close(self):
+ pvfs = _make_pvfs()
+ identifier, patch_rest, patch_build = self._stub_oss_rebuild()
+
+ with patch_rest, patch_build:
+            new_fs = pvfs._get_filesystem(identifier, StorageType.OSS, self.STORAGE_LOCATION)
+
+ # No prior cache entry -> nothing to close, must not raise.
+ self.assertEqual(new_fs, "NEW_FS")
+
+ def test_close_runs_after_write_lock_released(self):
+ # Regression guard: close() of the stale fs runs after the write
+ # lock is released. If close were still inside the critical section,
+ # attempting to acquire the same non-reentrant write lock from
+ # within close would fail (blocking=False -> False).
+ pvfs = _make_pvfs()
+ identifier, patch_rest, patch_build = self._stub_oss_rebuild()
+ lock_was_free_during_close = []
+
+ class StaleFs:
+ def close(self_inner):
+ probe = pvfs._table_cache_lock.gen_wlock()
+ got = probe.acquire(blocking=False)
+ lock_was_free_during_close.append(got)
+ if got:
+ probe.release()
+
+ pvfs._fs_cache[identifier] = PaimonRealStorage(
+ token={}, expires_at_millis=0, file_system=StaleFs())
+
+ with patch_rest, patch_build:
+            pvfs._get_filesystem(identifier, StorageType.OSS, self.STORAGE_LOCATION)
+
+        self.assertEqual(lock_was_free_during_close, [True],
+                         "stale fs close must run after _table_cache_lock is released")
+
+ def test_close_without_close_method_is_no_op(self):
+ # _close_filesystem_quietly must tolerate filesystems that don't
+ # implement close() (e.g. pyjindo 6.10.2's JindoOssFileSystem).
+ PaimonVirtualFileSystem._close_filesystem_quietly(object())
+ PaimonVirtualFileSystem._close_filesystem_quietly(None)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index a8c6766922..1f464b1988 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -164,6 +164,9 @@ setup(
'ossfs>=2021.8; python_version<"3.8"',
'ossfs>=2023; python_version>="3.8"'
],
+ 'jindo': [
+ 'pyjindosdk>=6.10.4',
+ ],
'lance': [
'pylance>=0.20,<1; python_version>="3.9"',
'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'