This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b839753f46 [python] Add HDFS native FileIO backend (no Hadoop install 
required) (#8031)
b839753f46 is described below

commit b839753f46d8d447ab4ba5f8770e1ca13c0a5288
Author: chaoyang <[email protected]>
AuthorDate: Wed Jun 3 10:28:14 2026 +0800

    [python] Add HDFS native FileIO backend (no Hadoop install required) (#8031)
    
    Introduces HdfsNativeFileIO backed by the hdfs-native protocol client
    (Rust + PyO3)
    
    Default backend for hdfs:// and viewfs:// switches to native; the
    PyArrow / libhdfs path is kept, with auto-fallback when hdfs-native is
    unavailable (e.g. on Windows or when the extra is not installed).
---
 paimon-python/README.md                            |  79 ++
 paimon-python/pypaimon/benchmark/hdfs_io_bench.py  | 168 ++++
 paimon-python/pypaimon/common/file_io.py           |  41 +-
 paimon-python/pypaimon/common/options/config.py    |  35 +
 paimon-python/pypaimon/filesystem/_kerberos.py     |  45 +
 .../pypaimon/filesystem/hdfs_native_file_io.py     | 689 ++++++++++++++++
 .../pypaimon/filesystem/pyarrow_file_io.py         |  21 +-
 paimon-python/pypaimon/tests/e2e/hdfs/README.md    |  75 ++
 paimon-python/pypaimon/tests/e2e/hdfs/__init__.py  |  16 +
 .../pypaimon/tests/e2e/hdfs/docker-compose.yml     |  55 ++
 .../tests/e2e/hdfs/hdfs_native_e2e_test.py         | 107 +++
 paimon-python/pypaimon/tests/hdfs_native_test.py   | 917 +++++++++++++++++++++
 paimon-python/pypaimon/tests/kerberos_test.py      |   2 +-
 paimon-python/setup.py                             |   3 +
 14 files changed, 2233 insertions(+), 20 deletions(-)

diff --git a/paimon-python/README.md b/paimon-python/README.md
index e216dc0019..5f716c8828 100644
--- a/paimon-python/README.md
+++ b/paimon-python/README.md
@@ -31,3 +31,82 @@ pip3 install dist/*.tar.gz
 
 The command will install the package and core dependencies to your local 
Python environment.
 
+# HDFS without a local Hadoop install
+
+`pypaimon` supports HDFS through a pure-protocol client based on
+[`hdfs-native`](https://github.com/Kimahriman/hdfs-native) (Rust + PyO3).
+Use it when you want HDFS access **without** installing Hadoop, a JDK,
+`libhdfs`, or wrestling with `CLASSPATH` / `LD_LIBRARY_PATH`.
+
+Install with the optional extra:
+
+```commandline
+pip install 'pypaimon[hdfs]'
+```
+
+The native backend requires **Python 3.10+** (and is unavailable on Windows).
+On older interpreters the extra is skipped, so `pypaimon` still installs — keep
+using the legacy `pyarrow` (`libhdfs`/JVM) backend there via
+`hdfs.client.impl=pyarrow`.
+
+For `hdfs://` and `viewfs://` URIs this backend is now the default.
+Switch back to the legacy `libhdfs` (JNI) path with:
+
+```python
+catalog = CatalogFactory.create({
+    "warehouse": "hdfs://ns1/warehouse",
+    "hdfs.client.impl": "pyarrow",   # default: "native"
+})
+```
+
+## Sourcing the cluster wiring
+
+The client still needs to know about NameNode addresses, HA failover
+groups, and `viewfs` mount tables. Three options:
+
+1. **Local xml** — set `HADOOP_CONF_DIR` (or the `hdfs.conf-dir` option)
+   to a directory containing `core-site.xml` / `hdfs-site.xml`. Only the
+   xml is required; no Hadoop binaries or JDK.
+
+2. **Catalog options (REST-friendly)** — pass the original Hadoop
+   key/values directly in catalog options. Keys with prefixes `dfs.`,
+   `fs.`, `hadoop.`, `ipc.`, `io.` are forwarded as-is. A REST catalog
+   can deliver these in its response, giving a fully zero-file client
+   experience:
+
+   ```python
+   CatalogFactory.create({
+       "warehouse": "viewfs://cluster/warehouse",
+       "dfs.nameservices": "ns1",
+       "dfs.ha.namenodes.ns1": "nn1,nn2",
+       "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020",
+       "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020",
+       "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
+   })
+   ```
+
+3. **Namespaced overrides** — use `hdfs.config.<key>` to forward any
+   other Hadoop key not covered by the prefix whitelist.
+
+The three sources can be combined; catalog options take precedence over
+xml.
+
+## Kerberos
+
+A secured cluster still needs the GSSAPI system library
+(`libgssapi-krb5-2` on Debian/Ubuntu, `krb5` via Homebrew on macOS,
+`krb5-libs` on RHEL) plus a `krb5.conf`. Provide credentials by either:
+
+- Running `kinit` yourself and pointing `KRB5CCNAME` at the cache, or
+- Setting `security.kerberos.login.principal` and
+  `security.kerberos.login.keytab` in catalog options — `pypaimon` will
+  run `kinit` for you.
+
+## Fallback behaviour
+
+If the native backend fails to initialise (e.g. wheel missing on an
+unsupported platform such as Windows), `pypaimon` automatically falls
+back to the `pyarrow` (`libhdfs`/JVM) path and logs a warning. Disable
+the fallback with `hdfs.client.fallback-to-pyarrow=false` if you want
+hard failures instead.
+
diff --git a/paimon-python/pypaimon/benchmark/hdfs_io_bench.py 
b/paimon-python/pypaimon/benchmark/hdfs_io_bench.py
new file mode 100644
index 0000000000..5b6b12dd18
--- /dev/null
+++ b/paimon-python/pypaimon/benchmark/hdfs_io_bench.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""HDFS FileIO benchmark: native (hdfs-native) vs pyarrow (libhdfs/JVM).
+
+Compares throughput of common FileIO operations between the two backends
+against the same HDFS cluster. Each backend is exercised via the FileIO
+factory by toggling the `hdfs.client.impl` option.
+
+Usage:
+    python -m pypaimon.benchmark.hdfs_io_bench \\
+        --warehouse hdfs://localhost:8020/bench \\
+        [--backend native|pyarrow|both] \\
+        [--write-size-mb 256] \\
+        [--list-files 1000] \\
+        [--read-iters 3]
+
+Notes:
+- `pyarrow` backend requires HADOOP_HOME + HADOOP_CONF_DIR + libhdfs.
+- `native` backend requires `pip install pypaimon[hdfs]`.
+- The benchmark writes/reads scratch files under <warehouse>/bench_<uuid>/
+  and removes them on exit.
+"""
+
+import argparse
+import os
+import sys
+import time
+import uuid
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from pypaimon.common.file_io import FileIO  # noqa: E402
+from pypaimon.common.options import Options  # noqa: E402
+
+
+def _build_file_io(warehouse: str, backend: str) -> FileIO:
+    opts = Options({"hdfs.client.impl": backend})
+    return FileIO.get(warehouse, opts)
+
+
+def _human(seconds: float) -> str:
+    if seconds < 1e-3:
+        return f"{seconds * 1e6:.0f}us"
+    if seconds < 1:
+        return f"{seconds * 1e3:.1f}ms"
+    return f"{seconds:.2f}s"
+
+
+def _bench_write(file_io: FileIO, root: str, size_mb: int) -> float:
+    payload = os.urandom(min(size_mb, 16) * 1024 * 1024)
+    path = f"{root}/write-{uuid.uuid4().hex[:8]}.bin"
+    t0 = time.perf_counter()
+    with file_io.new_output_stream(path) as stream:
+        written = 0
+        target = size_mb * 1024 * 1024
+        while written < target:
+            chunk = payload[: min(len(payload), target - written)]
+            n = stream.write(chunk)
+            written += n if isinstance(n, int) and n > 0 else len(chunk)
+    return time.perf_counter() - t0
+
+
+def _bench_read(file_io: FileIO, path: str) -> float:
+    t0 = time.perf_counter()
+    with file_io.new_input_stream(path) as stream:
+        while True:
+            chunk = stream.read(8 * 1024 * 1024)
+            if not chunk:
+                break
+    return time.perf_counter() - t0
+
+
+def _bench_list(file_io: FileIO, root: str, num_files: int) -> float:
+    scratch = f"{root}/list-{uuid.uuid4().hex[:8]}"
+    file_io.mkdirs(scratch)
+    try:
+        for i in range(num_files):
+            with file_io.new_output_stream(f"{scratch}/f-{i:06d}.txt") as s:
+                s.write(b"x")
+        t0 = time.perf_counter()
+        results = file_io.list_status(scratch)
+        _ = list(results)
+        return time.perf_counter() - t0
+    finally:
+        file_io.delete(scratch, recursive=True)
+
+
+def run_one(backend: str, args) -> None:
+    print(f"\n=== backend={backend} ===")
+    try:
+        file_io = _build_file_io(args.warehouse, backend)
+    except Exception as e:
+        print(f"  init failed: {e}")
+        return
+
+    bench_root = f"{args.warehouse.rstrip('/')}/bench_{uuid.uuid4().hex[:8]}"
+    file_io.mkdirs(bench_root)
+    try:
+        # Write
+        sample_path = f"{bench_root}/write-sample.bin"
+        with file_io.new_output_stream(sample_path) as stream:
+            payload = os.urandom(min(args.write_size_mb, 16) * 1024 * 1024)
+            written = 0
+            target = args.write_size_mb * 1024 * 1024
+            t0 = time.perf_counter()
+            while written < target:
+                chunk = payload[: min(len(payload), target - written)]
+                n = stream.write(chunk)
+                written += n if isinstance(n, int) and n > 0 else len(chunk)
+            write_elapsed = time.perf_counter() - t0
+        mb_per_s = args.write_size_mb / write_elapsed if write_elapsed else 0
+        print(f"  write {args.write_size_mb}MB:  "
+              f"{_human(write_elapsed)}  ({mb_per_s:.1f} MB/s)")
+
+        # Read (warm)
+        read_times = []
+        for _ in range(args.read_iters):
+            read_times.append(_bench_read(file_io, sample_path))
+        avg_read = sum(read_times) / len(read_times)
+        rmb_per_s = args.write_size_mb / avg_read if avg_read else 0
+        print(f"  read  {args.write_size_mb}MB (avg of {args.read_iters}): "
+              f"{_human(avg_read)}  ({rmb_per_s:.1f} MB/s)")
+
+        # List
+        list_elapsed = _bench_list(file_io, bench_root, args.list_files)
+        print(f"  list  {args.list_files} files: {_human(list_elapsed)}")
+
+    finally:
+        try:
+            file_io.delete(bench_root, recursive=True)
+        except Exception as e:
+            print(f"  cleanup failed: {e}")
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--warehouse", required=True,
+                        help="HDFS URI under which scratch files are created")
+    parser.add_argument("--backend", default="both",
+                        choices=["native", "pyarrow", "both"])
+    parser.add_argument("--write-size-mb", type=int, default=128)
+    parser.add_argument("--list-files", type=int, default=1000)
+    parser.add_argument("--read-iters", type=int, default=3)
+    args = parser.parse_args()
+
+    backends = ["native", "pyarrow"] if args.backend == "both" else 
[args.backend]
+    for backend in backends:
+        run_one(backend, args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/paimon-python/pypaimon/common/file_io.py 
b/paimon-python/pypaimon/common/file_io.py
index 217ae4fd72..fabda00495 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -289,8 +289,11 @@ class FileIO(ABC):
         """
         Returns a FileIO instance for accessing the file system identified by 
the given path.
         - LocalFileIO for local file system (file:// or no scheme)
-        - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
+        - HdfsNativeFileIO for HDFS/ViewFS (default; pure protocol client, no 
Hadoop install)
+        - PyArrowFileIO for other remote file systems (oss://, s3://, gs://, 
...),
+          and for HDFS when explicitly requested via hdfs.client.impl=pyarrow
         """
+        import os as _os
         from urllib.parse import urlparse
 
         uri = urlparse(path)
@@ -300,5 +303,39 @@ class FileIO(ABC):
             from pypaimon.filesystem.local_file_io import LocalFileIO
             return LocalFileIO(path, catalog_options)
 
+        opts = catalog_options or Options({})
+
+        if scheme in ("hdfs", "viewfs"):
+            from pypaimon.common.options.config import HdfsOptions
+            impl_source = "hdfs.client.impl option"
+            # Treat an empty option value the same as "unset" so callers can
+            # blank it out (common in templated configs) without tripping
+            # the unsupported-impl branch.
+            impl_value = opts.to_map().get(HdfsOptions.HDFS_CLIENT_IMPL.key())
+            if not impl_value:
+                impl_value = _os.environ.get("PYPAIMON_HDFS_IMPL")
+                impl_source = "PYPAIMON_HDFS_IMPL env var"
+            if not impl_value:
+                impl_value = HdfsOptions.HDFS_CLIENT_IMPL.default_value()
+                impl_source = "default"
+            impl = impl_value.lower()
+            if impl == "native":
+                try:
+                    from pypaimon.filesystem.hdfs_native_file_io import 
HdfsNativeFileIO
+                    return HdfsNativeFileIO(path, opts)
+                except (ImportError, RuntimeError) as e:
+                    fallback = 
opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)
+                    if not fallback:
+                        raise
+                    logging.getLogger(__name__).warning(
+                        "Native HDFS backend init failed, falling back to "
+                        "pyarrow: %s", e,
+                    )
+            elif impl != "pyarrow":
+                raise ValueError(
+                    f"Unsupported hdfs.client.impl '{impl_value}' "
+                    f"(from {impl_source}). Supported: 'native', 'pyarrow'."
+                )
+
         from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
-        return PyArrowFileIO(path, catalog_options or Options({}))
+        return PyArrowFileIO(path, opts)
diff --git a/paimon-python/pypaimon/common/options/config.py 
b/paimon-python/pypaimon/common/options/config.py
index 465671725d..6189ebaf89 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -104,6 +104,41 @@ class CatalogOptions:
     BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
 
 
+class HdfsOptions:
+    HDFS_CLIENT_IMPL = (
+        ConfigOptions.key("hdfs.client.impl")
+        .string_type()
+        .default_value("native")
+        .with_description(
+            "HDFS FileIO backend. Supported values: 'native' (default, uses "
+            "hdfs-native protocol client, no Hadoop install required), "
+            "'pyarrow' (legacy, requires HADOOP_HOME / libhdfs / JVM)."
+        )
+    )
+    HDFS_CLIENT_FALLBACK_TO_PYARROW = (
+        ConfigOptions.key("hdfs.client.fallback-to-pyarrow")
+        .boolean_type()
+        .default_value(True)
+        .with_description(
+            "When the native backend fails to initialise (e.g. missing wheel "
+            "or unsupported platform), fall back to the pyarrow backend "
+            "instead of raising."
+        )
+    )
+    HDFS_CONF_DIR = (
+        ConfigOptions.key("hdfs.conf-dir")
+        .string_type()
+        .no_default_value()
+        .with_description(
+            "Directory containing core-site.xml / hdfs-site.xml that the "
+            "native client should load. Defaults to $HADOOP_CONF_DIR."
+        )
+    )
+
+    HDFS_CONFIG_PREFIX = "hdfs.config."
+    HDFS_NATIVE_CONFIG_KEY_PREFIXES = ("dfs.", "fs.", "hadoop.", "ipc.", "io.")
+
+
 class SecurityOptions:
     KERBEROS_PRINCIPAL = (
         ConfigOptions.key("security.kerberos.login.principal")
diff --git a/paimon-python/pypaimon/filesystem/_kerberos.py 
b/paimon-python/pypaimon/filesystem/_kerberos.py
new file mode 100644
index 0000000000..00a31ba975
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/_kerberos.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Shared Kerberos helpers used by HDFS FileIO backends."""
+
+import os
+import subprocess
+from typing import Optional
+
+
+def kerberos_login_from_keytab(principal: str, keytab: str) -> None:
+    if not os.path.isfile(keytab):
+        raise FileNotFoundError(f"Kerberos keytab file not found: {keytab}")
+    if not os.access(keytab, os.R_OK):
+        raise PermissionError(f"Kerberos keytab file is not readable: 
{keytab}")
+    subprocess.run(
+        ['kinit', '-kt', keytab, principal],
+        check=True, capture_output=True, text=True,
+    )
+
+
+def get_ticket_cache_path() -> Optional[str]:
+    cc = os.environ.get('KRB5CCNAME')
+    if cc:
+        if cc.startswith('FILE:'):
+            return cc[5:]
+        return cc
+    default_path = f'/tmp/krb5cc_{os.getuid()}'
+    if os.path.exists(default_path):
+        return default_path
+    return None
diff --git a/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py 
b/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
new file mode 100644
index 0000000000..ba6fc0f4b6
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
@@ -0,0 +1,689 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""HDFS FileIO backed by the hdfs-native protocol client (no JVM, no 
libhdfs)."""
+
+import logging
+import os
+import xml.etree.ElementTree as ET
+from datetime import datetime, timezone
+from pathlib import PurePosixPath
+from typing import Dict, Optional
+from urllib.parse import urlparse
+
+import pyarrow
+import pyarrow.fs as pafs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import HdfsOptions, SecurityOptions
+from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem import _kerberos
+from pypaimon.schema.data_types import AtomicType, DataField, 
PyarrowFieldParser
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+
+class _HdfsFileInfo:
+    """pafs.FileInfo-shaped adapter built from hdfs_native.FileStatus."""
+    __slots__ = ('path', 'size', 'type', 'mtime', 'base_name')
+
+    def __init__(self, path: str, size: Optional[int], file_type, mtime):
+        self.path = path
+        self.size = size
+        self.type = file_type
+        self.mtime = mtime
+        self.base_name = path.rsplit('/', 1)[-1] if path else ''
+
+
+class _HdfsWriterAdapter:
+    """File-like wrapper over hdfs_native.FileWriter."""
+
+    def __init__(self, fw):
+        self._fw = fw
+        self._pos = 0
+        self._closed = False
+
+    def write(self, buf) -> int:
+        n = self._fw.write(buf)
+        if n is None:
+            n = len(buf) if hasattr(buf, '__len__') else 0
+        self._pos += n
+        return n
+
+    def tell(self) -> int:
+        return self._pos
+
+    def flush(self):
+        pass
+
+    def close(self):
+        if not self._closed:
+            try:
+                self._fw.close()
+            finally:
+                self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def writable(self) -> bool:
+        return True
+
+    def readable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return False
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+        return False
+
+
+class _HdfsReaderAdapter:
+    """File-like wrapper over hdfs_native.FileReader.
+
+    Delegates read/seek/tell straight to the underlying reader (which is an
+    io.RawIOBase subclass with full seek/tell support). The wrapper only
+    exists so that exiting a `with` block guarantees the underlying handle
+    is closed — hdfs-native's own FileReader.__exit__ is a no-op.
+    """
+
+    def __init__(self, fr):
+        self._fr = fr
+        self._closed = False
+
+    def read(self, size: int = -1) -> bytes:
+        return self._fr.read(-1 if size is None else size)
+
+    def read1(self, size: int = -1) -> bytes:
+        return self.read(size)
+
+    def seek(self, pos: int, whence: int = 0) -> int:
+        self._fr.seek(pos, whence)
+        return self._fr.tell()
+
+    def tell(self) -> int:
+        return self._fr.tell()
+
+    def close(self):
+        if self._closed:
+            return
+        try:
+            close = getattr(self._fr, 'close', None)
+            if close is not None:
+                close()
+        finally:
+            self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def readable(self) -> bool:
+        return True
+
+    def writable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+        return False
+
+
+class HdfsNativeFileIO(FileIO):
+    """HDFS FileIO that speaks the HDFS RPC protocol directly.
+
+    No JVM, no libhdfs, no Hadoop install required. Hadoop xml is still
+    consumed if present (HADOOP_CONF_DIR or `hdfs.conf-dir` option) for
+    viewfs mount tables and HA NameNode lists; alternatively the same
+    key/values can be delivered via the catalog options channel (a REST
+    catalog can therefore push the cluster wiring with the response).
+    """
+
+    NATIVE_KEY_PREFIXES = HdfsOptions.HDFS_NATIVE_CONFIG_KEY_PREFIXES
+    NS_PREFIX = HdfsOptions.HDFS_CONFIG_PREFIX
+
+    def __init__(self, path: str, catalog_options: Options):
+        self.properties = catalog_options or Options({})
+        self.logger = logging.getLogger(__name__)
+        self.uri_reader_factory = UriReaderFactory(self.properties)
+
+        scheme, netloc, _ = self.parse_location(path)
+        if scheme not in {"hdfs", "viewfs"}:
+            raise ValueError(
+                f"HdfsNativeFileIO does not support scheme '{scheme}'"
+            )
+        self._scheme = scheme
+        self._netloc = netloc
+
+        try:
+            from hdfs_native import Client, WriteOptions
+        except ImportError as e:
+            raise ImportError(
+                "hdfs-native is not installed. "
+                "Install with: pip install 'pypaimon[hdfs]'"
+            ) from e
+        self._WriteOptions = WriteOptions
+
+        self._setup_kerberos()
+
+        config_dir = (
+            self.properties.get(HdfsOptions.HDFS_CONF_DIR)
+            or os.environ.get("HADOOP_CONF_DIR")
+        )
+        hadoop_xml = self._load_hadoop_xml(config_dir)
+
+        config = self._build_config_dict()
+        self._maybe_inject_viewfs_fallback(scheme, netloc, config, hadoop_xml)
+
+        # Stash for the lazy `filesystem` property (the fsspec/pyarrow facade
+        # is only built if a caller asks for it).
+        self._config = config
+        self._hadoop_xml = hadoop_xml
+        self._config_dir = config_dir
+        self._filesystem = None
+
+        client_kwargs = {}
+        url = self._build_url(scheme, netloc)
+        if url:
+            client_kwargs["url"] = url
+        if config:
+            client_kwargs["config"] = config
+        if config_dir:
+            client_kwargs["config_dir"] = config_dir
+
+        self._client = Client(**client_kwargs)
+
+    def __reduce__(self):
+        """Pickle support for Ray / multiprocessing.
+
+        hdfs_native.Client is a Rust binding that can't be pickled; rather
+        than try to serialise live handles, we serialise the constructor
+        inputs and let workers re-init their own Client. Same pattern
+        pyarrow.fs.HadoopFileSystem uses.
+
+        Pin the resolved config_dir into the carried options. If the
+        driver resolved it from $HADOOP_CONF_DIR, a worker on a host with
+        a different env var would otherwise pick up the worker's value
+        and silently talk to a different cluster.
+        """
+        netloc = self._netloc or ""
+        path = f"{self._scheme}://{netloc}"
+        props_map = dict(self.properties.to_map())
+        if self._config_dir and not props_map.get(
+            HdfsOptions.HDFS_CONF_DIR.key()
+        ):
+            props_map[HdfsOptions.HDFS_CONF_DIR.key()] = self._config_dir
+        return (type(self), (path, Options(props_map)))
+
+    @property
+    def filesystem(self):
+        """pyarrow.fs.FileSystem facade backed by hdfs_native.fsspec.
+
+        Lazily constructed: FileIO-only call paths
+        (exists/list_status/new_input_stream/...) never pay the fsspec init
+        cost; only ds.dataset / open_input_file callers do.
+        """
+        if self._filesystem is None:
+            import pyarrow.fs as pafs
+            try:
+                from hdfs_native.fsspec import (
+                    HdfsFileSystem,
+                    ViewfsFileSystem,
+                )
+            except ImportError as e:
+                raise RuntimeError(
+                    "hdfs-native fsspec adapter is required to bridge "
+                    "HdfsNativeFileIO to a pyarrow.fs filesystem; upgrade "
+                    "hdfs-native (>=0.13)."
+                ) from e
+            cls = (ViewfsFileSystem if self._scheme == "viewfs"
+                   else HdfsFileSystem)
+            # Merge xml + overrides so the fsspec instance can connect
+            # without relying on HADOOP_CONF_DIR (BaseFileSystem.__init__
+            # only forwards storage_options to Client, not config_dir).
+            merged_config = {**self._hadoop_xml, **self._config}
+            fsspec_fs = cls(host=self._netloc, **merged_config)
+            self._filesystem = pafs.PyFileSystem(
+                pafs.FSSpecHandler(fsspec_fs))
+        return self._filesystem
+
+    @staticmethod
+    def parse_location(location: str):
+        uri = urlparse(location)
+        if not uri.scheme:
+            return "file", uri.netloc, os.path.abspath(location)
+        return uri.scheme, uri.netloc, uri.path
+
+    @staticmethod
+    def _build_url(scheme: str, netloc: Optional[str]) -> Optional[str]:
+        if not netloc:
+            return None
+        return f"{scheme}://{netloc}"
+
+    @staticmethod
+    def _load_hadoop_xml(config_dir: Optional[str]) -> Dict[str, str]:
+        """Parse core-site.xml + hdfs-site.xml from a Hadoop config dir into a
+        flat {name: value} dict. Returns empty dict if the dir is missing or
+        unreadable.
+
+        Used only to discover viewfs mount-table state so we can polyfill the
+        linkFallback mount that hdfs-native requires but libhdfs tolerates.
+        The final config dir is still handed to hdfs-native for its own
+        (more complete) xml parsing.
+        """
+        result: Dict[str, str] = {}
+        if not config_dir or not os.path.isdir(config_dir):
+            return result
+        for fname in ("core-site.xml", "hdfs-site.xml"):
+            path = os.path.join(config_dir, fname)
+            if not os.path.isfile(path):
+                continue
+            try:
+                tree = ET.parse(path)
+            except (ET.ParseError, OSError):
+                continue
+            for prop in tree.getroot().findall("property"):
+                name_el = prop.find("name")
+                value_el = prop.find("value")
+                if name_el is None or name_el.text is None:
+                    continue
+                value = (
+                    value_el.text.strip()
+                    if value_el is not None and value_el.text
+                    else ""
+                )
+                result[name_el.text.strip()] = value
+        return result
+
+    @staticmethod
+    def _maybe_inject_viewfs_fallback(
+        scheme: str,
+        netloc: Optional[str],
+        overrides: Dict[str, str],
+        hadoop_xml: Dict[str, str],
+    ) -> None:
+        """If we're opening a viewfs URI and no linkFallback is configured for
+        the cluster, pick a usable nameservice URI from existing link.*
+        targets or dfs.nameservices and inject one into `overrides`.
+
+        hdfs-native rejects viewfs init without a fallback mount; libhdfs
+        tolerates it. This bridges the gap without touching cluster xml.
+
+        The mount-table state is read from the merged view of hadoop xml and
+        catalog-option overrides, so a zero-file viewfs setup (link.* /
+        dfs.nameservices pushed purely through catalog options) gets a
+        fallback too; the injected key is still only written back to
+        `overrides`.
+        """
+        if scheme != "viewfs" or not netloc:
+            return
+        cluster = netloc
+        fallback_key = f"fs.viewfs.mounttable.{cluster}.linkFallback"
+        if fallback_key in overrides or fallback_key in hadoop_xml:
+            return
+
+        merged = {**hadoop_xml, **overrides}
+
+        link_prefix = f"fs.viewfs.mounttable.{cluster}.link."
+        for key, value in merged.items():
+            if key.startswith(link_prefix) and value:
+                parsed = urlparse(value)
+                if parsed.scheme == "hdfs" and parsed.netloc:
+                    overrides[fallback_key] = f"hdfs://{parsed.netloc}/"
+                    return
+
+        nameservices = [
+            ns.strip()
+            for ns in merged.get("dfs.nameservices", "").split(",")
+            if ns.strip()
+        ]
+        if nameservices:
+            overrides[fallback_key] = f"hdfs://{nameservices[0]}/"
+
+    def _setup_kerberos(self):
+        principal = (
+            self.properties.get(SecurityOptions.KERBEROS_PRINCIPAL)
+            or self.properties.to_map().get("security.principal")
+        )
+        keytab = (
+            self.properties.get(SecurityOptions.KERBEROS_KEYTAB)
+            or self.properties.to_map().get("security.keytab")
+        )
+        if bool(principal) != bool(keytab):
+            raise ValueError(
+                "security.kerberos.login.principal and "
+                "security.kerberos.login.keytab "
+                "must be both set or both unset"
+            )
+        if principal and keytab:
+            _kerberos.kerberos_login_from_keytab(principal, keytab)
+            cache_path = _kerberos.get_ticket_cache_path()
+            if not cache_path:
+                raise RuntimeError(
+                    "kinit succeeded but no ticket cache path could be "
+                    "determined. Set the KRB5CCNAME environment variable "
+                    "to specify the cache location."
+                )
+            # hdfs-native's GSSAPI layer reads KRB5CCNAME from the process
+            # env, which is global state. If a different cache was already
+            # configured (typically because another HdfsNativeFileIO with
+            # a different principal lives in the same process), warn — the
+            # last writer wins and earlier instances will start using the
+            # new ticket, which is almost certainly not what the caller
+            # wanted.
+            existing = os.environ.get("KRB5CCNAME")
+            existing_stripped = (
+                existing[5:] if existing and existing.startswith("FILE:")
+                else existing
+            )
+            if existing_stripped and existing_stripped != cache_path:
+                self.logger.warning(
+                    "Overwriting process-global KRB5CCNAME from %r to %r; "
+                    "concurrent HdfsNativeFileIO instances with different "
+                    "Kerberos principals share env state and will clobber "
+                    "each other's ticket caches.",
+                    existing, cache_path,
+                )
+            # Preserve the `FILE:` qualifier if the existing value carried
+            # it — some GSSAPI tooling distinguishes cache types by prefix.
+            os.environ["KRB5CCNAME"] = (
+                f"FILE:{cache_path}"
+                if existing and existing.startswith("FILE:")
+                else cache_path
+            )
+
+    def _build_config_dict(self) -> Dict[str, str]:
+        config: Dict[str, str] = {}
+        for key, value in self.properties.to_map().items():
+            if value is None:
+                continue
+            if any(key.startswith(p) for p in self.NATIVE_KEY_PREFIXES):
+                config[key] = str(value)
+            elif key.startswith(self.NS_PREFIX):
+                config[key[len(self.NS_PREFIX):]] = str(value)
+        return config
+
+    def to_filesystem_path(self, path: str) -> str:
+        # hdfs-native expects an absolute path within the cluster the Client is
+        # bound to; passing a full URI makes its Rust-side MountTable::resolve
+        # treat the string as a relative path (since it doesn't start with '/')
+        # and prepend the user's home dir, producing nonsense like
+        # `/user/foo/viewfs://cluster/...`. Strip the matching scheme+authority
+        # so a plain absolute path reaches the client.
+        parsed = urlparse(path)
+        if parsed.scheme in ("hdfs", "viewfs"):
+            if parsed.scheme == self._scheme and (
+                not parsed.netloc or parsed.netloc == self._netloc
+            ):
+                return parsed.path or "/"
+        return path
+
+    def _adapt_status(self, status, fallback_path: str = '') -> _HdfsFileInfo:
+        path = getattr(status, 'path', None) or fallback_path
+        is_dir = bool(getattr(status, 'isdir', False))
+        length = getattr(status, 'length', 0)
+        mtime_ms = getattr(status, 'modification_time', None)
+        mtime = (
+            datetime.fromtimestamp(mtime_ms / 1000.0, tz=timezone.utc)
+            if mtime_ms else None
+        )
+        size = None if is_dir else int(length or 0)
+        ftype = pafs.FileType.Directory if is_dir else pafs.FileType.File
+        return _HdfsFileInfo(path, size, ftype, mtime)
+
+    def new_input_stream(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        reader = self._client.read(path_str)
+        return _HdfsReaderAdapter(reader)
+
+    def new_output_stream(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        writer = self._client.create(
+            path_str,
+            self._WriteOptions(create_parent=True, overwrite=True),
+        )
+        return _HdfsWriterAdapter(writer)
+
+    def get_file_status(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        try:
+            status = self._client.get_file_info(path_str)
+        except FileNotFoundError:
+            raise FileNotFoundError(f"File {path} does not exist")
+        return self._adapt_status(status, path_str)
+
+    def list_status(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        return [self._adapt_status(s) for s in 
self._client.list_status(path_str)]
+
+    def exists(self, path: str) -> bool:
+        path_str = self.to_filesystem_path(path)
+        try:
+            self._client.get_file_info(path_str)
+            return True
+        except FileNotFoundError:
+            return False
+
+    def delete(self, path: str, recursive: bool = False) -> bool:
+        path_str = self.to_filesystem_path(path)
+        try:
+            status = self._client.get_file_info(path_str)
+        except FileNotFoundError:
+            return False
+        if bool(getattr(status, 'isdir', False)) and not recursive:
+            if next(iter(self._client.list_status(path_str)), None) is not 
None:
+                raise OSError(f"Directory {path} is not empty")
+        return bool(self._client.delete(path_str, recursive))
+
+    def mkdirs(self, path: str) -> bool:
+        path_str = self.to_filesystem_path(path)
+        try:
+            status = self._client.get_file_info(path_str)
+        except FileNotFoundError:
+            self._client.mkdirs(path_str, create_parent=True)
+            return True
+        if bool(getattr(status, 'isdir', False)):
+            return True
+        raise FileExistsError(f"Path exists but is not a directory: {path}")
+
+    def rename(self, src: str, dst: str) -> bool:
+        src_str = self.to_filesystem_path(src)
+        dst_str = self.to_filesystem_path(dst)
+        dst_parent = str(PurePosixPath(dst_str).parent)
+        if dst_parent and dst_parent != '.':
+            try:
+                self._client.get_file_info(dst_parent)
+            except FileNotFoundError:
+                self._client.mkdirs(dst_parent, create_parent=True)
+        try:
+            dst_status = self._client.get_file_info(dst_str)
+            if not getattr(dst_status, 'isdir', False):
+                return False
+            src_name = PurePosixPath(src_str).name
+            dst_str = str(PurePosixPath(dst_str) / src_name)
+            try:
+                self._client.get_file_info(dst_str)
+                return False
+            except FileNotFoundError:
+                pass
+        except FileNotFoundError:
+            pass
+        try:
+            self._client.rename(src_str, dst_str)
+            return True
+        except FileNotFoundError:
+            return False
+        except (PermissionError, OSError):
+            return False
+
+    def write_parquet(self, path: str, data: pyarrow.Table,
+                      compression: str = 'zstd', zstd_level: int = 1, 
**kwargs):
+        try:
+            import pyarrow.parquet as pq
+            if compression.lower() == 'zstd':
+                kwargs['compression_level'] = zstd_level
+            with self.new_output_stream(path) as raw_stream:
+                stream = pyarrow.PythonFile(raw_stream, mode='wb')
+                try:
+                    pq.write_table(
+                        data, stream, compression=compression, **kwargs)
+                finally:
+                    stream.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
+
+    def write_orc(self, path: str, data: pyarrow.Table,
+                  compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        try:
+            import sys
+            import pyarrow.orc as orc
+            data = self._cast_time_columns_for_orc(data)
+            with self.new_output_stream(path) as raw_stream:
+                stream = pyarrow.PythonFile(raw_stream, mode='wb')
+                try:
+                    if sys.version_info[:2] == (3, 6):
+                        orc.write_table(data, stream, **kwargs)
+                    else:
+                        orc.write_table(
+                            data, stream, compression=compression, **kwargs)
+                finally:
+                    stream.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
+
+    def write_avro(self, path: str, data: pyarrow.Table,
+                   avro_schema=None, compression: str = 'zstd',
+                   zstd_level: int = 1, **kwargs):
+        import fastavro
+        if avro_schema is None:
+            avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
+
+        records_dict = data.to_pydict()
+
+        def record_generator():
+            num_rows = len(list(records_dict.values())[0])
+            for i in range(num_rows):
+                record = {}
+                for col in records_dict.keys():
+                    value = records_dict[col][i]
+                    if isinstance(value, datetime) and value.tzinfo is None:
+                        value = value.replace(tzinfo=timezone.utc)
+                    record[col] = value
+                yield record
+
+        codec_map = {
+            'null': 'null', 'deflate': 'deflate', 'snappy': 'snappy',
+            'bzip2': 'bzip2', 'xz': 'xz', 'zstandard': 'zstandard',
+            'zstd': 'zstandard',
+        }
+        codec = codec_map.get(compression.lower())
+        if codec is None:
+            raise ValueError(
+                f"Unsupported compression '{compression}' for Avro format. "
+                f"Supported compressions: {', 
'.join(sorted(codec_map.keys()))}."
+            )
+        if codec == 'zstandard':
+            kwargs['codec_compression_level'] = zstd_level
+        with self.new_output_stream(path) as output_stream:
+            fastavro.writer(output_stream, avro_schema,
+                            record_generator(), codec=codec, **kwargs)
+
+    def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            if data.num_columns != 1:
+                raise RuntimeError(
+                    f"Blob format only supports a single column, "
+                    f"got {data.num_columns} columns")
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                paimon_type = PyarrowFieldParser.to_paimon_type(
+                    field.type, field.nullable)
+                fields = [DataField(0, field.name, paimon_type)]
+            records_dict = data.to_pydict()
+            num_rows = data.num_rows
+            field_name = fields[0].name
+            with self.new_output_stream(path) as output_stream:
+                writer = BlobFormatWriter(output_stream)
+                for i in range(num_rows):
+                    writer.write_value(records_dict[field_name][i],
+                                       fields, self.uri_reader_factory)
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
+
+    def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+        # Mirror the remote-scheme writer: lance/vortex talk to the backend
+        # through their own object_store, so we hand them the URI plus any
+        # storage options the FileIO exposes rather than routing through the
+        # native client. Without these two methods, an HDFS table configured
+        # with file.format=lance/vortex would hit FileIO's NotImplementedError
+        # now that this class is the default hdfs:// backend.
+        try:
+            import lance
+
+            from pypaimon.read.reader.lance_utils import to_lance_specified
+            file_path_for_lance, storage_options = to_lance_specified(self, 
path)
+
+            writer = lance.file.LanceFileWriter(
+                file_path_for_lance, data.schema,
+                storage_options=storage_options, **kwargs)
+            try:
+                for batch in data.to_batches():
+                    writer.write_batch(batch)
+            finally:
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
+
+    def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import vortex
+            from vortex import store
+
+            from pypaimon.read.reader.vortex_utils import to_vortex_specified
+            file_path_for_vortex, store_kwargs = to_vortex_specified(self, 
path)
+
+            if store_kwargs:
+                vortex_store = store.from_url(file_path_for_vortex, 
**store_kwargs)
+                vortex_store.write(vortex.array(data))
+            else:
+                from vortex._lib.io import write as vortex_write
+                vortex_write(vortex.array(data), file_path_for_vortex)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Vortex file {path}: {e}") 
from e
+
+    def close(self):
+        self._client = None
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 74d43579a2..79df9f79cb 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -329,26 +329,13 @@ class PyArrowFileIO(FileIO):
 
     @staticmethod
     def _kerberos_login_from_keytab(principal: str, keytab: str):
-        if not os.path.isfile(keytab):
-            raise FileNotFoundError(f"Kerberos keytab file not found: 
{keytab}")
-        if not os.access(keytab, os.R_OK):
-            raise PermissionError(f"Kerberos keytab file is not readable: 
{keytab}")
-        subprocess.run(
-            ['kinit', '-kt', keytab, principal],
-            check=True, capture_output=True, text=True
-        )
+        from pypaimon.filesystem import _kerberos
+        _kerberos.kerberos_login_from_keytab(principal, keytab)
 
     @staticmethod
     def _get_ticket_cache_path() -> Optional[str]:
-        cc = os.environ.get('KRB5CCNAME')
-        if cc:
-            if cc.startswith('FILE:'):
-                return cc[5:]
-            return cc
-        default_path = f'/tmp/krb5cc_{os.getuid()}'
-        if os.path.exists(default_path):
-            return default_path
-        return None
+        from pypaimon.filesystem import _kerberos
+        return _kerberos.get_ticket_cache_path()
 
     def new_input_stream(self, path: str):
         path_str = self.to_filesystem_path(path)
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/README.md 
b/paimon-python/pypaimon/tests/e2e/hdfs/README.md
new file mode 100644
index 0000000000..47f63701ed
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/README.md
@@ -0,0 +1,75 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# HDFS End-to-End Tests
+
+Verifies the native HDFS FileIO backend (`HdfsNativeFileIO`) against a live 
HDFS
+cluster. No local Hadoop install or JVM required on the client side.
+
+## Quick start (Docker)
+
+```sh
+# 1. Bring up a single-NameNode + single-DataNode cluster.
+docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d
+
+# Wait ~30s for the cluster to become healthy; check with:
+docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml ps
+
+# 2. Install the package with the hdfs extra.
+pip install -e '.[hdfs]'
+
+# 3. Run the tests.
+PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \
+  python -m pytest pypaimon/tests/e2e/hdfs/ -v
+
+# 4. Teardown.
+docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v
+```
+
+## REST-catalog config delivery mode (no local xml)
+
+The native backend accepts Hadoop key/values directly via catalog options.
+Skip the `core-site.xml` / `hdfs-site.xml` dance entirely by configuring the
+cluster wiring as options — exactly what a REST catalog would push to the
+client in its response. Example:
+
+```python
+catalog = CatalogFactory.create({
+    "warehouse": "viewfs://cluster/warehouse",
+    "hdfs.client.impl": "native",
+    # Forwarded as-is to the underlying client:
+    "dfs.nameservices": "ns1,ns2",
+    "dfs.ha.namenodes.ns1": "nn1,nn2",
+    "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020",
+    "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020",
+    "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
+})
+```
+
+Keys matching the prefixes `dfs.` / `fs.` / `hadoop.` / `ipc.` / `io.` are
+forwarded automatically. Use the `hdfs.config.<key>` namespace for any other
+key you want passed through.
+
+## Kerberos
+
+The cluster in `docker-compose.yml` runs without security to keep the
+smoke test simple. For a Kerberized e2e: provision a krb5 + HDFS compose
+separately, install `libgssapi-krb5-2` (or platform equivalent) on the
+client, set `KRB5_CONFIG` and `KRB5CCNAME`, then either run `kinit`
+yourself or pass `security.kerberos.login.principal` + `.keytab` as
+catalog options (pypaimon will run `kinit` for you).
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py 
b/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml 
b/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml
new file mode 100644
index 0000000000..145708c3e6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Single-NameNode HDFS cluster for integration testing the native HDFS backend.
+# Brings up one NameNode (RPC 8020) and one DataNode (data 9866).
+#
+# Run:
+#   docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d
+#   PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \
+#     python -m pytest pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py -v
+#   docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v
+
+services:
+  namenode:
+    image: apache/hadoop:3.3.6
+    hostname: namenode
+    user: root
+    command: ["/bin/bash", "-c", "hdfs namenode -format -force -nonInteractive 
|| true; hdfs namenode"]
+    environment:
+      HADOOP_HOME: /opt/hadoop
+      CORE-SITE.XML_fs.defaultFS: hdfs://namenode:8020
+      HDFS-SITE.XML_dfs.namenode.rpc-address: namenode:8020
+      HDFS-SITE.XML_dfs.replication: "1"
+      HDFS-SITE.XML_dfs.permissions.enabled: "false"
+    ports:
+      - "8020:8020"
+      - "9870:9870"
+
+  datanode:
+    image: apache/hadoop:3.3.6
+    user: root
+    command: ["hdfs", "datanode"]
+    environment:
+      HADOOP_HOME: /opt/hadoop
+      CORE-SITE.XML_fs.defaultFS: hdfs://namenode:8020
+      HDFS-SITE.XML_dfs.datanode.use.datanode.hostname: "false"
+      HDFS-SITE.XML_dfs.permissions.enabled: "false"
+    depends_on:
+      - namenode
+    ports:
+      - "9866:9866"
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py 
b/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py
new file mode 100644
index 0000000000..5989b815fa
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""End-to-end tests for the native HDFS FileIO backend.
+
+Disabled by default. To run:
+  1. Start an HDFS cluster — see docker-compose.yml in this directory.
+  2. Install pypaimon with the hdfs extra:
+       pip install -e '.[hdfs]'
+  3. Point the tests at the cluster and run:
+       PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \\
+         python -m pytest pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py -v
+
+To exercise the REST-catalog config-delivery path (no local xml), put the
+Hadoop config k/v in catalog options under the `dfs.*` / `fs.*` namespaces
+or via `hdfs.config.*` — both are forwarded to the underlying client.
+"""
+
+import os
+import unittest
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+
+E2E_URL = os.environ.get("PYPAIMON_HDFS_E2E_URL")
+SKIP_REASON = ("PYPAIMON_HDFS_E2E_URL not set; skipping HDFS e2e. "
+               "See docker-compose.yml in this directory.")
+
+
[email protected](not E2E_URL, SKIP_REASON)
+class HdfsNativeE2ETest(unittest.TestCase):
+    """Smoke-test the native HDFS backend end-to-end against a live cluster."""
+
+    @classmethod
+    def setUpClass(cls):
+        try:
+            import hdfs_native  # noqa: F401
+        except ImportError as e:
+            raise unittest.SkipTest(
+                "hdfs-native not installed. pip install 'pypaimon[hdfs]'"
+            ) from e
+
+        from pypaimon.catalog.catalog_factory import CatalogFactory
+        cls.warehouse = (
+            f"{E2E_URL}/pypaimon-e2e/warehouse-{uuid.uuid4().hex[:8]}"
+        )
+        cls.catalog = CatalogFactory.create({
+            "warehouse": cls.warehouse,
+            "hdfs.client.impl": "native",
+        })
+        cls.catalog.create_database("default", True)
+
+    def _make_table(self, name, schema):
+        from pypaimon.schema.schema import Schema
+        fqn = f"default.{name}"
+        s = Schema.from_pyarrow_schema(
+            schema,
+            options={"file.format": "parquet"},
+        )
+        self.catalog.create_table(fqn, s, False)
+        return self.catalog.get_table(fqn)
+
+    def test_write_then_read_back(self):
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("payload", pa.string()),
+        ])
+        table = self._make_table(f"t_{uuid.uuid4().hex[:8]}", pa_schema)
+
+        data = pd.DataFrame({
+            "id": list(range(100)),
+            "payload": [f"row-{i}" for i in range(100)],
+        })
+
+        writer = table.new_batch_write_builder().new_write()
+        writer.write_pandas(data)
+        commit_msgs = writer.prepare_commit()
+        committer = table.new_batch_write_builder().new_commit()
+        committer.commit(commit_msgs)
+        writer.close()
+        committer.close()
+
+        scan = table.new_read_builder().new_scan()
+        reader = table.new_read_builder().new_read()
+        splits = scan.plan().splits()
+        result = reader.to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 100)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/hdfs_native_test.py 
b/paimon-python/pypaimon/tests/hdfs_native_test.py
new file mode 100644
index 0000000000..957ee8f123
--- /dev/null
+++ b/paimon-python/pypaimon/tests/hdfs_native_test.py
@@ -0,0 +1,917 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+import tempfile
+import types
+import unittest
+from unittest.mock import MagicMock, patch
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import HdfsOptions
+
+
+def _install_fake_hdfs_native():
+    """Install a fake hdfs_native module (with .fsspec submodule) into
+    sys.modules.
+
+    Returns (fake_module, mock_client_cls, mock_write_options_cls).
+    """
+    fake = types.ModuleType("hdfs_native")
+    fake.Client = MagicMock(name="Client")
+    fake.WriteOptions = MagicMock(name="WriteOptions")
+    fsspec_mod = types.ModuleType("hdfs_native.fsspec")
+    fsspec_mod.HdfsFileSystem = MagicMock(name="HdfsFileSystem")
+    fsspec_mod.ViewfsFileSystem = MagicMock(name="ViewfsFileSystem")
+    fake.fsspec = fsspec_mod
+    sys.modules["hdfs_native"] = fake
+    sys.modules["hdfs_native.fsspec"] = fsspec_mod
+    return fake, fake.Client, fake.WriteOptions
+
+
+def _uninstall_fake_hdfs_native():
+    sys.modules.pop("hdfs_native", None)
+    sys.modules.pop("hdfs_native.fsspec", None)
+    # Also drop the cached HdfsNativeFileIO so a re-import sees the new fake
+    sys.modules.pop(
+        "pypaimon.filesystem.hdfs_native_file_io", None)
+
+
+class HdfsOptionsTest(unittest.TestCase):
+
+    def test_defaults(self):
+        opts = Options({})
+        self.assertEqual(opts.get(HdfsOptions.HDFS_CLIENT_IMPL), "native")
+        self.assertTrue(opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW))
+        self.assertIsNone(opts.get(HdfsOptions.HDFS_CONF_DIR))
+
+    def test_explicit_pyarrow(self):
+        opts = Options({"hdfs.client.impl": "pyarrow"})
+        self.assertEqual(opts.get(HdfsOptions.HDFS_CLIENT_IMPL), "pyarrow")
+
+    def test_explicit_fallback_false(self):
+        opts = Options({"hdfs.client.fallback-to-pyarrow": "false"})
+        self.assertFalse(opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW))
+
+
+class HdfsNativeFileIORoutingTest(unittest.TestCase):
+
+    def setUp(self):
+        _uninstall_fake_hdfs_native()
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def test_local_paths_unaffected(self):
+        fio = FileIO.get("file:///tmp/foo")
+        self.assertEqual(type(fio).__name__, "LocalFileIO")
+
+    def test_default_hdfs_routes_to_native(self):
+        _install_fake_hdfs_native()
+        fio = FileIO.get("hdfs://ns/foo", Options({}))
+        self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+    def test_explicit_pyarrow_routes_to_pyarrow(self):
+        # No hdfs-native module needed; should go straight to pyarrow.
+        with patch(
+            "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
+            return_value=None,
+        ):
+            fio = FileIO.get(
+                "hdfs://ns/foo",
+                Options({"hdfs.client.impl": "pyarrow"}),
+            )
+        self.assertEqual(type(fio).__name__, "PyArrowFileIO")
+
+    def test_native_init_failure_falls_back_to_pyarrow(self):
+        # hdfs_native not installed; default fallback enabled.
+        _uninstall_fake_hdfs_native()
+        with patch(
+            "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
+            return_value=None,
+        ):
+            fio = FileIO.get("hdfs://ns/foo", Options({}))
+        self.assertEqual(type(fio).__name__, "PyArrowFileIO")
+
+    def test_native_init_failure_no_fallback_raises(self):
+        _uninstall_fake_hdfs_native()
+        with self.assertRaises(ImportError):
+            FileIO.get(
+                "hdfs://ns/foo",
+                Options({"hdfs.client.fallback-to-pyarrow": "false"}),
+            )
+
+    def test_unsupported_impl_raises(self):
+        with self.assertRaises(ValueError) as ctx:
+            FileIO.get(
+                "hdfs://ns/foo",
+                Options({"hdfs.client.impl": "bogus"}),
+            )
+        self.assertIn("Unsupported hdfs.client.impl", str(ctx.exception))
+
+    def test_env_var_override_when_option_absent(self):
+        _install_fake_hdfs_native()
+        with patch(
+            "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
+            return_value=None,
+        ):
+            with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}):
+                fio = FileIO.get("hdfs://ns/foo", Options({}))
+        self.assertEqual(type(fio).__name__, "PyArrowFileIO")
+
+    def test_option_wins_over_env_var(self):
+        _install_fake_hdfs_native()
+        with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}):
+            fio = FileIO.get(
+                "hdfs://ns/foo",
+                Options({"hdfs.client.impl": "native"}),
+            )
+        self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+    def test_viewfs_scheme_routes_to_native(self):
+        _install_fake_hdfs_native()
+        fio = FileIO.get("viewfs://cluster/foo", Options({}))
+        self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+    def test_empty_impl_option_treated_as_unset(self):
+        # Templated configs sometimes blank the option to opt out — that
+        # should fall through to the default ("native"), not raise
+        # "Unsupported hdfs.client.impl ''".
+        _install_fake_hdfs_native()
+        fio = FileIO.get("hdfs://ns/foo", Options({"hdfs.client.impl": ""}))
+        self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+
+class HdfsNativeFileIOInitTest(unittest.TestCase):
+
+    def setUp(self):
+        self._fake, self._client_cls, self._wo_cls = 
_install_fake_hdfs_native()
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def _make(self, path, props=None):
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        return HdfsNativeFileIO(path, Options(props or {}))
+
+    def test_constructs_client_with_url(self):
+        self._make("hdfs://ns1/warehouse")
+        self._client_cls.assert_called_once()
+        _, kwargs = self._client_cls.call_args
+        self.assertEqual(kwargs.get("url"), "hdfs://ns1")
+        self.assertNotIn("config", kwargs)
+
+    def test_viewfs_scheme_passes_through(self):
+        self._make("viewfs://cluster1/")
+        _, kwargs = self._client_cls.call_args
+        self.assertEqual(kwargs.get("url"), "viewfs://cluster1")
+
+    def test_native_hadoop_keys_forwarded_as_config(self):
+        self._make("hdfs://ns1/foo", {
+            "dfs.nameservices": "ns1",
+            "dfs.ha.namenodes.ns1": "nn1,nn2",
+            "dfs.namenode.rpc-address.ns1.nn1": "host1:8020",
+            "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
+            "warehouse": "hdfs://ns1/warehouse",  # should NOT be forwarded
+        })
+        _, kwargs = self._client_cls.call_args
+        config = kwargs.get("config", {})
+        self.assertEqual(config.get("dfs.nameservices"), "ns1")
+        self.assertEqual(config.get("dfs.ha.namenodes.ns1"), "nn1,nn2")
+        self.assertEqual(
+            config.get("dfs.namenode.rpc-address.ns1.nn1"), "host1:8020")
+        self.assertEqual(
+            config.get("fs.viewfs.mounttable.cluster.link./prod"),
+            "hdfs://ns1/prod")
+        self.assertNotIn("warehouse", config)
+
+    def test_namespaced_overrides_forwarded(self):
+        self._make("hdfs://ns1/foo", {
+            "hdfs.config.dfs.client.read.shortcircuit": "true",
+        })
+        _, kwargs = self._client_cls.call_args
+        config = kwargs.get("config", {})
+        self.assertEqual(
+            config.get("dfs.client.read.shortcircuit"), "true")
+
+    def test_conf_dir_from_option(self):
+        self._make("hdfs://ns1/foo", {
+            "hdfs.conf-dir": "/tmp/conf",
+        })
+        _, kwargs = self._client_cls.call_args
+        self.assertEqual(kwargs.get("config_dir"), "/tmp/conf")
+
+    def test_conf_dir_from_env(self):
+        env = dict(os.environ)
+        env["HADOOP_CONF_DIR"] = "/env/conf"
+        with patch.dict(os.environ, env, clear=True):
+            self._make("hdfs://ns1/foo")
+        _, kwargs = self._client_cls.call_args
+        self.assertEqual(kwargs.get("config_dir"), "/env/conf")
+
+    def test_option_conf_dir_overrides_env(self):
+        env = dict(os.environ)
+        env["HADOOP_CONF_DIR"] = "/env/conf"
+        with patch.dict(os.environ, env, clear=True):
+            self._make("hdfs://ns1/foo", {"hdfs.conf-dir": "/opt/conf"})
+        _, kwargs = self._client_cls.call_args
+        self.assertEqual(kwargs.get("config_dir"), "/opt/conf")
+
+    @patch("pypaimon.filesystem._kerberos.subprocess.run")
+    def test_kerberos_principal_keytab_triggers_kinit(self, mock_kinit):
+        mock_kinit.return_value = MagicMock()
+        with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+            with patch.dict(os.environ, {"KRB5CCNAME": "/tmp/kc_test"}):
+                self._make("hdfs://ns1/foo", {
+                    "security.kerberos.login.principal": "user@REALM",
+                    "security.kerberos.login.keytab": keytab_file.name,
+                })
+            kinit_calls = [
+                c for c in mock_kinit.call_args_list
+                if c[0][0][0] == "kinit"
+            ]
+            self.assertEqual(len(kinit_calls), 1)
+            self.assertEqual(
+                kinit_calls[0][0][0],
+                ["kinit", "-kt", keytab_file.name, "user@REALM"],
+            )
+
+    def test_principal_without_keytab_raises(self):
+        with self.assertRaises(ValueError) as ctx:
+            self._make("hdfs://ns1/foo", {
+                "security.kerberos.login.principal": "user@REALM",
+            })
+        self.assertIn("must be both set or both unset", str(ctx.exception))
+
+    @patch("pypaimon.filesystem._kerberos.subprocess.run")
+    def test_kerberos_preserves_FILE_prefix_on_krb5ccname(self, mock_kinit):
+        # If KRB5CCNAME had a `FILE:` qualifier, the rewrite after kinit
+        # must keep it so GSSAPI cache-type detection isn't perturbed.
+        mock_kinit.return_value = MagicMock()
+        with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+            with patch.dict(os.environ,
+                            {"KRB5CCNAME": "FILE:/tmp/kc_test"},
+                            clear=True):
+                self._make("hdfs://ns1/foo", {
+                    "security.kerberos.login.principal": "user@REALM",
+                    "security.kerberos.login.keytab": keytab_file.name,
+                })
+                self.assertEqual(
+                    os.environ["KRB5CCNAME"], "FILE:/tmp/kc_test")
+
+    @patch("pypaimon.filesystem._kerberos.get_ticket_cache_path",
+           return_value="/tmp/freshly_kinit_cache")
+    @patch("pypaimon.filesystem._kerberos.subprocess.run")
+    def test_kerberos_warns_when_overwriting_different_cache(
+        self, mock_kinit, _mock_cache,
+    ):
+        # Multi-principal in the same process clobbers KRB5CCNAME; we warn
+        # so the operator sees the race instead of silently mis-routing
+        # earlier instances' RPCs.
+        mock_kinit.return_value = MagicMock()
+        with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+            with patch.dict(os.environ,
+                            {"KRB5CCNAME": "/tmp/some_other_cache"},
+                            clear=True):
+                with self.assertLogs(
+                    "pypaimon.filesystem.hdfs_native_file_io",
+                    level="WARNING",
+                ) as log_ctx:
+                    self._make("hdfs://ns1/foo", {
+                        "security.kerberos.login.principal": "user@REALM",
+                        "security.kerberos.login.keytab": keytab_file.name,
+                    })
+                self.assertTrue(
+                    any("Overwriting process-global KRB5CCNAME" in m
+                        for m in log_ctx.output),
+                    log_ctx.output,
+                )
+
+    @patch("pypaimon.filesystem._kerberos.get_ticket_cache_path",
+           return_value="/tmp/kc_test")
+    @patch("pypaimon.filesystem._kerberos.subprocess.run")
+    def test_kerberos_no_warning_when_cache_unchanged(
+        self, mock_kinit, _mock_cache,
+    ):
+        import logging as _logging
+        mock_kinit.return_value = MagicMock()
+        with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+            # Pre-existing value matches the kinit-resolved cache → no warn.
+            with patch.dict(os.environ,
+                            {"KRB5CCNAME": "/tmp/kc_test"},
+                            clear=True):
+                # assertNoLogs is 3.10+; patch warning explicitly so older
+                # interpreters keep working too.
+                logger = _logging.getLogger(
+                    "pypaimon.filesystem.hdfs_native_file_io")
+                with patch.object(logger, "warning") as warn:
+                    self._make("hdfs://ns1/foo", {
+                        "security.kerberos.login.principal": "user@REALM",
+                        "security.kerberos.login.keytab": keytab_file.name,
+                    })
+                    warn.assert_not_called()
+
+    def test_unsupported_scheme_raises(self):
+        with self.assertRaises(ValueError):
+            self._make("s3://bucket/key")
+
+
+class HdfsNativeFileIOOpsTest(unittest.TestCase):
+
+    def setUp(self):
+        self._fake, self._client_cls, self._wo_cls = 
_install_fake_hdfs_native()
+        self._mock_client = MagicMock(name="ClientInstance")
+        self._client_cls.return_value = self._mock_client
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        self.fio = HdfsNativeFileIO("hdfs://ns/", Options({}))
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def _file_status(self, path, isdir=False, length=0, mtime=0):
+        s = MagicMock()
+        s.path = path
+        s.isdir = isdir
+        s.length = length
+        s.modification_time = mtime
+        return s
+
+    def test_exists_true(self):
+        self._mock_client.get_file_info.return_value = self._file_status("/x")
+        self.assertTrue(self.fio.exists("/x"))
+
+    def test_exists_false(self):
+        self._mock_client.get_file_info.side_effect = FileNotFoundError("nope")
+        self.assertFalse(self.fio.exists("/missing"))
+
+    def test_get_file_status_adapts_to_pafs_filetype(self):
+        self._mock_client.get_file_info.return_value = self._file_status(
+            "/x", isdir=False, length=42, mtime=1700000000000,
+        )
+        info = self.fio.get_file_status("/x")
+        self.assertEqual(info.path, "/x")
+        self.assertEqual(info.size, 42)
+        self.assertEqual(info.type, pafs.FileType.File)
+        self.assertIsNotNone(info.mtime)
+
+    def test_list_status(self):
+        self._mock_client.list_status.return_value = iter([
+            self._file_status("/x/a", isdir=False, length=10),
+            self._file_status("/x/b", isdir=True),
+        ])
+        infos = self.fio.list_status("/x")
+        self.assertEqual(len(infos), 2)
+        self.assertEqual(infos[0].type, pafs.FileType.File)
+        self.assertEqual(infos[1].type, pafs.FileType.Directory)
+        self.assertIsNone(infos[1].size)
+
+    def test_delete_missing_returns_false(self):
+        self._mock_client.get_file_info.side_effect = FileNotFoundError("nope")
+        self.assertFalse(self.fio.delete("/missing"))
+        self._mock_client.delete.assert_not_called()
+
+    def test_delete_file(self):
+        self._mock_client.get_file_info.return_value = self._file_status("/x")
+        self._mock_client.delete.return_value = True
+        self.assertTrue(self.fio.delete("/x"))
+        self._mock_client.delete.assert_called_once_with("/x", False)
+
+    def test_delete_nonempty_dir_without_recursive_raises(self):
+        self._mock_client.get_file_info.return_value = self._file_status(
+            "/x", isdir=True)
+        self._mock_client.list_status.return_value = iter([
+            self._file_status("/x/a")])
+        with self.assertRaises(OSError):
+            self.fio.delete("/x", recursive=False)
+
+    def test_mkdirs_creates_when_missing(self):
+        self._mock_client.get_file_info.side_effect = FileNotFoundError("nope")
+        self.assertTrue(self.fio.mkdirs("/new"))
+        self._mock_client.mkdirs.assert_called_once_with(
+            "/new", create_parent=True)
+
+    def test_mkdirs_idempotent_on_existing_dir(self):
+        self._mock_client.get_file_info.return_value = self._file_status(
+            "/x", isdir=True)
+        self.assertTrue(self.fio.mkdirs("/x"))
+        self._mock_client.mkdirs.assert_not_called()
+
+    def test_mkdirs_existing_file_raises(self):
+        self._mock_client.get_file_info.return_value = self._file_status(
+            "/x", isdir=False)
+        with self.assertRaises(FileExistsError):
+            self.fio.mkdirs("/x")
+
+
+class HdfsNativeAdaptersTest(unittest.TestCase):
+
+    def setUp(self):
+        _install_fake_hdfs_native()
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def test_writer_adapter_tracks_position_and_closes_once(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsWriterAdapter
+        fw = MagicMock()
+        fw.write.side_effect = lambda buf: len(buf)
+        adapter = _HdfsWriterAdapter(fw)
+        adapter.write(b"abc")
+        adapter.write(b"defg")
+        self.assertEqual(adapter.tell(), 7)
+        adapter.close()
+        adapter.close()  # idempotent
+        fw.close.assert_called_once()
+
+    def test_reader_adapter_seek_and_read(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
+        fr = MagicMock()
+        fr.tell.side_effect = [20, 30]
+        fr.read.return_value = b"x" * 10
+        adapter = _HdfsReaderAdapter(fr)
+        self.assertEqual(adapter.seek(20), 20)
+        fr.seek.assert_called_once_with(20, 0)
+        data = adapter.read(10)
+        self.assertEqual(data, b"x" * 10)
+        fr.read.assert_called_once_with(10)
+        self.assertEqual(adapter.tell(), 30)
+
+    def test_reader_adapter_read_negative_reads_all(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
+        fr = MagicMock()
+        fr.read.return_value = b"all-content"
+        adapter = _HdfsReaderAdapter(fr)
+        self.assertEqual(adapter.read(), b"all-content")
+        fr.read.assert_called_once_with(-1)
+
+    def test_reader_adapter_close_releases_underlying(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
+        fr = MagicMock()
+        adapter = _HdfsReaderAdapter(fr)
+        adapter.close()
+        adapter.close()  # idempotent
+        fr.close.assert_called_once()
+        self.assertTrue(adapter.closed)
+
+
+def _write_hadoop_xml(path, entries):
+    """Write a minimal Hadoop-style xml file at `path`."""
+    body = ['<?xml version="1.0"?>', "<configuration>"]
+    for name, value in entries.items():
+        body.append(
+            f"  <property><name>{name}</name><value>{value}</value></property>"
+        )
+    body.append("</configuration>")
+    with open(path, "w") as f:
+        f.write("\n".join(body))
+
+
+class ViewFsFallbackTest(unittest.TestCase):
+    """Cover _load_hadoop_xml + _maybe_inject_viewfs_fallback polyfill."""
+
+    def setUp(self):
+        _install_fake_hdfs_native()
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        self.Fio = HdfsNativeFileIO
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def test_load_hadoop_xml_merges_two_files(self):
+        with tempfile.TemporaryDirectory() as d:
+            _write_hadoop_xml(os.path.join(d, "core-site.xml"),
+                              {"fs.defaultFS": "viewfs://c1"})
+            _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"),
+                              {"dfs.nameservices": "ns1"})
+            cfg = self.Fio._load_hadoop_xml(d)
+        self.assertEqual(cfg.get("fs.defaultFS"), "viewfs://c1")
+        self.assertEqual(cfg.get("dfs.nameservices"), "ns1")
+
+    def test_load_hadoop_xml_missing_dir_returns_empty(self):
+        self.assertEqual(self.Fio._load_hadoop_xml(None), {})
+        self.assertEqual(self.Fio._load_hadoop_xml("/no/such/dir/xyz"), {})
+
+    def test_load_hadoop_xml_malformed_file_skipped(self):
+        with tempfile.TemporaryDirectory() as d:
+            with open(os.path.join(d, "core-site.xml"), "w") as f:
+                f.write("<not really xml")
+            _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"),
+                              {"dfs.nameservices": "ns1"})
+            cfg = self.Fio._load_hadoop_xml(d)
+        self.assertEqual(cfg, {"dfs.nameservices": "ns1"})
+
+    def test_fallback_from_existing_link_target(self):
+        overrides = {}
+        xml = {
+            "fs.viewfs.mounttable.c1.link./home": "hdfs://ns-prod/home",
+            "fs.viewfs.mounttable.c1.link./tmp": "hdfs://ns-tmp/tmp",
+        }
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertEqual(
+            overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+            "hdfs://ns-prod/",
+        )
+
+    def test_fallback_from_nameservices_when_no_links(self):
+        overrides = {}
+        xml = {"dfs.nameservices": "nsA,nsB"}
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertEqual(
+            overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+            "hdfs://nsA/",
+        )
+
+    def test_fallback_from_link_in_overrides_only(self):
+        # Zero-file viewfs setup: link.* arrives via catalog options
+        # (overrides), no hadoop xml present. The fallback must still be
+        # derived from the merged view.
+        overrides = {
+            "fs.viewfs.mounttable.c1.link./home": "hdfs://ns-prod/home",
+        }
+        xml = {}
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertEqual(
+            overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+            "hdfs://ns-prod/",
+        )
+
+    def test_fallback_from_nameservices_in_overrides_only(self):
+        overrides = {"dfs.nameservices": "nsA,nsB"}
+        xml = {}
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertEqual(
+            overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+            "hdfs://nsA/",
+        )
+
+    def test_fallback_already_in_xml_not_overridden(self):
+        overrides = {}
+        xml = {
+            "fs.viewfs.mounttable.c1.linkFallback": "hdfs://ns-existing/",
+            "dfs.nameservices": "nsA",
+        }
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertNotIn("fs.viewfs.mounttable.c1.linkFallback", overrides)
+
+    def test_fallback_already_in_overrides_kept(self):
+        overrides = {"fs.viewfs.mounttable.c1.linkFallback": "hdfs://manual/"}
+        xml = {"dfs.nameservices": "nsA"}
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertEqual(
+            overrides["fs.viewfs.mounttable.c1.linkFallback"],
+            "hdfs://manual/",
+        )
+
+    def test_hdfs_scheme_does_not_inject(self):
+        overrides = {}
+        xml = {"dfs.nameservices": "nsA"}
+        self.Fio._maybe_inject_viewfs_fallback("hdfs", "ns1", overrides, xml)
+        self.assertEqual(overrides, {})
+
+    def test_no_signal_no_inject(self):
+        overrides = {}
+        xml = {"unrelated.key": "v"}
+        self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+        self.assertEqual(overrides, {})
+
+    def test_init_auto_injects_for_viewfs_uri(self):
+        with tempfile.TemporaryDirectory() as d:
+            _write_hadoop_xml(
+                os.path.join(d, "hdfs-site.xml"),
+                {
+                    "dfs.nameservices": "ns1",
+                    "fs.viewfs.mounttable.hadoop-lt-cluster.link./home":
+                        "hdfs://ns1/home",
+                },
+            )
+            opts = Options({"hdfs.conf-dir": d})
+            self.Fio("viewfs://hadoop-lt-cluster/home/x", opts)
+        client_cls = sys.modules["hdfs_native"].Client
+        _, kwargs = client_cls.call_args
+        config = kwargs.get("config", {})
+        self.assertEqual(
+            config.get("fs.viewfs.mounttable.hadoop-lt-cluster.linkFallback"),
+            "hdfs://ns1/",
+        )
+
+
+class ToFilesystemPathTest(unittest.TestCase):
+    """Cover URI -> absolute-path normalisation for hdfs-native."""
+
+    def setUp(self):
+        _install_fake_hdfs_native()
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def _make(self, root):
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        return HdfsNativeFileIO(root, Options({}))
+
+    def test_viewfs_uri_same_cluster_returns_path(self):
+        fio = self._make("viewfs://cluster1/")
+        self.assertEqual(
+            fio.to_filesystem_path("viewfs://cluster1/home/hudi/x"),
+            "/home/hudi/x",
+        )
+
+    def test_viewfs_uri_no_path_returns_root(self):
+        fio = self._make("viewfs://cluster1/")
+        self.assertEqual(fio.to_filesystem_path("viewfs://cluster1"), "/")
+
+    def test_viewfs_absolute_path_unchanged(self):
+        fio = self._make("viewfs://cluster1/")
+        self.assertEqual(fio.to_filesystem_path("/foo/bar"), "/foo/bar")
+
+    def test_hdfs_uri_same_ns_returns_path(self):
+        fio = self._make("hdfs://ns1/")
+        self.assertEqual(
+            fio.to_filesystem_path("hdfs://ns1/foo/bar"),
+            "/foo/bar",
+        )
+
+    def test_hdfs_uri_different_ns_unchanged(self):
+        fio = self._make("hdfs://ns1/")
+        self.assertEqual(
+            fio.to_filesystem_path("hdfs://nsX/foo"),
+            "hdfs://nsX/foo",
+        )
+
+    def test_hdfs_client_with_viewfs_uri_unchanged(self):
+        fio = self._make("hdfs://ns1/")
+        # Different scheme; let hdfs-native error rather than silently rewrite.
+        self.assertEqual(
+            fio.to_filesystem_path("viewfs://cluster1/foo"),
+            "viewfs://cluster1/foo",
+        )
+
+    def test_exists_passes_path_only_to_client(self):
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        fio = HdfsNativeFileIO("viewfs://cluster1/", Options({}))
+        client = sys.modules["hdfs_native"].Client.return_value
+        client.get_file_info.return_value = MagicMock(
+            path="/home/hudi/x", isdir=False, length=0, modification_time=0)
+        fio.exists("viewfs://cluster1/home/hudi/x")
+        client.get_file_info.assert_called_once_with("/home/hudi/x")
+
+
+class FilesystemPropertyTest(unittest.TestCase):
+    """Cover the lazy pyarrow.fs facade backed by hdfs_native.fsspec."""
+
+    def setUp(self):
+        _install_fake_hdfs_native()
+        self._patcher = patch("pyarrow.fs.PyFileSystem")
+        self._handler_patcher = patch("pyarrow.fs.FSSpecHandler")
+        self.MockPyFs = self._patcher.start()
+        self.MockHandler = self._handler_patcher.start()
+
+    def tearDown(self):
+        self._patcher.stop()
+        self._handler_patcher.stop()
+        _uninstall_fake_hdfs_native()
+
+    def _make(self, root, props=None, xml_entries=None):
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        if xml_entries:
+            d = tempfile.mkdtemp()
+            self.addCleanup(lambda: __import__("shutil").rmtree(d, 
ignore_errors=True))
+            _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"), xml_entries)
+            base_props = {"hdfs.conf-dir": d}
+            base_props.update(props or {})
+            props = base_props
+        return HdfsNativeFileIO(root, Options(props or {}))
+
+    def test_viewfs_uses_viewfs_fsspec_class(self):
+        fio = self._make("viewfs://cluster1/")
+        fs_instance = fio.filesystem  # trigger lazy
+        VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
+        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+        VFs.assert_called_once()
+        HFs.assert_not_called()
+        _, kwargs = VFs.call_args
+        self.assertEqual(kwargs.get("host"), "cluster1")
+        self.assertIs(fs_instance, self.MockPyFs.return_value)
+
+    def test_hdfs_uses_hdfs_fsspec_class(self):
+        self._make("hdfs://ns1/").filesystem
+        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+        VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
+        HFs.assert_called_once()
+        VFs.assert_not_called()
+        _, kwargs = HFs.call_args
+        self.assertEqual(kwargs.get("host"), "ns1")
+
+    def test_lazy_caches_after_first_access(self):
+        fio = self._make("hdfs://ns1/")
+        first = fio.filesystem
+        second = fio.filesystem
+        self.assertIs(first, second)
+        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+        self.assertEqual(HFs.call_count, 1)
+
+    def test_xml_and_catalog_options_merged_into_fsspec_storage_options(self):
+        fio = self._make(
+            "hdfs://ns1/",
+            props={"dfs.client.read.shortcircuit": "true"},
+            xml_entries={"dfs.nameservices": "ns1"},
+        )
+        fio.filesystem  # trigger
+        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+        _, kwargs = HFs.call_args
+        # Both xml and option keys should land in the fsspec kwargs.
+        self.assertEqual(kwargs.get("dfs.nameservices"), "ns1")
+        self.assertEqual(kwargs.get("dfs.client.read.shortcircuit"), "true")
+
+    def test_catalog_option_overrides_xml(self):
+        fio = self._make(
+            "hdfs://ns1/",
+            props={"dfs.foo": "v_user"},
+            xml_entries={"dfs.foo": "v_xml"},
+        )
+        fio.filesystem
+        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+        _, kwargs = HFs.call_args
+        self.assertEqual(kwargs.get("dfs.foo"), "v_user")
+
+    def test_missing_fsspec_raises_clear_error(self):
+        fio = self._make("hdfs://ns1/")
+        # Remove the fsspec submodule but keep hdfs_native itself, to
+        # simulate an old/partial install.
+        sys.modules.pop("hdfs_native.fsspec", None)
+        sys.modules["hdfs_native"].fsspec = None
+        with self.assertRaises(RuntimeError) as ctx:
+            fio.filesystem
+        self.assertIn("hdfs-native fsspec adapter", str(ctx.exception))
+
+
+class PickleTest(unittest.TestCase):
+    """Cover __reduce__ so Ray / multiprocessing can ship FileIO."""
+
+    def setUp(self):
+        _install_fake_hdfs_native()
+        # Isolate from any HADOOP_CONF_DIR on the host so __reduce__'s
+        # env-derived config_dir pinning is deterministic across machines.
+        self._env_patcher = patch.dict(os.environ, {}, clear=True)
+        self._env_patcher.start()
+
+    def tearDown(self):
+        self._env_patcher.stop()
+        _uninstall_fake_hdfs_native()
+
+    def _make(self, path, props=None):
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        return HdfsNativeFileIO(path, Options(props or {}))
+
+    def test_reduce_returns_class_and_args(self):
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        fio = self._make("viewfs://cluster1/some/sub/path",
+                         {"dfs.nameservices": "ns1"})
+        cls, args = fio.__reduce__()
+        self.assertIs(cls, HdfsNativeFileIO)
+        path, options = args
+        # Path is rebuilt from scheme+netloc (path segment dropped) — that
+        # is intentional because __init__ ignores path beyond scheme+netloc.
+        self.assertEqual(path, "viewfs://cluster1")
+        self.assertEqual(options.to_map(), {"dfs.nameservices": "ns1"})
+
+    def test_reduce_for_empty_netloc(self):
+        fio = self._make("hdfs://")
+        _, (path, _) = fio.__reduce__()
+        self.assertEqual(path, "hdfs://")
+
+    def test_reduce_pins_env_resolved_config_dir_into_options(self):
+        # config_dir resolved from $HADOOP_CONF_DIR should be carried into
+        # the pickled options so a worker on a host with a different env
+        # value still uses the driver's resolved directory.
+        with tempfile.TemporaryDirectory() as d:
+            with patch.dict(os.environ, {"HADOOP_CONF_DIR": d}, clear=True):
+                fio = self._make("hdfs://ns1/foo")
+            _, (_, options) = fio.__reduce__()
+        self.assertEqual(options.to_map().get("hdfs.conf-dir"), d)
+
+    def test_reduce_does_not_override_explicit_conf_dir_option(self):
+        with tempfile.TemporaryDirectory() as opt_dir:
+            with patch.dict(os.environ,
+                            {"HADOOP_CONF_DIR": "/env/dir"}, clear=True):
+                fio = self._make("hdfs://ns1/foo",
+                                 {"hdfs.conf-dir": opt_dir})
+            _, (_, options) = fio.__reduce__()
+        self.assertEqual(options.to_map().get("hdfs.conf-dir"), opt_dir)
+
+    def test_pickle_roundtrip_preserves_type_and_options(self):
+        import pickle
+        fio = self._make("hdfs://ns1/foo",
+                         {"dfs.foo": "bar", "fs.viewfs.x": "y"})
+        client_cls = sys.modules["hdfs_native"].Client
+        client_cls.reset_mock()
+        # Roundtrip via the highest pickle protocol.
+        blob = pickle.dumps(fio, protocol=pickle.HIGHEST_PROTOCOL)
+        restored = pickle.loads(blob)
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        self.assertIsInstance(restored, HdfsNativeFileIO)
+        self.assertEqual(restored.properties.to_map(),
+                         {"dfs.foo": "bar", "fs.viewfs.x": "y"})
+        # The original __init__ ran once; the unpickle ran __init__ again.
+        self.assertEqual(client_cls.call_count, 1)
+
+    def test_pickle_with_viewfs_scheme(self):
+        import pickle
+        fio = self._make("viewfs://cluster1/")
+        restored = pickle.loads(pickle.dumps(fio))
+        self.assertEqual(restored._scheme, "viewfs")
+        self.assertEqual(restored._netloc, "cluster1")
+
+    def test_pickle_does_not_serialise_live_client(self):
+        # If the live _client were pickled, the call would fail (MagicMocks
+        # are picklable but the real RawClient would not be). This test
+        # documents the contract: __reduce__ MUST sidestep _client.
+        import pickle
+        fio = self._make("hdfs://ns1/")
+        blob = pickle.dumps(fio)
+        # The pickled blob should reference the constructor inputs only;
+        # specifically it should not embed the literal mock _client.
+        self.assertNotIn(b"_client", blob)
+
+
+class HdfsNativeWriteFormatTest(unittest.TestCase):
+    """HdfsNativeFileIO is the default hdfs:// backend, so it must keep the
+    same write surface as the pyarrow backend it replaces — including
+    lance/vortex, which otherwise fall through to FileIO's NotImplementedError.
+    """
+
+    def setUp(self):
+        self._fake, self._client_cls, _ = _install_fake_hdfs_native()
+        self._client_cls.return_value = MagicMock(name="ClientInstance")
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        self.fio = HdfsNativeFileIO("hdfs://ns/", Options({}))
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def test_lance_and_vortex_are_overridden(self):
+        # The regression this guards: with these unimplemented, an HDFS table
+        # using file.format=lance/vortex would hit FileIO's 
NotImplementedError.
+        from pypaimon.common.file_io import FileIO
+        self.assertIsNot(
+            type(self.fio).write_lance, FileIO.write_lance)
+        self.assertIsNot(
+            type(self.fio).write_vortex, FileIO.write_vortex)
+
+    def test_write_lance_delegates_to_lance_specified(self):
+        import pyarrow
+        table = pyarrow.table({"a": [1, 2]})
+        writer = MagicMock(name="LanceFileWriter")
+        fake_lance = types.ModuleType("lance")
+        fake_lance.file = types.SimpleNamespace(
+            LanceFileWriter=MagicMock(return_value=writer))
+        with patch.dict(sys.modules, {"lance": fake_lance}), \
+                patch("pypaimon.read.reader.lance_utils.to_lance_specified",
+                      return_value=("hdfs://ns/x.lance", {"opt": "v"})) as 
spec:
+            self.fio.write_lance("hdfs://ns/x.lance", table)
+        spec.assert_called_once()
+        _, kwargs = fake_lance.file.LanceFileWriter.call_args
+        self.assertEqual(kwargs.get("storage_options"), {"opt": "v"})
+        writer.close.assert_called_once()
+
+    def test_write_vortex_delegates_to_vortex_specified(self):
+        import pyarrow
+        table = pyarrow.table({"a": [1, 2]})
+        fake_vortex = types.ModuleType("vortex")
+        fake_vortex.array = MagicMock(return_value="varr")
+        fake_vortex.store = types.SimpleNamespace(from_url=MagicMock())
+        fake_io = types.ModuleType("vortex._lib.io")
+        fake_io.write = MagicMock()
+        fake_lib = types.ModuleType("vortex._lib")
+        fake_lib.io = fake_io
+        fake_modules = {
+            "vortex": fake_vortex,
+            "vortex._lib": fake_lib,
+            "vortex._lib.io": fake_io,
+        }
+        with patch.dict(sys.modules, fake_modules), \
+                patch("pypaimon.read.reader.vortex_utils.to_vortex_specified",
+                      return_value=("hdfs://ns/x.vortex", None)):
+            self.fio.write_vortex("hdfs://ns/x.vortex", table)
+        fake_io.write.assert_called_once_with("varr", "hdfs://ns/x.vortex")
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/kerberos_test.py 
b/paimon-python/pypaimon/tests/kerberos_test.py
index 06d98d90d6..722043d778 100644
--- a/paimon-python/pypaimon/tests/kerberos_test.py
+++ b/paimon-python/pypaimon/tests/kerberos_test.py
@@ -235,7 +235,7 @@ class KerberosHdfsTest(unittest.TestCase):
     @patch("pypaimon.filesystem.pyarrow_file_io.subprocess.run")
     @patch("pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem")
     def test_hdfs_with_fallback_keys(self, mock_hdfs_fs, mock_subprocess_run):
-        """Verify that Java-compatible fallback keys security.principal / 
security.keytab work."""
+        """Verify that the secondary fallback keys security.principal / 
security.keytab work."""
         mock_subprocess_run.return_value = MagicMock(stdout="/some/classpath")
 
         with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index aaba800fec..7355e25bf9 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -181,6 +181,9 @@ setup(
             'pypaimon-rust; python_version>="3.10"',
             'datafusion>=52; python_version>="3.10"',
         ],
+        'hdfs': [
+            'hdfs-native>=0.13,<1; python_version >= "3.10" and 
platform_system != "Windows"',
+        ],
     },
     description="Apache Paimon Python API",
     long_description=long_description,

Reply via email to