This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit afb2effb1f240795c0199ac66c0a71f852573e99
Author: shyjsarah <[email protected]>
AuthorDate: Mon Mar 30 03:15:30 2026 -0700

    [python] Add FUSE support for REST Catalog (#7483)
---
 docs/content/pypaimon/fuse-support.md              |  94 ++++++
 .../pypaimon/catalog/rest/fuse_support.py          | 173 +++++++++++
 .../pypaimon/catalog/rest/rest_catalog.py          |  25 +-
 paimon-python/pypaimon/common/options/config.py    |  35 +++
 paimon-python/pypaimon/filesystem/local_file_io.py |  24 ++
 .../pypaimon/tests/rest/test_fuse_local_path.py    | 346 +++++++++++++++++++++
 6 files changed, 695 insertions(+), 2 deletions(-)

diff --git a/docs/content/pypaimon/fuse-support.md 
b/docs/content/pypaimon/fuse-support.md
new file mode 100644
index 0000000000..a9bea49f86
--- /dev/null
+++ b/docs/content/pypaimon/fuse-support.md
@@ -0,0 +1,94 @@
+---
+title: "FUSE Support"
+weight: 7
+type: docs
+aliases:
+  - /pypaimon/fuse-support.html
+---
+
+# FUSE Support
+
+When using PyPaimon REST Catalog to access remote object storage (such as OSS, 
S3, or HDFS), data access typically goes through remote storage SDKs. However, 
in scenarios where remote storage paths are mounted locally via FUSE 
(Filesystem in Userspace), users can access data directly through local 
filesystem paths for better performance.
+
+This feature enables PyPaimon to use local file access when FUSE mount is 
available, bypassing remote storage SDKs.
+
+## Configuration
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `fuse.enabled` | Boolean | `false` | Whether to enable FUSE local path 
mapping |
+| `fuse.root` | String | (none) | FUSE mounted local root path, e.g., 
`/mnt/fuse/warehouse` |
+| `fuse.validation-mode` | String | `strict` | Validation mode: `strict`, 
`warn`, or `none` |
+
+## Usage
+
+```python
+from pypaimon import CatalogFactory
+
+catalog_options = {
+    'metastore': 'rest',
+    'uri': 'http://rest-server:8080',
+    'warehouse': 'oss://my-catalog/',
+    'token.provider': 'xxx',
+
+    # FUSE local path configuration
+    'fuse.enabled': 'true',
+    'fuse.root': '/mnt/fuse/warehouse',
+    'fuse.validation-mode': 'strict'
+}
+
+catalog = CatalogFactory.create(catalog_options)
+```
+
+## Validation Modes
+
+Validation is performed on first data access to verify FUSE mount correctness. 
The `validation-mode` controls behavior when the local path does not exist:
+
+| Mode | Behavior | Use Case |
+|------|----------|----------|
+| `strict` | Throw exception, block operation | Production, safety first |
+| `warn` | Log warning, fallback to default FileIO | Testing, compatibility 
first |
+| `none` | Skip validation, use directly | Trusted environment, performance 
first |
+
+**Note**: Configuration errors (e.g., `fuse.enabled=true` but `fuse.root` not 
configured) will throw exceptions directly, regardless of validation mode.
+
+## How It Works
+
+1. When `fuse.enabled=true`, PyPaimon attempts to use local file access
+2. On first data access, validation is triggered (unless mode is `none`)
+3. Validation fetches the `default` database location and converts it to local 
path
+4. If local path exists, subsequent data access uses `FuseLocalFileIO`
+5. Path translation uses database/table logical names: remote path 
`oss://<catalog-id>/<db-id>/<table-id>` → local path 
`<root>/<db-name>/<table-name>`
+6. If validation fails, behavior depends on `validation-mode`
+
+## Example Scenario
+
+Assume you have:
+- Remote storage paths use UUIDs: `oss://clg-paimon-xxx/db-xxx/tbl-xxx`
+- FUSE mount: `/mnt/fuse/warehouse` (mounted to `pvfs://demo_catalog`)
+- FUSE exposes logical names: `/mnt/fuse/warehouse/my_db/my_table`
+
+```python
+from pypaimon import CatalogFactory
+
+catalog = CatalogFactory.create({
+    'metastore': 'rest',
+    'uri': 'http://rest-server:8080',
+    'warehouse': 'oss://my-catalog/',
+    'fuse.enabled': 'true',
+    'fuse.root': '/mnt/fuse/warehouse',
+    'fuse.validation-mode': 'none'
+})
+
+# When reading table 'my_db.my_table', PyPaimon will:
+# 1. Convert "oss://clg-paimon-xxx/db-xxx/tbl-xxx" to 
"/mnt/fuse/warehouse/my_db/my_table"
+# 2. Use FuseLocalFileIO to read from local path
+table = catalog.get_table('my_db.my_table')
+reader = table.new_read_builder().new_read()
+```
+
+## Limitations
+
+- Only catalog-level FUSE mount is supported (single `fuse.root` configuration)
+- Validation only checks if local path exists, not data consistency
+- If FUSE mount becomes unavailable after validation, file operations may fail
diff --git a/paimon-python/pypaimon/catalog/rest/fuse_support.py 
b/paimon-python/pypaimon/catalog/rest/fuse_support.py
new file mode 100644
index 0000000000..a56de0492b
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/rest/fuse_support.py
@@ -0,0 +1,173 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+from urllib.parse import urlparse
+
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options.config import FuseOptions
+from pypaimon.filesystem.local_file_io import LocalFileIO, FuseLocalFileIO
+
+logger = logging.getLogger(__name__)
+
+
+class FusePathResolver:
+    """Resolves FUSE local paths and validates FUSE mount availability."""
+
+    def __init__(self, options, rest_api):
+        self.fuse_root = options.get(FuseOptions.FUSE_ROOT)
+        self.validation_mode = options.get(FuseOptions.FUSE_VALIDATION_MODE, 
"strict")
+        self.fuse_mode = options.get(FuseOptions.FUSE_MODE, "pvfs")
+        self._validation_state = None  # None=not validated, True=passed, 
False=failed
+        self._rest_api = rest_api
+        self._options = options
+
+    def resolve_local_path(self, original_path, identifier=None):
+        """
+        Resolve FUSE local path.
+
+        In 'pvfs' mode, use database/table logical names from identifier to 
build the path.
+        If identifier has no object name, returns database-level path (used 
for validation).
+        In 'raw' mode, use URI path segments directly.
+
+        Returns:
+            Local path
+
+        Raises:
+            ValueError: If fuse.root is not configured or pvfs mode missing 
identifier
+        """
+        if not self.fuse_root:
+            raise ValueError(
+                "FUSE local path is enabled but fuse.root is not configured"
+            )
+
+        root = self.fuse_root.rstrip('/')
+
+        if self.fuse_mode == "pvfs":
+            if identifier is None:
+                raise ValueError(
+                    "FUSE path mode 'pvfs' requires an Identifier to resolve "
+                    "the local path, but identifier is None."
+                )
+            db = identifier.get_database_name()
+            obj = identifier.get_object_name()
+            if obj:
+                return "{}/{}/{}".format(root, db, obj)
+            return "{}/{}".format(root, db)
+        elif self.fuse_mode == "raw":
+            # raw mode: use URI path segments directly
+            uri = urlparse(original_path)
+            path_part = uri.path.lstrip('/')
+            if not uri.scheme:
+                # No scheme means path like "catalog/db/table",
+                # skip the first segment (catalog name) to align with 
scheme-based paths
+                segments = path_part.split('/')
+                if len(segments) > 1:
+                    path_part = '/'.join(segments[1:])
+            return "{}/{}".format(root, path_part)
+        else:
+            raise ValueError(
+                "Invalid fuse.mode: '{}'. "
+                "Supported modes are 'pvfs' and 'raw'.".format(self.fuse_mode)
+            )
+
+    def validate(self):
+        """
+        Validate FUSE local path is correctly mounted.
+
+        Get default database's location, convert to local path and check if it 
exists.
+        """
+        if self.validation_mode == "none":
+            self._validation_state = True
+            return
+
+        # Get default database details, API call failure raises exception 
directly
+        db = self._rest_api.get_database("default")
+        remote_location = db.location
+
+        if not remote_location:
+            logger.info("Default database has no location, skipping FUSE 
validation")
+            self._validation_state = True
+            return
+
+        expected_local = self.resolve_local_path(
+            remote_location, Identifier.create("default", None)
+        )
+        local_file_io = LocalFileIO(expected_local, self._options)
+
+        # Only validate if local path exists, handle based on validation mode
+        if not local_file_io.exists(expected_local):
+            error_msg = (
+                "FUSE local path validation failed: "
+                "local path '{}' does not exist "
+                "for default database location '{}'".format(expected_local, 
remote_location)
+            )
+            self._handle_validation_error(error_msg)
+        else:
+            self._validation_state = True
+            logger.info("FUSE local path validation passed")
+
+    def _handle_validation_error(self, error_msg):
+        """Handle validation error based on validation mode."""
+        if self.validation_mode == "strict":
+            raise ValueError(error_msg)
+        elif self.validation_mode == "warn":
+            logger.warning("%s. Falling back to default FileIO.", error_msg)
+            self._validation_state = False  # Mark validation failed, fallback 
to default FileIO
+
+    def get_file_io(self, table_path, identifier, data_token_enabled,
+                    rest_token_file_io_factory, default_file_io_factory):
+        """
+        Get FileIO for data access, supporting FUSE local path mapping.
+
+        Args:
+            table_path: The remote table path
+            identifier: Table identifier
+            data_token_enabled: Whether data token is enabled
+            rest_token_file_io_factory: Factory callable for RESTTokenFileIO
+            default_file_io_factory: Factory callable for default FileIO
+
+        Returns:
+            FileIO instance (FuseLocalFileIO or fallback)
+        """
+        # Configuration error raises exception directly
+        local_path = self.resolve_local_path(table_path, identifier)
+
+        # Perform validation (only once)
+        if self._validation_state is None:
+            self.validate()
+
+        # Validation passed, return FUSE-aware local FileIO
+        if self._validation_state:
+            return FuseLocalFileIO(
+                path=table_path.rstrip('/'),
+                fuse_path=local_path.rstrip('/'),
+                catalog_options=self._options,
+            )
+
+        # warn mode validation failed, fallback to default FileIO
+        if data_token_enabled:
+            return rest_token_file_io_factory()
+        return default_file_io_factory()
+
+    @property
+    def validation_state(self):
+        return self._validation_state
+
+    @validation_state.setter
+    def validation_state(self, value):
+        self._validation_state = value
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index a8f232a19c..fc2f516a8b 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -15,8 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import logging
 from typing import Any, Callable, Dict, List, Optional, Union
-
 from pypaimon.api.api_response import GetTableResponse, PagedList, 
ErrorResponse
 from pypaimon.api.rest_api import RESTApi
 from pypaimon.api.rest_exception import NoSuchResourceException, 
AlreadyExistsException, ForbiddenException
@@ -32,7 +32,7 @@ from pypaimon.catalog.database import Database
 from pypaimon.catalog.rest.property_change import PropertyChange
 from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
 from pypaimon.catalog.rest.table_metadata import TableMetadata
-from pypaimon.common.options.config import CatalogOptions
+from pypaimon.common.options.config import CatalogOptions, FuseOptions
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
@@ -46,6 +46,8 @@ from pypaimon.table.format.format_table import FormatTable, 
Format
 from pypaimon.table.iceberg.iceberg_table import IcebergTable
 from pypaimon.table.object.object_table import ObjectTable
 
+logger = logging.getLogger(__name__)
+
 FORMAT_TABLE_TYPE = "format-table"
 ICEBERG_TABLE_TYPE = "iceberg-table"
 OBJECT_TABLE_TYPE = "object-table"
@@ -59,6 +61,13 @@ class RESTCatalog(Catalog):
                                              context.prefer_io_loader, 
context.fallback_io_loader)
         self.data_token_enabled = 
self.rest_api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
 
+        # FUSE support (lazy import only when enabled)
+        self.fuse_enabled = self.context.options.get(FuseOptions.FUSE_ENABLED, 
False)
+        self._fuse_resolver = None
+        if self.fuse_enabled:
+            from pypaimon.catalog.rest.fuse_support import FusePathResolver
+            self._fuse_resolver = FusePathResolver(self.context.options, 
self.rest_api)
+
     def catalog_loader(self):
         """
         Create and return a RESTCatalogLoader for this catalog.
@@ -385,6 +394,18 @@ class RESTCatalog(Catalog):
         return FileIO.get(table_path, self.context.options)
 
     def file_io_for_data(self, table_path: str, identifier: Identifier):
+        """
+        Get FileIO for data access, supporting FUSE local path mapping.
+        """
+        if self._fuse_resolver is not None:
+            return self._fuse_resolver.get_file_io(
+                table_path, identifier, self.data_token_enabled,
+                rest_token_file_io_factory=lambda: RESTTokenFileIO(
+                    identifier, table_path, self.context.options),
+                default_file_io_factory=lambda: 
self.file_io_from_options(table_path),
+            )
+
+        # Fallback to original logic
         return RESTTokenFileIO(identifier, table_path, self.context.options) \
             if self.data_token_enabled else 
self.file_io_from_options(table_path)
 
diff --git a/paimon-python/pypaimon/common/options/config.py 
b/paimon-python/pypaimon/common/options/config.py
index 8b0230da64..83d46c85bf 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -85,3 +85,38 @@ class CatalogOptions:
     HTTP_USER_AGENT_HEADER = ConfigOptions.key(
         
"header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP
 User Agent header")
     BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
+
+
+class FuseOptions:
+    """FUSE configuration options."""
+
+    FUSE_ENABLED = (
+        ConfigOptions.key("fuse.enabled")
+        .boolean_type()
+        .default_value(False)
+        .with_description("Whether to enable FUSE local path mapping")
+    )
+
+    FUSE_ROOT = (
+        ConfigOptions.key("fuse.root")
+        .string_type()
+        .no_default_value()
+        .with_description("FUSE mounted local root path, e.g., 
/mnt/fuse/warehouse")
+    )
+
+    FUSE_VALIDATION_MODE = (
+        ConfigOptions.key("fuse.validation-mode")
+        .string_type()
+        .default_value("strict")
+        .with_description("Validation mode: strict, warn, or none")
+    )
+
+    FUSE_MODE = (
+        ConfigOptions.key("fuse.mode")
+        .string_type()
+        .default_value("pvfs")
+        .with_description(
+            "FUSE path mode: 'pvfs' uses database/table logical names, "
+            "'raw' uses URI path segments directly"
+        )
+    )
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py 
b/paimon-python/pypaimon/filesystem/local_file_io.py
index cf9399309f..44dadaaf66 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -453,3 +453,27 @@ class LocalFileIO(FileIO):
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
+
+
+class FuseLocalFileIO(LocalFileIO):
+    """LocalFileIO that translates remote OSS paths to FUSE-mounted local 
paths.
+
+    All file operations receive paths like:
+        oss://clg-paimon-xxx/db-xxx/tbl-xxx/manifest/manifest-xxx
+    This class replaces the path prefix with fuse_path so the actual
+    I/O goes through the FUSE mount point.
+    """
+
+    def __init__(self, path: str, fuse_path: str,
+                 catalog_options: Optional[Options] = None):
+        super().__init__(path=fuse_path, catalog_options=catalog_options)
+        self.path = path
+        self.fuse_path = fuse_path
+
+    def _to_file(self, path: str) -> Path:
+        return super()._to_file(self._translate(path))
+
+    def _translate(self, path: str) -> str:
+        if path == self.path or path.startswith(self.path + "/"):
+            return self.fuse_path + path[len(self.path):]
+        return path
diff --git a/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py 
b/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py
new file mode 100644
index 0000000000..13e56c7099
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py
@@ -0,0 +1,346 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import unittest
+from unittest.mock import MagicMock, patch
+
+from pypaimon.catalog.rest.fuse_support import FusePathResolver
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import FuseOptions
+
+
+class TestFuseLocalPath(unittest.TestCase):
+    """Test cases for FUSE local path functionality."""
+
+    def _create_resolver(
+        self,
+        root="/mnt/fuse/warehouse",
+        validation_mode="strict",
+        mode="pvfs"
+    ):
+        """Helper to create a FusePathResolver with given config."""
+        options = Options({
+            "uri": "http://localhost:8080";,
+            "warehouse": "oss://catalog/warehouse",
+            FuseOptions.FUSE_ENABLED.key(): "true",
+            FuseOptions.FUSE_ROOT.key(): root,
+            FuseOptions.FUSE_VALIDATION_MODE.key(): validation_mode,
+            FuseOptions.FUSE_MODE.key(): mode,
+        })
+        rest_api = MagicMock()
+        return FusePathResolver(options, rest_api), options, rest_api
+
+    def _create_catalog_with_fuse(
+        self,
+        enabled=True,
+        root="/mnt/fuse/warehouse",
+        validation_mode="strict",
+        mode="pvfs"
+    ):
+        """Helper to create a mock RESTCatalog with FUSE configuration."""
+        options = Options({
+            "uri": "http://localhost:8080";,
+            "warehouse": "oss://catalog/warehouse",
+            FuseOptions.FUSE_ENABLED.key(): str(enabled).lower(),
+            FuseOptions.FUSE_ROOT.key(): root,
+            FuseOptions.FUSE_VALIDATION_MODE.key(): validation_mode,
+            FuseOptions.FUSE_MODE.key(): mode,
+        })
+
+        catalog = MagicMock(spec=RESTCatalog)
+        catalog.fuse_enabled = enabled
+        catalog.data_token_enabled = False
+        catalog.rest_api = MagicMock()
+        catalog.context = MagicMock()
+        catalog.context.options = options
+
+        if enabled:
+            resolver = FusePathResolver(options, catalog.rest_api)
+            catalog._fuse_resolver = resolver
+        else:
+            catalog._fuse_resolver = None
+
+        catalog.file_io_for_data = 
RESTCatalog.file_io_for_data.__get__(catalog)
+        catalog.file_io_from_options = MagicMock(return_value=MagicMock())
+
+        return catalog
+
+    # ========== _resolve_fuse_local_path Tests ==========
+
+    # --- pvfs mode tests ---
+
+    def test_resolve_pvfs_mode_with_identifier(self):
+        """Test pvfs mode uses identifier logical names."""
+        from pypaimon.common.identifier import Identifier
+        resolver, _, _ = self._create_resolver(mode="pvfs")
+        identifier = Identifier.create("my_db", "my_table")
+
+        result = resolver.resolve_local_path(
+            "oss://clg-paimon-xxx/db-xxx/tbl-xxx", identifier
+        )
+        self.assertEqual(result, "/mnt/fuse/warehouse/my_db/my_table")
+
+    def test_resolve_pvfs_mode_with_trailing_slash(self):
+        """Test pvfs mode with trailing slash on root."""
+        from pypaimon.common.identifier import Identifier
+        resolver, _, _ = self._create_resolver(mode="pvfs", 
root="/mnt/fuse/warehouse/")
+        identifier = Identifier.create("my_db", "my_table")
+
+        result = resolver.resolve_local_path(
+            "oss://clg-paimon-xxx/db-xxx/tbl-xxx", identifier
+        )
+        self.assertEqual(result, "/mnt/fuse/warehouse/my_db/my_table")
+
+    def test_resolve_pvfs_mode_without_identifier_raises(self):
+        """Test pvfs mode raises ValueError when identifier is None."""
+        resolver, _, _ = self._create_resolver(mode="pvfs")
+
+        with self.assertRaises(ValueError) as context:
+            resolver.resolve_local_path("oss://clg-paimon-xxx/db-xxx/tbl-xxx")
+
+        self.assertIn("identifier is None", str(context.exception))
+
+    # --- raw mode tests ---
+
+    def test_resolve_raw_mode_basic(self):
+        """Test raw mode basic path conversion."""
+        resolver, _, _ = self._create_resolver(mode="raw")
+
+        result = resolver.resolve_local_path("oss://catalog/db1/table1")
+        self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1")
+
+    def test_resolve_raw_mode_with_trailing_slash(self):
+        """Test raw mode with trailing slash on root."""
+        resolver, _, _ = self._create_resolver(mode="raw", 
root="/mnt/fuse/warehouse/")
+
+        result = resolver.resolve_local_path("oss://catalog/db1/table1")
+        self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1")
+
+    def test_resolve_raw_mode_deep_path(self):
+        """Test raw mode with deep path."""
+        resolver, _, _ = self._create_resolver(mode="raw")
+
+        result = resolver.resolve_local_path(
+            "oss://catalog/db1/table1/partition1/file.parquet"
+        )
+        self.assertEqual(
+            result,
+            "/mnt/fuse/warehouse/db1/table1/partition1/file.parquet"
+        )
+
+    def test_resolve_raw_mode_without_scheme(self):
+        """Test raw mode path without scheme skips first segment."""
+        resolver, _, _ = self._create_resolver(mode="raw")
+
+        result = resolver.resolve_local_path("catalog/db1/table1")
+        self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1")
+
+    def test_resolve_raw_mode_ignores_identifier(self):
+        """Test raw mode uses URI path even when identifier is provided."""
+        from pypaimon.common.identifier import Identifier
+        resolver, _, _ = self._create_resolver(mode="raw")
+        identifier = Identifier.create("my_db", "my_table")
+
+        result = resolver.resolve_local_path(
+            "oss://catalog/db-uuid/tbl-uuid", identifier
+        )
+        self.assertEqual(result, "/mnt/fuse/warehouse/db-uuid/tbl-uuid")
+
+    # --- common tests ---
+
+    def test_resolve_fuse_local_path_missing_root(self):
+        """Test error when root is not configured."""
+        resolver, _, _ = self._create_resolver(root=None)
+
+        with self.assertRaises(ValueError) as context:
+            resolver.resolve_local_path("oss://catalog/db1/table1")
+
+        self.assertIn("fuse.root is not configured", str(context.exception))
+
+    # ========== Validation Tests ==========
+
+    def test_validation_mode_none_skips_validation(self):
+        """Test none mode skips validation."""
+        resolver, _, _ = self._create_resolver(validation_mode="none")
+
+        resolver.validate()
+
+        self.assertTrue(resolver.validation_state)
+
+    def test_validation_mode_strict_raises_on_failure(self):
+        """Test strict mode raises exception on validation failure."""
+        resolver, _, rest_api = self._create_resolver(validation_mode="strict")
+
+        # Mock default database with location
+        mock_db = MagicMock()
+        mock_db.location = "oss://catalog/default"
+        rest_api.get_database.return_value = mock_db
+
+        # Mock LocalFileIO to return False for exists
+        with patch('pypaimon.catalog.rest.fuse_support.LocalFileIO') as 
mock_local_io:
+            mock_instance = MagicMock()
+            mock_instance.exists.return_value = False
+            mock_local_io.return_value = mock_instance
+
+            with self.assertRaises(ValueError) as context:
+                resolver.validate()
+
+            self.assertIn("FUSE local path validation failed", 
str(context.exception))
+
+    def test_validation_mode_warn_fallback_on_failure(self):
+        """Test warn mode falls back to default FileIO on validation 
failure."""
+        resolver, _, rest_api = self._create_resolver(validation_mode="warn")
+
+        # Mock default database with location
+        mock_db = MagicMock()
+        mock_db.location = "oss://catalog/default"
+        rest_api.get_database.return_value = mock_db
+
+        # Mock LocalFileIO to return False for exists
+        with patch('pypaimon.catalog.rest.fuse_support.LocalFileIO') as 
mock_local_io:
+            mock_instance = MagicMock()
+            mock_instance.exists.return_value = False
+            mock_local_io.return_value = mock_instance
+
+            # Should not raise, just set state to False
+            resolver.validate()
+
+            self.assertFalse(resolver.validation_state)
+
+    def test_validation_passes_when_local_exists(self):
+        """Test validation passes when local path exists."""
+        resolver, _, rest_api = self._create_resolver(validation_mode="strict")
+
+        # Mock default database with location
+        mock_db = MagicMock()
+        mock_db.location = "oss://catalog/default"
+        rest_api.get_database.return_value = mock_db
+
+        # Mock LocalFileIO to return True for exists
+        with patch('pypaimon.catalog.rest.fuse_support.LocalFileIO') as 
mock_local_io:
+            mock_instance = MagicMock()
+            mock_instance.exists.return_value = True
+            mock_local_io.return_value = mock_instance
+
+            resolver.validate()
+
+            self.assertTrue(resolver.validation_state)
+
+    def test_validation_skips_when_no_location(self):
+        """Test validation skips when default database has no location."""
+        resolver, _, rest_api = self._create_resolver(validation_mode="strict")
+
+        # Mock default database without location
+        mock_db = MagicMock()
+        mock_db.location = None
+        rest_api.get_database.return_value = mock_db
+
+        resolver.validate()
+
+        self.assertTrue(resolver.validation_state)
+
+    # ========== file_io_for_data Tests ==========
+
+    def test_file_io_for_data_disabled_fuse(self):
+        """Test that disabled FUSE uses default FileIO."""
+        catalog = self._create_catalog_with_fuse(enabled=False)
+        catalog.data_token_enabled = False
+
+        from pypaimon.common.identifier import Identifier
+        identifier = Identifier.create("db1", "table1")
+
+        _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier)
+        catalog.file_io_from_options.assert_called_once()
+
+    def test_file_io_for_data_uses_local_when_validated(self):
+        """Test that validated FUSE uses FuseLocalFileIO."""
+        catalog = self._create_catalog_with_fuse(enabled=True, 
validation_mode="none")
+        catalog._fuse_resolver.validation_state = True  # Already validated
+
+        from pypaimon.common.identifier import Identifier
+        identifier = Identifier.create("db1", "table1")
+
+        with patch('pypaimon.catalog.rest.fuse_support.FuseLocalFileIO') as 
mock_fuse_io:
+            mock_fuse_io.return_value = MagicMock()
+            _ = catalog.file_io_for_data("oss://catalog/db1/table1", 
identifier)
+            mock_fuse_io.assert_called_once()
+
+    def test_file_io_for_data_fallback_when_validation_failed(self):
+        """Test that failed validation falls back to default FileIO."""
+        catalog = self._create_catalog_with_fuse(enabled=True, 
validation_mode="warn")
+        catalog._fuse_resolver.validation_state = False  # Validation failed
+        catalog.data_token_enabled = False
+
+        from pypaimon.common.identifier import Identifier
+        identifier = Identifier.create("db1", "table1")
+
+        _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier)
+        catalog.file_io_from_options.assert_called_once()
+
+    # ========== Invalid Mode Tests ==========
+
+    def test_resolve_invalid_mode_raises(self):
+        """Test that an invalid mode raises ValueError."""
+        resolver, _, _ = self._create_resolver(mode="invalid")
+
+        with self.assertRaises(ValueError) as context:
+            resolver.resolve_local_path("oss://catalog/db1/table1")
+
+        self.assertIn("Invalid fuse.mode", str(context.exception))
+        self.assertIn("invalid", str(context.exception))
+
+    # ========== Raw Mode Validation Tests ==========
+
+    def test_validation_raw_mode_strict_raises_on_failure(self):
+        """Test strict validation in raw mode raises exception on failure."""
+        resolver, _, rest_api = 
self._create_resolver(validation_mode="strict", mode="raw")
+
+        mock_db = MagicMock()
+        mock_db.location = "oss://catalog/default"
+        rest_api.get_database.return_value = mock_db
+
+        with patch('pypaimon.catalog.rest.fuse_support.LocalFileIO') as 
mock_local_io:
+            mock_instance = MagicMock()
+            mock_instance.exists.return_value = False
+            mock_local_io.return_value = mock_instance
+
+            with self.assertRaises(ValueError) as context:
+                resolver.validate()
+
+            self.assertIn("FUSE local path validation failed", 
str(context.exception))
+
+    def test_validation_raw_mode_passes_when_local_exists(self):
+        """Test validation passes in raw mode when local path exists."""
+        resolver, _, rest_api = 
self._create_resolver(validation_mode="strict", mode="raw")
+
+        mock_db = MagicMock()
+        mock_db.location = "oss://catalog/default"
+        rest_api.get_database.return_value = mock_db
+
+        with patch('pypaimon.catalog.rest.fuse_support.LocalFileIO') as 
mock_local_io:
+            mock_instance = MagicMock()
+            mock_instance.exists.return_value = True
+            mock_local_io.return_value = mock_instance
+
+            resolver.validate()
+
+            self.assertTrue(resolver.validation_state)
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to