This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b839753f46 [python] Add HDFS native FileIO backend (no Hadoop install
required) (#8031)
b839753f46 is described below
commit b839753f46d8d447ab4ba5f8770e1ca13c0a5288
Author: chaoyang <[email protected]>
AuthorDate: Wed Jun 3 10:28:14 2026 +0800
[python] Add HDFS native FileIO backend (no Hadoop install required) (#8031)
Introduces HdfsNativeFileIO, backed by the hdfs-native protocol client
(Rust + PyO3).
The default backend for hdfs:// and viewfs:// switches to native; the
PyArrow / libhdfs path is kept, with auto-fallback when hdfs-native is
unavailable (e.g. on Windows or when the extra is not installed).
---
paimon-python/README.md | 79 ++
paimon-python/pypaimon/benchmark/hdfs_io_bench.py | 168 ++++
paimon-python/pypaimon/common/file_io.py | 41 +-
paimon-python/pypaimon/common/options/config.py | 35 +
paimon-python/pypaimon/filesystem/_kerberos.py | 45 +
.../pypaimon/filesystem/hdfs_native_file_io.py | 689 ++++++++++++++++
.../pypaimon/filesystem/pyarrow_file_io.py | 21 +-
paimon-python/pypaimon/tests/e2e/hdfs/README.md | 75 ++
paimon-python/pypaimon/tests/e2e/hdfs/__init__.py | 16 +
.../pypaimon/tests/e2e/hdfs/docker-compose.yml | 55 ++
.../tests/e2e/hdfs/hdfs_native_e2e_test.py | 107 +++
paimon-python/pypaimon/tests/hdfs_native_test.py | 917 +++++++++++++++++++++
paimon-python/pypaimon/tests/kerberos_test.py | 2 +-
paimon-python/setup.py | 3 +
14 files changed, 2233 insertions(+), 20 deletions(-)
diff --git a/paimon-python/README.md b/paimon-python/README.md
index e216dc0019..5f716c8828 100644
--- a/paimon-python/README.md
+++ b/paimon-python/README.md
@@ -31,3 +31,82 @@ pip3 install dist/*.tar.gz
The command will install the package and core dependencies to your local
Python environment.
+# HDFS without a local Hadoop install
+
+`pypaimon` supports HDFS through a pure-protocol client based on
+[`hdfs-native`](https://github.com/Kimahriman/hdfs-native) (Rust + PyO3).
+Use it when you want HDFS access **without** installing Hadoop, a JDK,
+`libhdfs`, or wrestling with `CLASSPATH` / `LD_LIBRARY_PATH`.
+
+Install with the optional extra:
+
+```commandline
+pip install 'pypaimon[hdfs]'
+```
+
+The native backend requires **Python 3.10+** (and is unavailable on Windows).
+On older interpreters the extra is skipped, so `pypaimon` still installs — keep
+using the legacy `pyarrow` (`libhdfs`/JVM) backend there via
+`hdfs.client.impl=pyarrow`.
+
+For `hdfs://` and `viewfs://` URIs this backend is now the default.
+Switch back to the legacy `libhdfs` (JNI) path with:
+
+```python
+catalog = CatalogFactory.create({
+ "warehouse": "hdfs://ns1/warehouse",
+ "hdfs.client.impl": "pyarrow", # default: "native"
+})
+```
+
+## Sourcing the cluster wiring
+
+The client still needs to know about NameNode addresses, HA failover
+groups, and `viewfs` mount tables. Three options:
+
+1. **Local xml** — set `HADOOP_CONF_DIR` (or the `hdfs.conf-dir` option)
+ to a directory containing `core-site.xml` / `hdfs-site.xml`. Only the
+ xml is required; no Hadoop binaries or JDK.
+
+2. **Catalog options (REST-friendly)** — pass the original Hadoop
+ key/values directly in catalog options. Keys with prefixes `dfs.`,
+ `fs.`, `hadoop.`, `ipc.`, `io.` are forwarded as-is. A REST catalog
+ can deliver these in its response, giving a fully zero-file client
+ experience:
+
+ ```python
+ CatalogFactory.create({
+ "warehouse": "viewfs://cluster/warehouse",
+ "dfs.nameservices": "ns1",
+ "dfs.ha.namenodes.ns1": "nn1,nn2",
+ "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020",
+ "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020",
+ "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
+ })
+ ```
+
+3. **Namespaced overrides** — use `hdfs.config.<key>` to forward any
+ other Hadoop key not covered by the prefix whitelist.
+
+The three sources can be combined; catalog options take precedence over
+xml.
+
+## Kerberos
+
+A secured cluster still needs the GSSAPI system library
+(`libgssapi-krb5-2` on Debian/Ubuntu, `krb5` via Homebrew on macOS,
+`krb5-libs` on RHEL) plus a `krb5.conf`. Provide credentials by either:
+
+- Running `kinit` yourself and pointing `KRB5CCNAME` at the cache, or
+- Setting `security.kerberos.login.principal` and
+ `security.kerberos.login.keytab` in catalog options — `pypaimon` will
+ run `kinit` for you.
+
+## Fallback behaviour
+
+If the native backend fails to initialise (e.g. wheel missing on an
+unsupported platform such as Windows), `pypaimon` automatically falls
+back to the `pyarrow` (`libhdfs`/JVM) path and logs a warning. Disable
+the fallback with `hdfs.client.fallback-to-pyarrow=false` if you want
+hard failures instead.
+
diff --git a/paimon-python/pypaimon/benchmark/hdfs_io_bench.py
b/paimon-python/pypaimon/benchmark/hdfs_io_bench.py
new file mode 100644
index 0000000000..5b6b12dd18
--- /dev/null
+++ b/paimon-python/pypaimon/benchmark/hdfs_io_bench.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""HDFS FileIO benchmark: native (hdfs-native) vs pyarrow (libhdfs/JVM).
+
+Compares throughput of common FileIO operations between the two backends
+against the same HDFS cluster. Each backend is exercised via the FileIO
+factory by toggling the `hdfs.client.impl` option.
+
+Usage:
+ python -m pypaimon.benchmark.hdfs_io_bench \\
+ --warehouse hdfs://localhost:8020/bench \\
+ [--backend native|pyarrow|both] \\
+ [--write-size-mb 256] \\
+ [--list-files 1000] \\
+ [--read-iters 3]
+
+Notes:
+- `pyarrow` backend requires HADOOP_HOME + HADOOP_CONF_DIR + libhdfs.
+- `native` backend requires `pip install pypaimon[hdfs]`.
+- The benchmark writes/reads scratch files under <warehouse>/bench_<uuid>/
+ and removes them on exit.
+"""
+
+import argparse
+import os
+import sys
+import time
+import uuid
+from pathlib import Path
+
+# Allow running this file directly as a script. The module lives at
+# <root>/pypaimon/benchmark/hdfs_io_bench.py, so the directory that *contains*
+# the ``pypaimon`` package (the one that must be on sys.path for
+# ``import pypaimon...`` to resolve) is three levels up: parents[2].
+sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
+
+from pypaimon.common.file_io import FileIO # noqa: E402
+from pypaimon.common.options import Options # noqa: E402
+
+
+def _build_file_io(warehouse: str, backend: str) -> FileIO:
+    """Return the FileIO for ``warehouse`` with ``hdfs.client.impl`` forced to ``backend``."""
+    return FileIO.get(warehouse, Options({"hdfs.client.impl": backend}))
+
+
+def _human(seconds: float) -> str:
+    """Render a duration given in seconds as a compact us / ms / s string."""
+    if seconds < 1e-3:
+        text = f"{seconds * 1e6:.0f}us"
+    elif seconds < 1:
+        text = f"{seconds * 1e3:.1f}ms"
+    else:
+        text = f"{seconds:.2f}s"
+    return text
+
+
+def _bench_write(file_io: FileIO, root: str, size_mb: int) -> float:
+    """Write ``size_mb`` MiB of random data to a fresh file under ``root``.
+
+    The data is produced by repeating one random buffer of at most 16 MiB.
+    Returns the elapsed seconds, including opening and closing the stream.
+    """
+    buffer = os.urandom(min(size_mb, 16) * 1024 * 1024)
+    path = f"{root}/write-{uuid.uuid4().hex[:8]}.bin"
+    total_bytes = size_mb * 1024 * 1024
+    started = time.perf_counter()
+    with file_io.new_output_stream(path) as stream:
+        done = 0
+        while done < total_bytes:
+            piece = buffer[: min(len(buffer), total_bytes - done)]
+            reported = stream.write(piece)
+            # Some streams do not report a usable byte count; assume the
+            # whole piece was accepted in that case.
+            if isinstance(reported, int) and reported > 0:
+                done += reported
+            else:
+                done += len(piece)
+    return time.perf_counter() - started
+
+
+def _bench_read(file_io: FileIO, path: str) -> float:
+    """Read ``path`` sequentially in 8 MiB chunks and return the elapsed seconds."""
+    chunk_size = 8 * 1024 * 1024
+    started = time.perf_counter()
+    with file_io.new_input_stream(path) as stream:
+        data = stream.read(chunk_size)
+        while data:
+            data = stream.read(chunk_size)
+    return time.perf_counter() - started
+
+
+def _bench_list(file_io: FileIO, root: str, num_files: int) -> float:
+    """Time one ``list_status`` call over a directory of ``num_files`` tiny files.
+
+    The scratch directory is created under ``root`` and is always removed,
+    whether or not the listing succeeds. Only the listing itself is timed.
+    """
+    scratch = f"{root}/list-{uuid.uuid4().hex[:8]}"
+    file_io.mkdirs(scratch)
+    try:
+        names = (f"{scratch}/f-{i:06d}.txt" for i in range(num_files))
+        for name in names:
+            with file_io.new_output_stream(name) as out:
+                out.write(b"x")
+        started = time.perf_counter()
+        # Materialise the result so lazily-produced listings are fully consumed.
+        list(file_io.list_status(scratch))
+        elapsed = time.perf_counter() - started
+    finally:
+        file_io.delete(scratch, recursive=True)
+    return elapsed
+
+
+def run_one(backend: str, args) -> None:
+    """Run the write, read and list benchmarks for one backend and print results.
+
+    Args:
+        backend: value for ``hdfs.client.impl`` ("native" or "pyarrow").
+        args: parsed CLI namespace providing ``warehouse``, ``write_size_mb``,
+            ``read_iters`` and ``list_files``.
+
+    A backend that fails to initialise is reported and skipped. Scratch data
+    lives under ``<warehouse>/bench_<id>`` and removal is attempted on exit;
+    a cleanup failure is printed rather than raised.
+    """
+    print(f"\n=== backend={backend} ===")
+    try:
+        file_io = _build_file_io(args.warehouse, backend)
+    except Exception as e:
+        print(f" init failed: {e}")
+        return
+
+    bench_root = f"{args.warehouse.rstrip('/')}/bench_{uuid.uuid4().hex[:8]}"
+    file_io.mkdirs(bench_root)
+    try:
+        # Write
+        sample_path = f"{bench_root}/write-sample.bin"
+        with file_io.new_output_stream(sample_path) as stream:
+            payload = os.urandom(min(args.write_size_mb, 16) * 1024 * 1024)
+            written = 0
+            target = args.write_size_mb * 1024 * 1024
+            t0 = time.perf_counter()
+            while written < target:
+                chunk = payload[: min(len(payload), target - written)]
+                n = stream.write(chunk)
+                written += n if isinstance(n, int) and n > 0 else len(chunk)
+            write_elapsed = time.perf_counter() - t0
+        mb_per_s = args.write_size_mb / write_elapsed if write_elapsed else 0
+        print(f" write {args.write_size_mb}MB: "
+              f"{_human(write_elapsed)} ({mb_per_s:.1f} MB/s)")
+
+        # Read (warm)
+        read_times = [
+            _bench_read(file_io, sample_path)
+            for _ in range(args.read_iters)
+        ]
+        if read_times:
+            avg_read = sum(read_times) / len(read_times)
+            rmb_per_s = args.write_size_mb / avg_read if avg_read else 0
+            print(f" read {args.write_size_mb}MB (avg of {args.read_iters}): "
+                  f"{_human(avg_read)} ({rmb_per_s:.1f} MB/s)")
+        else:
+            # --read-iters < 1 measures nothing; averaging an empty list
+            # would raise ZeroDivisionError, so report the skip instead.
+            print(" read skipped (--read-iters < 1)")
+
+        # List
+        list_elapsed = _bench_list(file_io, bench_root, args.list_files)
+        print(f" list {args.list_files} files: {_human(list_elapsed)}")
+
+    finally:
+        try:
+            file_io.delete(bench_root, recursive=True)
+        except Exception as e:
+            print(f" cleanup failed: {e}")
+
+
+def main():
+    """Parse the command line and run the benchmark for each selected backend."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--warehouse", required=True,
+                        help="HDFS URI under which scratch files are created")
+    parser.add_argument("--backend", default="both",
+                        choices=["native", "pyarrow", "both"])
+    parser.add_argument("--write-size-mb", type=int, default=128)
+    parser.add_argument("--list-files", type=int, default=1000)
+    parser.add_argument("--read-iters", type=int, default=3)
+    args = parser.parse_args()
+
+    # "both" expands to the two concrete backends, native first.
+    backends = (
+        ["native", "pyarrow"] if args.backend == "both" else [args.backend]
+    )
+    for backend in backends:
+        run_one(backend, args)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/common/file_io.py
index 217ae4fd72..fabda00495 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -289,8 +289,11 @@ class FileIO(ABC):
"""
Returns a FileIO instance for accessing the file system identified by
the given path.
- LocalFileIO for local file system (file:// or no scheme)
- - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
+ - HdfsNativeFileIO for HDFS/ViewFS (default; pure protocol client, no
Hadoop install)
+ - PyArrowFileIO for other remote file systems (oss://, s3://, gs://,
...),
+ and for HDFS when explicitly requested via hdfs.client.impl=pyarrow
"""
+ import os as _os
from urllib.parse import urlparse
uri = urlparse(path)
@@ -300,5 +303,39 @@ class FileIO(ABC):
from pypaimon.filesystem.local_file_io import LocalFileIO
return LocalFileIO(path, catalog_options)
+ opts = catalog_options or Options({})
+
+ if scheme in ("hdfs", "viewfs"):
+ from pypaimon.common.options.config import HdfsOptions
+ impl_source = "hdfs.client.impl option"
+ # Treat an empty option value the same as "unset" so callers can
+ # blank it out (common in templated configs) without tripping
+ # the unsupported-impl branch.
+ impl_value = opts.to_map().get(HdfsOptions.HDFS_CLIENT_IMPL.key())
+ if not impl_value:
+ impl_value = _os.environ.get("PYPAIMON_HDFS_IMPL")
+ impl_source = "PYPAIMON_HDFS_IMPL env var"
+ if not impl_value:
+ impl_value = HdfsOptions.HDFS_CLIENT_IMPL.default_value()
+ impl_source = "default"
+ impl = impl_value.lower()
+ if impl == "native":
+ try:
+ from pypaimon.filesystem.hdfs_native_file_io import
HdfsNativeFileIO
+ return HdfsNativeFileIO(path, opts)
+ except (ImportError, RuntimeError) as e:
+ fallback =
opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)
+ if not fallback:
+ raise
+ logging.getLogger(__name__).warning(
+ "Native HDFS backend init failed, falling back to "
+ "pyarrow: %s", e,
+ )
+ elif impl != "pyarrow":
+ raise ValueError(
+ f"Unsupported hdfs.client.impl '{impl_value}' "
+ f"(from {impl_source}). Supported: 'native', 'pyarrow'."
+ )
+
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
- return PyArrowFileIO(path, catalog_options or Options({}))
+ return PyArrowFileIO(path, opts)
diff --git a/paimon-python/pypaimon/common/options/config.py
b/paimon-python/pypaimon/common/options/config.py
index 465671725d..6189ebaf89 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -104,6 +104,41 @@ class CatalogOptions:
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
+class HdfsOptions:
+ """Catalog option keys that select and configure the HDFS FileIO backend."""
+ # Which backend serves hdfs:// / viewfs:// paths; the default is "native".
+ HDFS_CLIENT_IMPL = (
+ ConfigOptions.key("hdfs.client.impl")
+ .string_type()
+ .default_value("native")
+ .with_description(
+ "HDFS FileIO backend. Supported values: 'native' (default, uses "
+ "hdfs-native protocol client, no Hadoop install required), "
+ "'pyarrow' (legacy, requires HADOOP_HOME / libhdfs / JVM)."
+ )
+ )
+ # Whether a failed native initialisation falls back to pyarrow (default True).
+ HDFS_CLIENT_FALLBACK_TO_PYARROW = (
+ ConfigOptions.key("hdfs.client.fallback-to-pyarrow")
+ .boolean_type()
+ .default_value(True)
+ .with_description(
+ "When the native backend fails to initialise (e.g. missing wheel "
+ "or unsupported platform), fall back to the pyarrow backend "
+ "instead of raising."
+ )
+ )
+ # Optional directory holding the Hadoop xml files; no default value.
+ HDFS_CONF_DIR = (
+ ConfigOptions.key("hdfs.conf-dir")
+ .string_type()
+ .no_default_value()
+ .with_description(
+ "Directory containing core-site.xml / hdfs-site.xml that the "
+ "native client should load. Defaults to $HADOOP_CONF_DIR."
+ )
+ )
+
+ # Catalog keys starting with this prefix are forwarded to the native client
+ # with the prefix removed.
+ HDFS_CONFIG_PREFIX = "hdfs.config."
+ # Catalog keys starting with any of these prefixes are forwarded unchanged.
+ HDFS_NATIVE_CONFIG_KEY_PREFIXES = ("dfs.", "fs.", "hadoop.", "ipc.", "io.")
+
+
class SecurityOptions:
KERBEROS_PRINCIPAL = (
ConfigOptions.key("security.kerberos.login.principal")
diff --git a/paimon-python/pypaimon/filesystem/_kerberos.py
b/paimon-python/pypaimon/filesystem/_kerberos.py
new file mode 100644
index 0000000000..00a31ba975
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/_kerberos.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Shared Kerberos helpers used by HDFS FileIO backends."""
+
+import os
+import subprocess
+from typing import Optional
+
+
+def kerberos_login_from_keytab(principal: str, keytab: str) -> None:
+    """Obtain a Kerberos ticket by running ``kinit -kt <keytab> <principal>``.
+
+    Args:
+        principal: Kerberos principal to log in as.
+        keytab: path to the keytab file holding that principal's key.
+
+    Raises:
+        FileNotFoundError: if ``keytab`` is not an existing regular file.
+        PermissionError: if ``keytab`` exists but is not readable.
+        subprocess.CalledProcessError: if ``kinit`` exits with a non-zero
+            status; its captured output is available on the exception.
+    """
+    if not os.path.isfile(keytab):
+        raise FileNotFoundError(f"Kerberos keytab file not found: {keytab}")
+    if not os.access(keytab, os.R_OK):
+        raise PermissionError(
+            f"Kerberos keytab file is not readable: {keytab}")
+    # Argument list (no shell), so principal/keytab are never shell-interpreted.
+    subprocess.run(
+        ['kinit', '-kt', keytab, principal],
+        check=True, capture_output=True, text=True,
+    )
+
+
+def get_ticket_cache_path() -> Optional[str]:
+    """Return the Kerberos ticket cache location, or None if none is found.
+
+    ``KRB5CCNAME`` wins when set; a leading ``FILE:`` qualifier is stripped
+    and any other value is returned unchanged. Otherwise the conventional
+    ``/tmp/krb5cc_<uid>`` file is returned if it exists.
+    """
+    cc = os.environ.get('KRB5CCNAME')
+    if cc:
+        return cc[5:] if cc.startswith('FILE:') else cc
+    # os.getuid is not available on every platform (e.g. Windows); without a
+    # uid there is no default cache file to probe.
+    getuid = getattr(os, 'getuid', None)
+    if getuid is None:
+        return None
+    default_path = f'/tmp/krb5cc_{getuid()}'
+    return default_path if os.path.exists(default_path) else None
diff --git a/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
b/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
new file mode 100644
index 0000000000..ba6fc0f4b6
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
@@ -0,0 +1,689 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""HDFS FileIO backed by the hdfs-native protocol client (no JVM, no
libhdfs)."""
+
+import logging
+import os
+import xml.etree.ElementTree as ET
+from datetime import datetime, timezone
+from pathlib import PurePosixPath
+from typing import Dict, Optional
+from urllib.parse import urlparse
+
+import pyarrow
+import pyarrow.fs as pafs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import HdfsOptions, SecurityOptions
+from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem import _kerberos
+from pypaimon.schema.data_types import AtomicType, DataField,
PyarrowFieldParser
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+
+class _HdfsFileInfo:
+    """Lightweight stand-in for pafs.FileInfo, filled from an hdfs_native.FileStatus."""
+    __slots__ = ('path', 'size', 'type', 'mtime', 'base_name')
+
+    def __init__(self, path: str, size: Optional[int], file_type, mtime):
+        self.path, self.size = path, size
+        self.type, self.mtime = file_type, mtime
+        # Last '/'-separated component; empty string when path is empty.
+        self.base_name = path.rpartition('/')[2] if path else ''
+
+
+class _HdfsWriterAdapter:
+    """File-like wrapper over hdfs_native.FileWriter.
+
+    Tracks the number of bytes written so ``tell()`` works on a write-only,
+    non-seekable stream, and makes ``close()`` idempotent.
+    """
+
+    def __init__(self, fw):
+        self._fw = fw
+        self._pos = 0
+        self._closed = False
+
+    @staticmethod
+    def _byte_length(buf) -> int:
+        """Size of ``buf`` in bytes.
+
+        ``len()`` counts elements, which under-reports multi-byte buffers
+        (e.g. a memoryview over an int array); prefer the buffer protocol.
+        """
+        try:
+            return memoryview(buf).nbytes
+        except TypeError:
+            return len(buf) if hasattr(buf, '__len__') else 0
+
+    def write(self, buf) -> int:
+        """Write ``buf`` and return the number of bytes accounted for."""
+        n = self._fw.write(buf)
+        if n is None:
+            # Underlying writer did not report a count; assume all of buf.
+            n = self._byte_length(buf)
+        self._pos += n
+        return n
+
+    def tell(self) -> int:
+        """Return the total number of bytes written so far."""
+        return self._pos
+
+    def flush(self):
+        """No-op; this adapter does no buffering of its own."""
+        pass
+
+    def close(self):
+        """Close the underlying writer once; later calls do nothing."""
+        if not self._closed:
+            try:
+                self._fw.close()
+            finally:
+                # Mark closed even if close() raised so it is not retried.
+                self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def writable(self) -> bool:
+        return True
+
+    def readable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return False
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+        return False
+
+
+class _HdfsReaderAdapter:
+    """File-like wrapper over hdfs_native.FileReader.
+
+    read/seek/tell are forwarded to the wrapped reader (an io.RawIOBase
+    subclass with full seek/tell support). The adapter exists so that leaving
+    a ``with`` block really closes the handle, since hdfs-native's own
+    FileReader.__exit__ is a no-op.
+    """
+
+    def __init__(self, fr):
+        self._fr = fr
+        self._closed = False
+
+    def read(self, size: int = -1) -> bytes:
+        """Read up to ``size`` bytes; ``None`` or -1 reads to the end."""
+        if size is None:
+            size = -1
+        return self._fr.read(size)
+
+    def read1(self, size: int = -1) -> bytes:
+        """Same as ``read``."""
+        return self.read(size)
+
+    def seek(self, pos: int, whence: int = 0) -> int:
+        """Move the read position and return the resulting absolute offset."""
+        reader = self._fr
+        reader.seek(pos, whence)
+        return reader.tell()
+
+    def tell(self) -> int:
+        return self._fr.tell()
+
+    def close(self):
+        """Close the wrapped reader once, if it exposes ``close``."""
+        if not self._closed:
+            try:
+                closer = getattr(self._fr, 'close', None)
+                if closer is not None:
+                    closer()
+            finally:
+                self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def readable(self) -> bool:
+        return True
+
+    def writable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+        return False
+
+
+class HdfsNativeFileIO(FileIO):
+ """HDFS FileIO that speaks the HDFS RPC protocol directly.
+
+ No JVM, no libhdfs, no Hadoop install required. Hadoop xml is still
+ consumed if present (HADOOP_CONF_DIR or `hdfs.conf-dir` option) for
+ viewfs mount tables and HA NameNode lists; alternatively the same
+ key/values can be delivered via the catalog options channel (a REST
+ catalog can therefore push the cluster wiring with the response).
+ """
+
+ NATIVE_KEY_PREFIXES = HdfsOptions.HDFS_NATIVE_CONFIG_KEY_PREFIXES
+ NS_PREFIX = HdfsOptions.HDFS_CONFIG_PREFIX
+
+ def __init__(self, path: str, catalog_options: Options):
+ """Create a FileIO bound to the hdfs:// or viewfs:// authority of ``path``.
+
+ Raises ValueError for any other scheme and ImportError (with an install
+ hint) when the ``hdfs_native`` package cannot be imported.
+ """
+ self.properties = catalog_options or Options({})
+ self.logger = logging.getLogger(__name__)
+ self.uri_reader_factory = UriReaderFactory(self.properties)
+
+ # Only scheme and authority are kept; the path component is discarded.
+ scheme, netloc, _ = self.parse_location(path)
+ if scheme not in {"hdfs", "viewfs"}:
+ raise ValueError(
+ f"HdfsNativeFileIO does not support scheme '{scheme}'"
+ )
+ self._scheme = scheme
+ self._netloc = netloc
+
+ # Imported here rather than at module level so a missing optional
+ # dependency surfaces as an ImportError with an install hint.
+ try:
+ from hdfs_native import Client, WriteOptions
+ except ImportError as e:
+ raise ImportError(
+ "hdfs-native is not installed. "
+ "Install with: pip install 'pypaimon[hdfs]'"
+ ) from e
+ self._WriteOptions = WriteOptions
+
+ # Runs before the Client is constructed below.
+ self._setup_kerberos()
+
+ # The explicit option wins over the HADOOP_CONF_DIR environment variable.
+ config_dir = (
+ self.properties.get(HdfsOptions.HDFS_CONF_DIR)
+ or os.environ.get("HADOOP_CONF_DIR")
+ )
+ hadoop_xml = self._load_hadoop_xml(config_dir)
+
+ config = self._build_config_dict()
+ # May add a linkFallback entry to ``config`` in place.
+ self._maybe_inject_viewfs_fallback(scheme, netloc, config, hadoop_xml)
+
+ # Stash for the lazy `filesystem` property (the fsspec/pyarrow facade
+ # is only built if a caller asks for it).
+ self._config = config
+ self._hadoop_xml = hadoop_xml
+ self._config_dir = config_dir
+ self._filesystem = None
+
+ # Only truthy values are passed on, so absent settings are left to
+ # the Client's own defaults.
+ client_kwargs = {}
+ url = self._build_url(scheme, netloc)
+ if url:
+ client_kwargs["url"] = url
+ if config:
+ client_kwargs["config"] = config
+ if config_dir:
+ client_kwargs["config_dir"] = config_dir
+
+ self._client = Client(**client_kwargs)
+
+ def __reduce__(self):
+     """Pickle support for Ray / multiprocessing.
+
+     hdfs_native.Client is a Rust binding that cannot be pickled, so only
+     the constructor inputs are serialised and each worker builds its own
+     Client on unpickling (the same approach pyarrow.fs.HadoopFileSystem
+     takes).
+
+     The resolved config dir is pinned into the carried options when the
+     options do not already name one. Otherwise a worker whose
+     $HADOOP_CONF_DIR differs from the driver's would pick up its own
+     value and silently talk to a different cluster.
+     """
+     conf_dir_key = HdfsOptions.HDFS_CONF_DIR.key()
+     carried = dict(self.properties.to_map())
+     if self._config_dir and not carried.get(conf_dir_key):
+         carried[conf_dir_key] = self._config_dir
+     location = f"{self._scheme}://{self._netloc or ''}"
+     return (type(self), (location, Options(carried)))
+
+ @property
+ def filesystem(self):
+     """pyarrow.fs.FileSystem facade backed by hdfs_native.fsspec.
+
+     Built on first access and cached, so FileIO-only call paths
+     (exists/list_status/new_input_stream/...) never pay the fsspec init
+     cost; only ds.dataset / open_input_file callers do.
+
+     Raises RuntimeError when the hdfs_native fsspec adapter is missing.
+     """
+     if self._filesystem is not None:
+         return self._filesystem
+
+     import pyarrow.fs as pafs
+     try:
+         from hdfs_native.fsspec import (
+             HdfsFileSystem,
+             ViewfsFileSystem,
+         )
+     except ImportError as e:
+         raise RuntimeError(
+             "hdfs-native fsspec adapter is required to bridge "
+             "HdfsNativeFileIO to a pyarrow.fs filesystem; upgrade "
+             "hdfs-native (>=0.13)."
+         ) from e
+
+     if self._scheme == "viewfs":
+         fs_cls = ViewfsFileSystem
+     else:
+         fs_cls = HdfsFileSystem
+     # Hand the fsspec instance xml values plus overrides (overrides win)
+     # so it can connect without relying on HADOOP_CONF_DIR
+     # (BaseFileSystem.__init__ only forwards storage_options to Client,
+     # not config_dir).
+     storage_options = dict(self._hadoop_xml)
+     storage_options.update(self._config)
+     handler = pafs.FSSpecHandler(
+         fs_cls(host=self._netloc, **storage_options))
+     self._filesystem = pafs.PyFileSystem(handler)
+     return self._filesystem
+
+ @staticmethod
+ def parse_location(location: str):
+     """Split ``location`` into ``(scheme, netloc, path)``.
+
+     A location without a scheme is reported as scheme ``"file"`` with its
+     path made absolute.
+     """
+     parsed = urlparse(location)
+     if parsed.scheme:
+         return parsed.scheme, parsed.netloc, parsed.path
+     return "file", parsed.netloc, os.path.abspath(location)
+
+ @staticmethod
+ def _build_url(scheme: str, netloc: Optional[str]) -> Optional[str]:
+     """Return ``scheme://netloc``, or None when there is no authority."""
+     return f"{scheme}://{netloc}" if netloc else None
+
+ @staticmethod
+ def _load_hadoop_xml(config_dir: Optional[str]) -> Dict[str, str]:
+     """Read core-site.xml and hdfs-site.xml from ``config_dir`` into a flat
+     {name: value} dict. A missing directory or a missing / unparseable
+     file contributes nothing; hdfs-site.xml values override core-site.xml
+     values for the same name.
+
+     The result is only used to discover viewfs mount-table state so the
+     linkFallback mount that hdfs-native requires (but libhdfs tolerates
+     without) can be polyfilled. The config dir itself is still handed to
+     hdfs-native for its own, more complete, xml parsing.
+     """
+     props: Dict[str, str] = {}
+     if not (config_dir and os.path.isdir(config_dir)):
+         return props
+     candidates = (
+         os.path.join(config_dir, fname)
+         for fname in ("core-site.xml", "hdfs-site.xml")
+     )
+     for xml_path in candidates:
+         if not os.path.isfile(xml_path):
+             continue
+         try:
+             root = ET.parse(xml_path).getroot()
+         except (ET.ParseError, OSError):
+             continue
+         for prop in root.findall("property"):
+             name_el = prop.find("name")
+             if name_el is None or name_el.text is None:
+                 continue
+             value_el = prop.find("value")
+             if value_el is not None and value_el.text:
+                 props[name_el.text.strip()] = value_el.text.strip()
+             else:
+                 # A missing or empty <value> is recorded as "".
+                 props[name_el.text.strip()] = ""
+     return props
+
+ @staticmethod
+ def _maybe_inject_viewfs_fallback(
+     scheme: str,
+     netloc: Optional[str],
+     overrides: Dict[str, str],
+     hadoop_xml: Dict[str, str],
+ ) -> None:
+     """Add a viewfs linkFallback entry to ``overrides`` when none is set.
+
+     hdfs-native rejects viewfs init without a fallback mount while libhdfs
+     tolerates it; this bridges the gap without touching cluster xml.
+
+     Nothing happens unless ``scheme`` is "viewfs" with a non-empty
+     ``netloc`` and no linkFallback exists for that cluster. The fallback
+     is the authority of the first hdfs:// link target found for the
+     cluster, else the first entry of dfs.nameservices. Mount-table state
+     is read from hadoop xml merged with the overrides (overrides win), so
+     a setup delivered purely through catalog options is covered too; the
+     injected key is only ever written to ``overrides``.
+     """
+     if scheme != "viewfs" or not netloc:
+         return
+     fallback_key = f"fs.viewfs.mounttable.{netloc}.linkFallback"
+     merged = {**hadoop_xml, **overrides}
+     if fallback_key in merged:
+         return
+
+     link_prefix = f"fs.viewfs.mounttable.{netloc}.link."
+     for key, target in merged.items():
+         if not (key.startswith(link_prefix) and target):
+             continue
+         uri = urlparse(target)
+         if uri.scheme == "hdfs" and uri.netloc:
+             overrides[fallback_key] = f"hdfs://{uri.netloc}/"
+             return
+
+     for candidate in merged.get("dfs.nameservices", "").split(","):
+         nameservice = candidate.strip()
+         if nameservice:
+             overrides[fallback_key] = f"hdfs://{nameservice}/"
+             return
+
+ def _setup_kerberos(self):
+ """Log in from a keytab when a principal and keytab are configured.
+
+ Does nothing when neither is set. Raises ValueError when only one of
+ the two is set, and RuntimeError when no ticket cache path can be
+ determined after the login. On success the process-wide KRB5CCNAME
+ environment variable is (re)written to point at the ticket cache.
+ """
+ # Each setting is read from its SecurityOptions key first, then from
+ # the shorter "security.principal" / "security.keytab" keys.
+ principal = (
+ self.properties.get(SecurityOptions.KERBEROS_PRINCIPAL)
+ or self.properties.to_map().get("security.principal")
+ )
+ keytab = (
+ self.properties.get(SecurityOptions.KERBEROS_KEYTAB)
+ or self.properties.to_map().get("security.keytab")
+ )
+ # Exactly one of the two being set is a configuration error.
+ if bool(principal) != bool(keytab):
+ raise ValueError(
+ "security.kerberos.login.principal and "
+ "security.kerberos.login.keytab "
+ "must be both set or both unset"
+ )
+ if principal and keytab:
+ _kerberos.kerberos_login_from_keytab(principal, keytab)
+ cache_path = _kerberos.get_ticket_cache_path()
+ if not cache_path:
+ raise RuntimeError(
+ "kinit succeeded but no ticket cache path could be "
+ "determined. Set the KRB5CCNAME environment variable "
+ "to specify the cache location."
+ )
+ # hdfs-native's GSSAPI layer reads KRB5CCNAME from the process
+ # env, which is global state. If a different cache was already
+ # configured (typically because another HdfsNativeFileIO with
+ # a different principal lives in the same process), warn — the
+ # last writer wins and earlier instances will start using the
+ # new ticket, which is almost certainly not what the caller
+ # wanted.
+ # NOTE(review): _kerberos.get_ticket_cache_path() derives cache_path
+ # from this same KRB5CCNAME value (minus "FILE:") whenever the
+ # variable is set, and `existing` is None when it is unset, so the
+ # comparison below looks like it can only differ if the environment
+ # changes between the two reads — confirm the warning is reachable.
+ existing = os.environ.get("KRB5CCNAME")
+ existing_stripped = (
+ existing[5:] if existing and existing.startswith("FILE:")
+ else existing
+ )
+ if existing_stripped and existing_stripped != cache_path:
+ self.logger.warning(
+ "Overwriting process-global KRB5CCNAME from %r to %r; "
+ "concurrent HdfsNativeFileIO instances with different "
+ "Kerberos principals share env state and will clobber "
+ "each other's ticket caches.",
+ existing, cache_path,
+ )
+ # Preserve the `FILE:` qualifier if the existing value carried
+ # it — some GSSAPI tooling distinguishes cache types by prefix.
+ os.environ["KRB5CCNAME"] = (
+ f"FILE:{cache_path}"
+ if existing and existing.startswith("FILE:")
+ else cache_path
+ )
+
+ def _build_config_dict(self) -> Dict[str, str]:
+     """Collect the catalog options that are forwarded to the HDFS client.
+
+     Keys starting with one of NATIVE_KEY_PREFIXES are copied unchanged;
+     keys starting with NS_PREFIX are copied with that prefix removed.
+     Values are stringified and options whose value is None are skipped.
+     """
+     passthrough = tuple(self.NATIVE_KEY_PREFIXES)
+     ns_prefix = self.NS_PREFIX
+     forwarded: Dict[str, str] = {}
+     for key, value in self.properties.to_map().items():
+         if value is None:
+             continue
+         if key.startswith(passthrough):
+             forwarded[key] = str(value)
+         elif key.startswith(ns_prefix):
+             forwarded[key[len(ns_prefix):]] = str(value)
+     return forwarded
+
+ def to_filesystem_path(self, path: str) -> str:
+     """Reduce a URI on this instance's scheme and authority to its path.
+
+     hdfs-native expects an absolute path within the cluster the Client is
+     bound to. Given a full URI, its Rust-side MountTable::resolve treats
+     the string as relative (it does not start with '/') and prepends the
+     user's home dir, producing nonsense like
+     `/user/foo/viewfs://cluster/...`.
+
+     A URI whose scheme matches this instance and whose authority is empty
+     or matches is returned as its path ("/" when the path is empty); any
+     other input is returned unchanged.
+     """
+     parsed = urlparse(path)
+     scheme_matches = (
+         parsed.scheme in ("hdfs", "viewfs")
+         and parsed.scheme == self._scheme
+     )
+     if scheme_matches and (
+         not parsed.netloc or parsed.netloc == self._netloc
+     ):
+         return parsed.path or "/"
+     return path
+
+ def _adapt_status(self, status, fallback_path: str = '') -> _HdfsFileInfo:
+     """Convert a client status object into an ``_HdfsFileInfo``.
+
+     Attributes are read defensively with getattr. Directories get
+     ``size=None``; a zero or missing modification time yields
+     ``mtime=None``, otherwise it is interpreted as milliseconds since the
+     epoch and converted to an aware UTC datetime. ``fallback_path`` is
+     used when the status carries no path.
+     """
+     is_dir = bool(getattr(status, 'isdir', False))
+     length = getattr(status, 'length', 0)
+     mtime_ms = getattr(status, 'modification_time', None)
+     if mtime_ms:
+         mtime = datetime.fromtimestamp(mtime_ms / 1000.0, tz=timezone.utc)
+     else:
+         mtime = None
+     if is_dir:
+         size, ftype = None, pafs.FileType.Directory
+     else:
+         size, ftype = int(length or 0), pafs.FileType.File
+     resolved_path = getattr(status, 'path', None) or fallback_path
+     return _HdfsFileInfo(resolved_path, size, ftype, mtime)
+
+ def new_input_stream(self, path: str):
+     """Open ``path`` for reading and return a closable file-like adapter."""
+     return _HdfsReaderAdapter(
+         self._client.read(self.to_filesystem_path(path)))
+
+ def new_output_stream(self, path: str):
+     """Create ``path`` for writing and return a file-like adapter.
+
+     Missing parent directories are created and an existing file is
+     overwritten.
+     """
+     write_options = self._WriteOptions(create_parent=True, overwrite=True)
+     raw_writer = self._client.create(
+         self.to_filesystem_path(path), write_options)
+     return _HdfsWriterAdapter(raw_writer)
+
+ def get_file_status(self, path: str):
+     """Return file info for ``path``.
+
+     Raises FileNotFoundError, with a message naming the caller's original
+     ``path``, when the client reports the path as missing.
+     """
+     fs_path = self.to_filesystem_path(path)
+     try:
+         raw_status = self._client.get_file_info(fs_path)
+     except FileNotFoundError:
+         raise FileNotFoundError(f"File {path} does not exist")
+     # The client-side path is the fallback if the status carries none.
+     return self._adapt_status(raw_status, fs_path)
+
+ def list_status(self, path: str):
+     """Return a list with one adapted file-info entry per child of ``path``."""
+     path_str = self.to_filesystem_path(path)
+     return [
+         self._adapt_status(s)
+         for s in self._client.list_status(path_str)
+     ]
+
+ def exists(self, path: str) -> bool:
+     """Return True if the client can stat ``path``, False if it is missing."""
+     try:
+         self._client.get_file_info(self.to_filesystem_path(path))
+     except FileNotFoundError:
+         return False
+     return True
+
+ def delete(self, path: str, recursive: bool = False) -> bool:
+     """Delete ``path``.
+
+     Returns False when the path does not exist, otherwise the client's
+     delete result as a bool.
+
+     Raises:
+         OSError: if ``path`` is a non-empty directory and ``recursive``
+             is False.
+     """
+     path_str = self.to_filesystem_path(path)
+     try:
+         status = self._client.get_file_info(path_str)
+     except FileNotFoundError:
+         return False
+     if bool(getattr(status, 'isdir', False)) and not recursive:
+         # Look at the first child only; one entry is enough to refuse.
+         first_child = next(iter(self._client.list_status(path_str)), None)
+         if first_child is not None:
+             raise OSError(f"Directory {path} is not empty")
+     return bool(self._client.delete(path_str, recursive))
+
+ def mkdirs(self, path: str) -> bool:
+     """Ensure ``path`` exists as a directory, creating parents as needed.
+
+     Returns True when the directory was created or already existed.
+
+     Raises:
+         FileExistsError: if ``path`` exists but is not a directory.
+     """
+     fs_path = self.to_filesystem_path(path)
+     try:
+         current = self._client.get_file_info(fs_path)
+     except FileNotFoundError:
+         self._client.mkdirs(fs_path, create_parent=True)
+         return True
+     if not getattr(current, 'isdir', False):
+         raise FileExistsError(
+             f"Path exists but is not a directory: {path}")
+     return True
+
+ def rename(self, src: str, dst: str) -> bool:
+     """Rename ``src`` to ``dst``; return True on success, False otherwise.
+
+     The destination's parent directory is created when missing. If
+     ``dst`` is an existing directory, ``src`` is moved inside it under
+     its own name. An existing non-directory destination, an existing
+     entry at the final target, or any OSError from the client rename
+     (including a missing source or a permission error) yields False.
+     """
+     client = self._client
+     source = self.to_filesystem_path(src)
+     target = self.to_filesystem_path(dst)
+
+     parent = str(PurePosixPath(target).parent)
+     if parent and parent != '.':
+         try:
+             client.get_file_info(parent)
+         except FileNotFoundError:
+             client.mkdirs(parent, create_parent=True)
+
+     try:
+         target_status = client.get_file_info(target)
+         target_exists = True
+     except FileNotFoundError:
+         target_exists = False
+
+     if target_exists:
+         if not getattr(target_status, 'isdir', False):
+             return False
+         # Moving into an existing directory: keep the source's base name.
+         target = str(PurePosixPath(target) / PurePosixPath(source).name)
+         try:
+             client.get_file_info(target)
+         except FileNotFoundError:
+             pass
+         else:
+             return False
+
+     try:
+         client.rename(source, target)
+     except OSError:
+         # FileNotFoundError and PermissionError are OSError subclasses.
+         return False
+     return True
+
+ def write_parquet(self, path: str, data: pyarrow.Table,
+                   compression: str = 'zstd', zstd_level: int = 1,
+                   **kwargs):
+     """Write ``data`` to ``path`` as a Parquet file.
+
+     ``zstd_level`` is passed as ``compression_level`` only when
+     ``compression`` is zstd; remaining ``kwargs`` go to
+     ``pyarrow.parquet.write_table``.
+
+     Raises:
+         RuntimeError: wrapping any failure; a best-effort delete of
+             ``path`` is attempted first.
+     """
+     try:
+         import pyarrow.parquet as pq
+         if compression.lower() == 'zstd':
+             kwargs['compression_level'] = zstd_level
+         with self.new_output_stream(path) as raw_stream:
+             stream = pyarrow.PythonFile(raw_stream, mode='wb')
+             try:
+                 pq.write_table(
+                     data, stream, compression=compression, **kwargs)
+             finally:
+                 stream.close()
+     except Exception as e:
+         self.delete_quietly(path)
+         raise RuntimeError(
+             f"Failed to write Parquet file {path}: {e}") from e
+
+    def write_orc(self, path: str, data: pyarrow.Table,
+                  compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        """Write ``data`` to ``path`` as an ORC file.
+
+        The table is first passed through ``_cast_time_columns_for_orc``.
+        On Python 3.6 the writer is called without ``compression``; on every
+        other version ``compression`` is forwarded. ``zstd_level`` is part
+        of the signature but is not used by this method.
+
+        Raises:
+            RuntimeError: on any failure, chained to the original exception;
+                ``delete_quietly(path)`` is called first.
+        """
+        try:
+            import sys
+            import pyarrow.orc as orc
+
+            orc_table = self._cast_time_columns_for_orc(data)
+            write_kwargs = dict(kwargs)
+            if sys.version_info[:2] != (3, 6):
+                write_kwargs['compression'] = compression
+            with self.new_output_stream(path) as sink:
+                arrow_file = pyarrow.PythonFile(sink, mode='wb')
+                try:
+                    orc.write_table(orc_table, arrow_file, **write_kwargs)
+                finally:
+                    arrow_file.close()
+        except Exception as err:
+            self.delete_quietly(path)
+            raise RuntimeError(
+                f"Failed to write ORC file {path}: {err}") from err
+
+    def write_avro(self, path: str, data: pyarrow.Table,
+                   avro_schema=None, compression: str = 'zstd',
+                   zstd_level: int = 1, **kwargs):
+        """Write ``data`` to ``path`` as an Avro file using fastavro.
+
+        Args:
+            path: Target file location.
+            data: Table whose rows are written as Avro records.
+            avro_schema: Avro schema to use; derived from ``data.schema``
+                via ``PyarrowFieldParser.to_avro_schema`` when None.
+            compression: Codec name (case-insensitive). ``zstd`` is accepted
+                as an alias for ``zstandard``.
+            zstd_level: Passed as ``codec_compression_level`` when the
+                resolved codec is ``zstandard``.
+            **kwargs: Forwarded to ``fastavro.writer``.
+
+        Raises:
+            ValueError: if ``compression`` is not a supported codec.
+
+        Any exception raised while writing is re-raised unchanged after
+        ``delete_quietly(path)`` has been called, so a partially written
+        file is not left behind.
+        """
+        import fastavro
+
+        # Validate the codec first so a bad argument fails before the table
+        # is materialised or an output stream is opened.
+        codec_map = {
+            'null': 'null', 'deflate': 'deflate', 'snappy': 'snappy',
+            'bzip2': 'bzip2', 'xz': 'xz', 'zstandard': 'zstandard',
+            'zstd': 'zstandard',
+        }
+        codec = codec_map.get(compression.lower())
+        if codec is None:
+            raise ValueError(
+                f"Unsupported compression '{compression}' for Avro format. "
+                f"Supported compressions: {', '.join(sorted(codec_map.keys()))}."
+            )
+        if codec == 'zstandard':
+            kwargs['codec_compression_level'] = zstd_level
+
+        if avro_schema is None:
+            avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
+
+        records_dict = data.to_pydict()
+        # Take the row count from the table itself: indexing the first
+        # column raised IndexError for a table without columns.
+        num_rows = data.num_rows
+
+        def record_generator():
+            for i in range(num_rows):
+                record = {}
+                for col, values in records_dict.items():
+                    value = values[i]
+                    # Naive datetimes are tagged as UTC before writing.
+                    if isinstance(value, datetime) and value.tzinfo is None:
+                        value = value.replace(tzinfo=timezone.utc)
+                    record[col] = value
+                yield record
+
+        try:
+            with self.new_output_stream(path) as output_stream:
+                fastavro.writer(output_stream, avro_schema,
+                                record_generator(), codec=codec, **kwargs)
+        except Exception:
+            # Match the sibling writers: clean up the partial file, but keep
+            # the original exception type for existing callers.
+            self.delete_quietly(path)
+            raise
+
+    def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
+        """Write a single-column table to ``path`` in blob format.
+
+        A ``large_binary`` column is declared as ``AtomicType("BLOB")``; any
+        other column type is converted with
+        ``PyarrowFieldParser.to_paimon_type``. Each value is written through
+        ``BlobFormatWriter.write_value``. ``kwargs`` is accepted but unused.
+
+        Raises:
+            RuntimeError: on any failure (including a table that does not
+                have exactly one column), chained to the original exception;
+                ``delete_quietly(path)`` is called first.
+        """
+        try:
+            if data.num_columns != 1:
+                raise RuntimeError(
+                    f"Blob format only supports a single column, "
+                    f"got {data.num_columns} columns")
+            column = data.schema[0]
+            if pyarrow.types.is_large_binary(column.type):
+                column_type = AtomicType("BLOB")
+            else:
+                column_type = PyarrowFieldParser.to_paimon_type(
+                    column.type, column.nullable)
+            fields = [DataField(0, column.name, column_type)]
+            records = data.to_pydict()
+            row_count = data.num_rows
+            blob_column = fields[0].name
+            with self.new_output_stream(path) as sink:
+                blob_writer = BlobFormatWriter(sink)
+                for row in range(row_count):
+                    blob_writer.write_value(
+                        records[blob_column][row],
+                        fields, self.uri_reader_factory)
+                blob_writer.close()
+        except Exception as err:
+            self.delete_quietly(path)
+            raise RuntimeError(
+                f"Failed to write blob file {path}: {err}") from err
+
+    def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+        """Write ``data`` to ``path`` as a Lance file, batch by batch.
+
+        The location and storage options come from ``to_lance_specified``;
+        ``kwargs`` are forwarded to ``lance.file.LanceFileWriter``.
+
+        Raises:
+            RuntimeError: on any failure, chained to the original exception;
+                ``delete_quietly(path)`` is called first.
+        """
+        # Mirror the remote-scheme writer: lance/vortex talk to the backend
+        # through their own object_store, so we hand them the URI plus any
+        # storage options the FileIO exposes rather than routing through the
+        # native client. Without these two methods, an HDFS table configured
+        # with file.format=lance/vortex would hit FileIO's NotImplementedError
+        # now that this class is the default hdfs:// backend.
+        try:
+            import lance
+
+            from pypaimon.read.reader.lance_utils import to_lance_specified
+            lance_uri, lance_storage = to_lance_specified(self, path)
+
+            lance_writer = lance.file.LanceFileWriter(
+                lance_uri, data.schema,
+                storage_options=lance_storage, **kwargs)
+            try:
+                for record_batch in data.to_batches():
+                    lance_writer.write_batch(record_batch)
+            finally:
+                lance_writer.close()
+        except Exception as err:
+            self.delete_quietly(path)
+            raise RuntimeError(
+                f"Failed to write Lance file {path}: {err}") from err
+
+    def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
+        """Write ``data`` to ``path`` as a Vortex file.
+
+        The location and store arguments come from ``to_vortex_specified``.
+        With store arguments the data is written through a store built by
+        ``vortex.store.from_url``; without them ``vortex._lib.io.write`` is
+        called on the location directly. ``kwargs`` is accepted but unused.
+
+        Raises:
+            RuntimeError: on any failure, chained to the original exception;
+                ``delete_quietly(path)`` is called first.
+        """
+        try:
+            import vortex
+            from vortex import store
+
+            from pypaimon.read.reader.vortex_utils import to_vortex_specified
+            vortex_uri, store_options = to_vortex_specified(self, path)
+
+            if not store_options:
+                from vortex._lib.io import write as vortex_write
+                vortex_write(vortex.array(data), vortex_uri)
+            else:
+                target_store = store.from_url(vortex_uri, **store_options)
+                target_store.write(vortex.array(data))
+        except Exception as err:
+            self.delete_quietly(path)
+            raise RuntimeError(
+                f"Failed to write Vortex file {path}: {err}") from err
+
+    def close(self):
+        """Drop this FileIO's reference to the native client."""
+        self._client = None
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 74d43579a2..79df9f79cb 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -329,26 +329,13 @@ class PyArrowFileIO(FileIO):
@staticmethod
def _kerberos_login_from_keytab(principal: str, keytab: str):
- if not os.path.isfile(keytab):
- raise FileNotFoundError(f"Kerberos keytab file not found:
{keytab}")
- if not os.access(keytab, os.R_OK):
- raise PermissionError(f"Kerberos keytab file is not readable:
{keytab}")
- subprocess.run(
- ['kinit', '-kt', keytab, principal],
- check=True, capture_output=True, text=True
- )
+ # Delegate to the pypaimon.filesystem._kerberos module, imported inside the
+ # function, instead of checking the keytab and running kinit inline here.
+ from pypaimon.filesystem import _kerberos
+ _kerberos.kerberos_login_from_keytab(principal, keytab)
@staticmethod
def _get_ticket_cache_path() -> Optional[str]:
- cc = os.environ.get('KRB5CCNAME')
- if cc:
- if cc.startswith('FILE:'):
- return cc[5:]
- return cc
- default_path = f'/tmp/krb5cc_{os.getuid()}'
- if os.path.exists(default_path):
- return default_path
- return None
+ # Delegate to the pypaimon.filesystem._kerberos module, imported inside the
+ # function, instead of resolving the ticket cache path inline here.
+ from pypaimon.filesystem import _kerberos
+ return _kerberos.get_ticket_cache_path()
def new_input_stream(self, path: str):
path_str = self.to_filesystem_path(path)
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/README.md
b/paimon-python/pypaimon/tests/e2e/hdfs/README.md
new file mode 100644
index 0000000000..47f63701ed
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/README.md
@@ -0,0 +1,75 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# HDFS End-to-End Tests
+
+Verifies the native HDFS FileIO backend (`HdfsNativeFileIO`) against a live
+HDFS cluster. No local Hadoop install or JVM is required on the client side.
+
+## Quick start (Docker)
+
+```sh
+# 1. Bring up a single-NameNode + single-DataNode cluster.
+docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d
+
+# Wait ~30s for the cluster to become healthy; check with:
+docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml ps
+
+# 2. Install the package with the hdfs extra.
+pip install -e '.[hdfs]'
+
+# 3. Run the tests.
+PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \
+ python -m pytest pypaimon/tests/e2e/hdfs/ -v
+
+# 4. Teardown.
+docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v
+```
+
+## REST-catalog config delivery mode (no local xml)
+
+The native backend accepts Hadoop key/values directly via catalog options.
+Skip the `core-site.xml` / `hdfs-site.xml` dance entirely by configuring the
+cluster wiring as options — exactly what a REST catalog would push to the
+client in its response. Example:
+
+```python
+catalog = CatalogFactory.create({
+ "warehouse": "viewfs://cluster/warehouse",
+ "hdfs.client.impl": "native",
+ # Forwarded as-is to the underlying client:
+ "dfs.nameservices": "ns1,ns2",
+ "dfs.ha.namenodes.ns1": "nn1,nn2",
+ "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020",
+ "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020",
+ "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
+})
+```
+
+Keys matching the prefixes `dfs.` / `fs.` / `hadoop.` / `ipc.` / `io.` are
+forwarded automatically. Use the `hdfs.config.<key>` namespace for any other
+key you want passed through.
+
+## Kerberos
+
+The cluster in `docker-compose.yml` runs without security to keep the
+smoke test simple. For a Kerberized e2e: provision a krb5 + HDFS compose
+separately, install `libgssapi-krb5-2` (or platform equivalent) on the
+client, set `KRB5_CONFIG` and `KRB5CCNAME`, then either run `kinit`
+yourself or pass `security.kerberos.login.principal` + `.keytab` as
+catalog options (pypaimon will run `kinit` for you).
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py
b/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml
b/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml
new file mode 100644
index 0000000000..145708c3e6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Single-NameNode HDFS cluster for integration testing the native HDFS backend.
+# Brings up one NameNode (RPC 8020) and one DataNode (data 9866).
+#
+# Run:
+# docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d
+# PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \
+# python -m pytest pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py -v
+# docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v
+
+services:
+ namenode:
+ image: apache/hadoop:3.3.6
+ hostname: namenode
+ user: root
+ command: ["/bin/bash", "-c", "hdfs namenode -format -force -nonInteractive || true; hdfs namenode"]
+ environment:
+ HADOOP_HOME: /opt/hadoop
+ CORE-SITE.XML_fs.defaultFS: hdfs://namenode:8020
+ HDFS-SITE.XML_dfs.namenode.rpc-address: namenode:8020
+ HDFS-SITE.XML_dfs.replication: "1"
+ HDFS-SITE.XML_dfs.permissions.enabled: "false"
+ ports:
+ - "8020:8020"
+ - "9870:9870"
+
+ datanode:
+ image: apache/hadoop:3.3.6
+ user: root
+ command: ["hdfs", "datanode"]
+ environment:
+ HADOOP_HOME: /opt/hadoop
+ CORE-SITE.XML_fs.defaultFS: hdfs://namenode:8020
+ HDFS-SITE.XML_dfs.datanode.use.datanode.hostname: "false"
+ HDFS-SITE.XML_dfs.permissions.enabled: "false"
+ depends_on:
+ - namenode
+ ports:
+ - "9866:9866"
diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py
b/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py
new file mode 100644
index 0000000000..5989b815fa
--- /dev/null
+++ b/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""End-to-end tests for the native HDFS FileIO backend.
+
+Disabled by default. To run:
+ 1. Start an HDFS cluster — see docker-compose.yml in this directory.
+ 2. Install pypaimon with the hdfs extra:
+ pip install -e '.[hdfs]'
+ 3. Point the tests at the cluster and run:
+ PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \\
+ python -m pytest pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py -v
+
+To exercise the REST-catalog config-delivery path (no local xml), put the
+Hadoop config k/v in catalog options under the `dfs.*` / `fs.*` namespaces
+or via `hdfs.config.*` — both are forwarded to the underlying client.
+"""
+
+import os
+import unittest
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+
+# Base URL of the HDFS cluster under test, read from the environment; None
+# when PYPAIMON_HDFS_E2E_URL is not set.
+E2E_URL = os.environ.get("PYPAIMON_HDFS_E2E_URL")
+# Message passed to the skip decorator on the e2e test class below.
+SKIP_REASON = ("PYPAIMON_HDFS_E2E_URL not set; skipping HDFS e2e. "
+ "See docker-compose.yml in this directory.")
+
+
[email protected](not E2E_URL, SKIP_REASON)
+class HdfsNativeE2ETest(unittest.TestCase):
+    """Smoke-test the native HDFS backend end-to-end against a live cluster.
+
+    The whole class is skipped when PYPAIMON_HDFS_E2E_URL is not set, and
+    again at class setup when the ``hdfs_native`` package cannot be imported.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        """Create a catalog on a fresh, uniquely named warehouse directory."""
+        try:
+            import hdfs_native  # noqa: F401
+        except ImportError as e:
+            raise unittest.SkipTest(
+                "hdfs-native not installed. pip install 'pypaimon[hdfs]'"
+            ) from e
+
+        from pypaimon.catalog.catalog_factory import CatalogFactory
+        # Tolerate a trailing slash in the configured URL so the warehouse
+        # path does not end up with an empty "//" segment.
+        base_url = E2E_URL.rstrip("/")
+        cls.warehouse = (
+            f"{base_url}/pypaimon-e2e/warehouse-{uuid.uuid4().hex[:8]}"
+        )
+        cls.catalog = CatalogFactory.create({
+            "warehouse": cls.warehouse,
+            "hdfs.client.impl": "native",
+        })
+        cls.catalog.create_database("default", True)
+
+    def _make_table(self, name, schema):
+        """Create ``default.<name>`` as a parquet table and return it."""
+        from pypaimon.schema.schema import Schema
+        fqn = f"default.{name}"
+        s = Schema.from_pyarrow_schema(
+            schema,
+            options={"file.format": "parquet"},
+        )
+        self.catalog.create_table(fqn, s, False)
+        return self.catalog.get_table(fqn)
+
+    def test_write_then_read_back(self):
+        """Write 100 rows, commit them, and read the same rows back."""
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("payload", pa.string()),
+        ])
+        table = self._make_table(f"t_{uuid.uuid4().hex[:8]}", pa_schema)
+
+        expected = {i: f"row-{i}" for i in range(100)}
+        data = pd.DataFrame({
+            "id": list(expected.keys()),
+            "payload": list(expected.values()),
+        })
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        committer = write_builder.new_commit()
+        # Close both handles even when the write or the commit fails.
+        try:
+            writer.write_pandas(data)
+            committer.commit(writer.prepare_commit())
+        finally:
+            try:
+                writer.close()
+            finally:
+                committer.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 100)
+        # Compare content, not only the count; row order is not assumed.
+        actual = dict(zip(result.column("id").to_pylist(),
+                          result.column("payload").to_pylist()))
+        self.assertEqual(actual, expected)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/hdfs_native_test.py
b/paimon-python/pypaimon/tests/hdfs_native_test.py
new file mode 100644
index 0000000000..957ee8f123
--- /dev/null
+++ b/paimon-python/pypaimon/tests/hdfs_native_test.py
@@ -0,0 +1,917 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+import tempfile
+import types
+import unittest
+from unittest.mock import MagicMock, patch
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import HdfsOptions
+
+
+def _install_fake_hdfs_native():
+    """Register a stand-in ``hdfs_native`` package in ``sys.modules``.
+
+    The fake package carries ``Client`` and ``WriteOptions`` mocks and an
+    ``fsspec`` submodule with ``HdfsFileSystem`` / ``ViewfsFileSystem`` mocks.
+
+    Returns (fake_module, mock_client_cls, mock_write_options_cls).
+    """
+    package = types.ModuleType("hdfs_native")
+    for attr in ("Client", "WriteOptions"):
+        setattr(package, attr, MagicMock(name=attr))
+    submodule = types.ModuleType("hdfs_native.fsspec")
+    for attr in ("HdfsFileSystem", "ViewfsFileSystem"):
+        setattr(submodule, attr, MagicMock(name=attr))
+    package.fsspec = submodule
+    sys.modules.update({
+        "hdfs_native": package,
+        "hdfs_native.fsspec": submodule,
+    })
+    return package, package.Client, package.WriteOptions
+
+
+def _uninstall_fake_hdfs_native():
+    """Remove the hdfs_native entries and the cached FileIO module.
+
+    Entries that are not present in ``sys.modules`` are ignored.
+    """
+    for module_name in (
+        "hdfs_native",
+        "hdfs_native.fsspec",
+        # Also drop the cached HdfsNativeFileIO so a re-import sees the new
+        # fake
+        "pypaimon.filesystem.hdfs_native_file_io",
+    ):
+        sys.modules.pop(module_name, None)
+
+
+class HdfsOptionsTest(unittest.TestCase):
+    """Default values and explicit overrides of the HdfsOptions entries."""
+
+    def test_defaults(self):
+        empty = Options({})
+        self.assertEqual(empty.get(HdfsOptions.HDFS_CLIENT_IMPL), "native")
+        self.assertTrue(
+            empty.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW))
+        self.assertIsNone(empty.get(HdfsOptions.HDFS_CONF_DIR))
+
+    def test_explicit_pyarrow(self):
+        configured = Options({"hdfs.client.impl": "pyarrow"})
+        self.assertEqual(
+            configured.get(HdfsOptions.HDFS_CLIENT_IMPL), "pyarrow")
+
+    def test_explicit_fallback_false(self):
+        configured = Options({"hdfs.client.fallback-to-pyarrow": "false"})
+        self.assertFalse(
+            configured.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW))
+
+
+class HdfsNativeFileIORoutingTest(unittest.TestCase):
+ # Checks which FileIO class FileIO.get() returns for a given URI scheme,
+ # `hdfs.client.impl` option value and PYPAIMON_HDFS_IMPL environment
+ # variable. Results are compared by class name.
+
+ def setUp(self):
+ # Start every test without a fake hdfs_native in sys.modules; tests that
+ # need one install it themselves.
+ _uninstall_fake_hdfs_native()
+
+ def tearDown(self):
+ _uninstall_fake_hdfs_native()
+
+ def test_local_paths_unaffected(self):
+ fio = FileIO.get("file:///tmp/foo")
+ self.assertEqual(type(fio).__name__, "LocalFileIO")
+
+ def test_default_hdfs_routes_to_native(self):
+ _install_fake_hdfs_native()
+ fio = FileIO.get("hdfs://ns/foo", Options({}))
+ self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+ def test_explicit_pyarrow_routes_to_pyarrow(self):
+ # No hdfs-native module needed; should go straight to pyarrow.
+ # PyArrowFileIO.__init__ is patched out so no real filesystem is built.
+ with patch(
+ "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
+ return_value=None,
+ ):
+ fio = FileIO.get(
+ "hdfs://ns/foo",
+ Options({"hdfs.client.impl": "pyarrow"}),
+ )
+ self.assertEqual(type(fio).__name__, "PyArrowFileIO")
+
+ def test_native_init_failure_falls_back_to_pyarrow(self):
+ # hdfs_native not installed; default fallback enabled.
+ _uninstall_fake_hdfs_native()
+ with patch(
+ "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
+ return_value=None,
+ ):
+ fio = FileIO.get("hdfs://ns/foo", Options({}))
+ self.assertEqual(type(fio).__name__, "PyArrowFileIO")
+
+ def test_native_init_failure_no_fallback_raises(self):
+ # With the fallback option set to "false" the ImportError must surface.
+ _uninstall_fake_hdfs_native()
+ with self.assertRaises(ImportError):
+ FileIO.get(
+ "hdfs://ns/foo",
+ Options({"hdfs.client.fallback-to-pyarrow": "false"}),
+ )
+
+ def test_unsupported_impl_raises(self):
+ with self.assertRaises(ValueError) as ctx:
+ FileIO.get(
+ "hdfs://ns/foo",
+ Options({"hdfs.client.impl": "bogus"}),
+ )
+ self.assertIn("Unsupported hdfs.client.impl", str(ctx.exception))
+
+ def test_env_var_override_when_option_absent(self):
+ # The fake native module is installed, yet the environment variable
+ # still selects the pyarrow backend.
+ _install_fake_hdfs_native()
+ with patch(
+ "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
+ return_value=None,
+ ):
+ with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}):
+ fio = FileIO.get("hdfs://ns/foo", Options({}))
+ self.assertEqual(type(fio).__name__, "PyArrowFileIO")
+
+ def test_option_wins_over_env_var(self):
+ _install_fake_hdfs_native()
+ with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}):
+ fio = FileIO.get(
+ "hdfs://ns/foo",
+ Options({"hdfs.client.impl": "native"}),
+ )
+ self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+ def test_viewfs_scheme_routes_to_native(self):
+ _install_fake_hdfs_native()
+ fio = FileIO.get("viewfs://cluster/foo", Options({}))
+ self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+ def test_empty_impl_option_treated_as_unset(self):
+ # Templated configs sometimes blank the option to opt out — that
+ # should fall through to the default ("native"), not raise
+ # "Unsupported hdfs.client.impl ''".
+ _install_fake_hdfs_native()
+ fio = FileIO.get("hdfs://ns/foo", Options({"hdfs.client.impl": ""}))
+ self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
+
+
+class HdfsNativeFileIOInitTest(unittest.TestCase):
+ # Checks the keyword arguments HdfsNativeFileIO passes to the mocked
+ # hdfs_native.Client constructor (url, config, config_dir) and the
+ # Kerberos handling around kinit and the KRB5CCNAME environment variable.
+
+ def setUp(self):
+ self._fake, self._client_cls, self._wo_cls =
_install_fake_hdfs_native()
+
+ def tearDown(self):
+ _uninstall_fake_hdfs_native()
+
+ def _make(self, path, props=None):
+ # Import inside the helper so the module is loaded after the fake
+ # hdfs_native has been installed by setUp.
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ return HdfsNativeFileIO(path, Options(props or {}))
+
+ def test_constructs_client_with_url(self):
+ # Only scheme and authority are expected in `url`; with no Hadoop keys
+ # in the options, no `config` keyword is expected at all.
+ self._make("hdfs://ns1/warehouse")
+ self._client_cls.assert_called_once()
+ _, kwargs = self._client_cls.call_args
+ self.assertEqual(kwargs.get("url"), "hdfs://ns1")
+ self.assertNotIn("config", kwargs)
+
+ def test_viewfs_scheme_passes_through(self):
+ self._make("viewfs://cluster1/")
+ _, kwargs = self._client_cls.call_args
+ self.assertEqual(kwargs.get("url"), "viewfs://cluster1")
+
+ def test_native_hadoop_keys_forwarded_as_config(self):
+ self._make("hdfs://ns1/foo", {
+ "dfs.nameservices": "ns1",
+ "dfs.ha.namenodes.ns1": "nn1,nn2",
+ "dfs.namenode.rpc-address.ns1.nn1": "host1:8020",
+ "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
+ "warehouse": "hdfs://ns1/warehouse", # should NOT be forwarded
+ })
+ _, kwargs = self._client_cls.call_args
+ config = kwargs.get("config", {})
+ self.assertEqual(config.get("dfs.nameservices"), "ns1")
+ self.assertEqual(config.get("dfs.ha.namenodes.ns1"), "nn1,nn2")
+ self.assertEqual(
+ config.get("dfs.namenode.rpc-address.ns1.nn1"), "host1:8020")
+ self.assertEqual(
+ config.get("fs.viewfs.mounttable.cluster.link./prod"),
+ "hdfs://ns1/prod")
+ self.assertNotIn("warehouse", config)
+
+ def test_namespaced_overrides_forwarded(self):
+ # The `hdfs.config.` prefix is expected to be stripped from the key.
+ self._make("hdfs://ns1/foo", {
+ "hdfs.config.dfs.client.read.shortcircuit": "true",
+ })
+ _, kwargs = self._client_cls.call_args
+ config = kwargs.get("config", {})
+ self.assertEqual(
+ config.get("dfs.client.read.shortcircuit"), "true")
+
+ def test_conf_dir_from_option(self):
+ self._make("hdfs://ns1/foo", {
+ "hdfs.conf-dir": "/tmp/conf",
+ })
+ _, kwargs = self._client_cls.call_args
+ self.assertEqual(kwargs.get("config_dir"), "/tmp/conf")
+
+ def test_conf_dir_from_env(self):
+ env = dict(os.environ)
+ env["HADOOP_CONF_DIR"] = "/env/conf"
+ with patch.dict(os.environ, env, clear=True):
+ self._make("hdfs://ns1/foo")
+ _, kwargs = self._client_cls.call_args
+ self.assertEqual(kwargs.get("config_dir"), "/env/conf")
+
+ def test_option_conf_dir_overrides_env(self):
+ env = dict(os.environ)
+ env["HADOOP_CONF_DIR"] = "/env/conf"
+ with patch.dict(os.environ, env, clear=True):
+ self._make("hdfs://ns1/foo", {"hdfs.conf-dir": "/opt/conf"})
+ _, kwargs = self._client_cls.call_args
+ self.assertEqual(kwargs.get("config_dir"), "/opt/conf")
+
+ @patch("pypaimon.filesystem._kerberos.subprocess.run")
+ def test_kerberos_principal_keytab_triggers_kinit(self, mock_kinit):
+ # subprocess.run is patched, so no real kinit is executed; an empty
+ # temporary file stands in for the keytab.
+ mock_kinit.return_value = MagicMock()
+ with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+ with patch.dict(os.environ, {"KRB5CCNAME": "/tmp/kc_test"}):
+ self._make("hdfs://ns1/foo", {
+ "security.kerberos.login.principal": "user@REALM",
+ "security.kerberos.login.keytab": keytab_file.name,
+ })
+ kinit_calls = [
+ c for c in mock_kinit.call_args_list
+ if c[0][0][0] == "kinit"
+ ]
+ self.assertEqual(len(kinit_calls), 1)
+ self.assertEqual(
+ kinit_calls[0][0][0],
+ ["kinit", "-kt", keytab_file.name, "user@REALM"],
+ )
+
+ def test_principal_without_keytab_raises(self):
+ with self.assertRaises(ValueError) as ctx:
+ self._make("hdfs://ns1/foo", {
+ "security.kerberos.login.principal": "user@REALM",
+ })
+ self.assertIn("must be both set or both unset", str(ctx.exception))
+
+ @patch("pypaimon.filesystem._kerberos.subprocess.run")
+ def test_kerberos_preserves_FILE_prefix_on_krb5ccname(self, mock_kinit):
+ # If KRB5CCNAME had a `FILE:` qualifier, the rewrite after kinit
+ # must keep it so GSSAPI cache-type detection isn't perturbed.
+ mock_kinit.return_value = MagicMock()
+ with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+ with patch.dict(os.environ,
+ {"KRB5CCNAME": "FILE:/tmp/kc_test"},
+ clear=True):
+ self._make("hdfs://ns1/foo", {
+ "security.kerberos.login.principal": "user@REALM",
+ "security.kerberos.login.keytab": keytab_file.name,
+ })
+ self.assertEqual(
+ os.environ["KRB5CCNAME"], "FILE:/tmp/kc_test")
+
+ @patch("pypaimon.filesystem._kerberos.get_ticket_cache_path",
+ return_value="/tmp/freshly_kinit_cache")
+ @patch("pypaimon.filesystem._kerberos.subprocess.run")
+ def test_kerberos_warns_when_overwriting_different_cache(
+ self, mock_kinit, _mock_cache,
+ ):
+ # Multi-principal in the same process clobbers KRB5CCNAME; we warn
+ # so the operator sees the race instead of silently mis-routing
+ # earlier instances' RPCs.
+ mock_kinit.return_value = MagicMock()
+ with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+ with patch.dict(os.environ,
+ {"KRB5CCNAME": "/tmp/some_other_cache"},
+ clear=True):
+ with self.assertLogs(
+ "pypaimon.filesystem.hdfs_native_file_io",
+ level="WARNING",
+ ) as log_ctx:
+ self._make("hdfs://ns1/foo", {
+ "security.kerberos.login.principal": "user@REALM",
+ "security.kerberos.login.keytab": keytab_file.name,
+ })
+ self.assertTrue(
+ any("Overwriting process-global KRB5CCNAME" in m
+ for m in log_ctx.output),
+ log_ctx.output,
+ )
+
+ @patch("pypaimon.filesystem._kerberos.get_ticket_cache_path",
+ return_value="/tmp/kc_test")
+ @patch("pypaimon.filesystem._kerberos.subprocess.run")
+ def test_kerberos_no_warning_when_cache_unchanged(
+ self, mock_kinit, _mock_cache,
+ ):
+ import logging as _logging
+ mock_kinit.return_value = MagicMock()
+ with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
+ # Pre-existing value matches the kinit-resolved cache → no warn.
+ with patch.dict(os.environ,
+ {"KRB5CCNAME": "/tmp/kc_test"},
+ clear=True):
+ # assertNoLogs is 3.10+; patch warning explicitly so older
+ # interpreters keep working too.
+ logger = _logging.getLogger(
+ "pypaimon.filesystem.hdfs_native_file_io")
+ with patch.object(logger, "warning") as warn:
+ self._make("hdfs://ns1/foo", {
+ "security.kerberos.login.principal": "user@REALM",
+ "security.kerberos.login.keytab": keytab_file.name,
+ })
+ warn.assert_not_called()
+
+ def test_unsupported_scheme_raises(self):
+ with self.assertRaises(ValueError):
+ self._make("s3://bucket/key")
+
+
+class HdfsNativeFileIOOpsTest(unittest.TestCase):
+    """Exercise HdfsNativeFileIO file operations against a mocked client."""
+
+    def setUp(self):
+        installed = _install_fake_hdfs_native()
+        self._fake, self._client_cls, self._wo_cls = installed
+        self._mock_client = MagicMock(name="ClientInstance")
+        self._client_cls.return_value = self._mock_client
+        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+        self.fio = HdfsNativeFileIO("hdfs://ns/", Options({}))
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def _file_status(self, path, isdir=False, length=0, mtime=0):
+        """Build a mock status object with the attributes the FileIO reads."""
+        return MagicMock(
+            path=path,
+            isdir=isdir,
+            length=length,
+            modification_time=mtime,
+        )
+
+    def test_exists_true(self):
+        client = self._mock_client
+        client.get_file_info.return_value = self._file_status("/x")
+        self.assertTrue(self.fio.exists("/x"))
+
+    def test_exists_false(self):
+        client = self._mock_client
+        client.get_file_info.side_effect = FileNotFoundError("nope")
+        self.assertFalse(self.fio.exists("/missing"))
+
+    def test_get_file_status_adapts_to_pafs_filetype(self):
+        native_status = self._file_status(
+            "/x", isdir=False, length=42, mtime=1700000000000)
+        self._mock_client.get_file_info.return_value = native_status
+        adapted = self.fio.get_file_status("/x")
+        self.assertEqual(adapted.path, "/x")
+        self.assertEqual(adapted.size, 42)
+        self.assertEqual(adapted.type, pafs.FileType.File)
+        self.assertIsNotNone(adapted.mtime)
+
+    def test_list_status(self):
+        children = [
+            self._file_status("/x/a", isdir=False, length=10),
+            self._file_status("/x/b", isdir=True),
+        ]
+        self._mock_client.list_status.return_value = iter(children)
+        listed = self.fio.list_status("/x")
+        self.assertEqual(len(listed), 2)
+        self.assertEqual(listed[0].type, pafs.FileType.File)
+        self.assertEqual(listed[1].type, pafs.FileType.Directory)
+        self.assertIsNone(listed[1].size)
+
+    def test_delete_missing_returns_false(self):
+        client = self._mock_client
+        client.get_file_info.side_effect = FileNotFoundError("nope")
+        self.assertFalse(self.fio.delete("/missing"))
+        client.delete.assert_not_called()
+
+    def test_delete_file(self):
+        client = self._mock_client
+        client.get_file_info.return_value = self._file_status("/x")
+        client.delete.return_value = True
+        self.assertTrue(self.fio.delete("/x"))
+        client.delete.assert_called_once_with("/x", False)
+
+    def test_delete_nonempty_dir_without_recursive_raises(self):
+        client = self._mock_client
+        client.get_file_info.return_value = self._file_status(
+            "/x", isdir=True)
+        client.list_status.return_value = iter([self._file_status("/x/a")])
+        with self.assertRaises(OSError):
+            self.fio.delete("/x", recursive=False)
+
+    def test_mkdirs_creates_when_missing(self):
+        client = self._mock_client
+        client.get_file_info.side_effect = FileNotFoundError("nope")
+        self.assertTrue(self.fio.mkdirs("/new"))
+        client.mkdirs.assert_called_once_with("/new", create_parent=True)
+
+    def test_mkdirs_idempotent_on_existing_dir(self):
+        client = self._mock_client
+        client.get_file_info.return_value = self._file_status(
+            "/x", isdir=True)
+        self.assertTrue(self.fio.mkdirs("/x"))
+        client.mkdirs.assert_not_called()
+
+    def test_mkdirs_existing_file_raises(self):
+        client = self._mock_client
+        client.get_file_info.return_value = self._file_status(
+            "/x", isdir=False)
+        with self.assertRaises(FileExistsError):
+            self.fio.mkdirs("/x")
+
+
+class HdfsNativeAdaptersTest(unittest.TestCase):
+    """Cover the reader/writer adapters wrapping native file handles."""
+
+    def setUp(self):
+        _install_fake_hdfs_native()
+
+    def tearDown(self):
+        _uninstall_fake_hdfs_native()
+
+    def test_writer_adapter_tracks_position_and_closes_once(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsWriterAdapter
+        native_writer = MagicMock()
+        # Report every write as fully accepted.
+        native_writer.write.side_effect = lambda buf: len(buf)
+        wrapped = _HdfsWriterAdapter(native_writer)
+        wrapped.write(b"abc")
+        wrapped.write(b"defg")
+        self.assertEqual(wrapped.tell(), 7)
+        wrapped.close()
+        wrapped.close()  # idempotent
+        native_writer.close.assert_called_once()
+
+    def test_reader_adapter_seek_and_read(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
+        native_reader = MagicMock()
+        native_reader.tell.side_effect = [20, 30]
+        native_reader.read.return_value = b"x" * 10
+        wrapped = _HdfsReaderAdapter(native_reader)
+        self.assertEqual(wrapped.seek(20), 20)
+        native_reader.seek.assert_called_once_with(20, 0)
+        chunk = wrapped.read(10)
+        self.assertEqual(chunk, b"x" * 10)
+        native_reader.read.assert_called_once_with(10)
+        self.assertEqual(wrapped.tell(), 30)
+
+    def test_reader_adapter_read_negative_reads_all(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
+        native_reader = MagicMock()
+        native_reader.read.return_value = b"all-content"
+        wrapped = _HdfsReaderAdapter(native_reader)
+        self.assertEqual(wrapped.read(), b"all-content")
+        native_reader.read.assert_called_once_with(-1)
+
+    def test_reader_adapter_close_releases_underlying(self):
+        from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
+        native_reader = MagicMock()
+        wrapped = _HdfsReaderAdapter(native_reader)
+        wrapped.close()
+        wrapped.close()  # idempotent
+        native_reader.close.assert_called_once()
+        self.assertTrue(wrapped.closed)
+
+
+def _write_hadoop_xml(path, entries):
+ """Write a minimal Hadoop-style xml file at `path`."""
+ body = ['<?xml version="1.0"?>', "<configuration>"]
+ for name, value in entries.items():
+ body.append(
+ f" <property><name>{name}</name><value>{value}</value></property>"
+ )
+ body.append("</configuration>")
+ with open(path, "w") as f:
+ f.write("\n".join(body))
+
+
+class ViewFsFallbackTest(unittest.TestCase):
+ """Cover _load_hadoop_xml + _maybe_inject_viewfs_fallback polyfill."""
+
+ def setUp(self):
+ _install_fake_hdfs_native()
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ self.Fio = HdfsNativeFileIO
+
+ def tearDown(self):
+ _uninstall_fake_hdfs_native()
+
+ def test_load_hadoop_xml_merges_two_files(self):
+ with tempfile.TemporaryDirectory() as d:
+ _write_hadoop_xml(os.path.join(d, "core-site.xml"),
+ {"fs.defaultFS": "viewfs://c1"})
+ _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"),
+ {"dfs.nameservices": "ns1"})
+ cfg = self.Fio._load_hadoop_xml(d)
+ self.assertEqual(cfg.get("fs.defaultFS"), "viewfs://c1")
+ self.assertEqual(cfg.get("dfs.nameservices"), "ns1")
+
+ def test_load_hadoop_xml_missing_dir_returns_empty(self):
+ self.assertEqual(self.Fio._load_hadoop_xml(None), {})
+ self.assertEqual(self.Fio._load_hadoop_xml("/no/such/dir/xyz"), {})
+
+ def test_load_hadoop_xml_malformed_file_skipped(self):
+ with tempfile.TemporaryDirectory() as d:
+ with open(os.path.join(d, "core-site.xml"), "w") as f:
+ f.write("<not really xml")
+ _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"),
+ {"dfs.nameservices": "ns1"})
+ cfg = self.Fio._load_hadoop_xml(d)
+ self.assertEqual(cfg, {"dfs.nameservices": "ns1"})
+
+ def test_fallback_from_existing_link_target(self):
+ overrides = {}
+ xml = {
+ "fs.viewfs.mounttable.c1.link./home": "hdfs://ns-prod/home",
+ "fs.viewfs.mounttable.c1.link./tmp": "hdfs://ns-tmp/tmp",
+ }
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertEqual(
+ overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+ "hdfs://ns-prod/",
+ )
+
+ def test_fallback_from_nameservices_when_no_links(self):
+ overrides = {}
+ xml = {"dfs.nameservices": "nsA,nsB"}
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertEqual(
+ overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+ "hdfs://nsA/",
+ )
+
+ def test_fallback_from_link_in_overrides_only(self):
+ # Zero-file viewfs setup: link.* arrives via catalog options
+ # (overrides), no hadoop xml present. The fallback must still be
+ # derived from the merged view.
+ overrides = {
+ "fs.viewfs.mounttable.c1.link./home": "hdfs://ns-prod/home",
+ }
+ xml = {}
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertEqual(
+ overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+ "hdfs://ns-prod/",
+ )
+
+ def test_fallback_from_nameservices_in_overrides_only(self):
+ overrides = {"dfs.nameservices": "nsA,nsB"}
+ xml = {}
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertEqual(
+ overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
+ "hdfs://nsA/",
+ )
+
+ def test_fallback_already_in_xml_not_overridden(self):
+ overrides = {}
+ xml = {
+ "fs.viewfs.mounttable.c1.linkFallback": "hdfs://ns-existing/",
+ "dfs.nameservices": "nsA",
+ }
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertNotIn("fs.viewfs.mounttable.c1.linkFallback", overrides)
+
+ def test_fallback_already_in_overrides_kept(self):
+ overrides = {"fs.viewfs.mounttable.c1.linkFallback": "hdfs://manual/"}
+ xml = {"dfs.nameservices": "nsA"}
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertEqual(
+ overrides["fs.viewfs.mounttable.c1.linkFallback"],
+ "hdfs://manual/",
+ )
+
+ def test_hdfs_scheme_does_not_inject(self):
+ overrides = {}
+ xml = {"dfs.nameservices": "nsA"}
+ self.Fio._maybe_inject_viewfs_fallback("hdfs", "ns1", overrides, xml)
+ self.assertEqual(overrides, {})
+
+ def test_no_signal_no_inject(self):
+ overrides = {}
+ xml = {"unrelated.key": "v"}
+ self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
+ self.assertEqual(overrides, {})
+
+ def test_init_auto_injects_for_viewfs_uri(self):
+ with tempfile.TemporaryDirectory() as d:
+ _write_hadoop_xml(
+ os.path.join(d, "hdfs-site.xml"),
+ {
+ "dfs.nameservices": "ns1",
+ "fs.viewfs.mounttable.hadoop-lt-cluster.link./home":
+ "hdfs://ns1/home",
+ },
+ )
+ opts = Options({"hdfs.conf-dir": d})
+ self.Fio("viewfs://hadoop-lt-cluster/home/x", opts)
+ client_cls = sys.modules["hdfs_native"].Client
+ _, kwargs = client_cls.call_args
+ config = kwargs.get("config", {})
+ self.assertEqual(
+ config.get("fs.viewfs.mounttable.hadoop-lt-cluster.linkFallback"),
+ "hdfs://ns1/",
+ )
+
+
+class ToFilesystemPathTest(unittest.TestCase):
+ """Cover URI -> absolute-path normalisation for hdfs-native."""
+
+ def setUp(self):
+ _install_fake_hdfs_native()
+
+ def tearDown(self):
+ _uninstall_fake_hdfs_native()
+
+ def _make(self, root):
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ return HdfsNativeFileIO(root, Options({}))
+
+ def test_viewfs_uri_same_cluster_returns_path(self):
+ fio = self._make("viewfs://cluster1/")
+ self.assertEqual(
+ fio.to_filesystem_path("viewfs://cluster1/home/hudi/x"),
+ "/home/hudi/x",
+ )
+
+ def test_viewfs_uri_no_path_returns_root(self):
+ fio = self._make("viewfs://cluster1/")
+ self.assertEqual(fio.to_filesystem_path("viewfs://cluster1"), "/")
+
+ def test_viewfs_absolute_path_unchanged(self):
+ fio = self._make("viewfs://cluster1/")
+ self.assertEqual(fio.to_filesystem_path("/foo/bar"), "/foo/bar")
+
+ def test_hdfs_uri_same_ns_returns_path(self):
+ fio = self._make("hdfs://ns1/")
+ self.assertEqual(
+ fio.to_filesystem_path("hdfs://ns1/foo/bar"),
+ "/foo/bar",
+ )
+
+ def test_hdfs_uri_different_ns_unchanged(self):
+ fio = self._make("hdfs://ns1/")
+ self.assertEqual(
+ fio.to_filesystem_path("hdfs://nsX/foo"),
+ "hdfs://nsX/foo",
+ )
+
+ def test_hdfs_client_with_viewfs_uri_unchanged(self):
+ fio = self._make("hdfs://ns1/")
+ # Different scheme; let hdfs-native error rather than silently rewrite.
+ self.assertEqual(
+ fio.to_filesystem_path("viewfs://cluster1/foo"),
+ "viewfs://cluster1/foo",
+ )
+
+ def test_exists_passes_path_only_to_client(self):
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ fio = HdfsNativeFileIO("viewfs://cluster1/", Options({}))
+ client = sys.modules["hdfs_native"].Client.return_value
+ client.get_file_info.return_value = MagicMock(
+ path="/home/hudi/x", isdir=False, length=0, modification_time=0)
+ fio.exists("viewfs://cluster1/home/hudi/x")
+ client.get_file_info.assert_called_once_with("/home/hudi/x")
+
+
+class FilesystemPropertyTest(unittest.TestCase):
+ """Cover the lazy pyarrow.fs facade backed by hdfs_native.fsspec."""
+
+ def setUp(self):
+ _install_fake_hdfs_native()
+ self._patcher = patch("pyarrow.fs.PyFileSystem")
+ self._handler_patcher = patch("pyarrow.fs.FSSpecHandler")
+ self.MockPyFs = self._patcher.start()
+ self.MockHandler = self._handler_patcher.start()
+
+ def tearDown(self):
+ self._patcher.stop()
+ self._handler_patcher.stop()
+ _uninstall_fake_hdfs_native()
+
+ def _make(self, root, props=None, xml_entries=None):
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ if xml_entries:
+ d = tempfile.mkdtemp()
+ self.addCleanup(lambda: __import__("shutil").rmtree(d, ignore_errors=True))
+ _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"), xml_entries)
+ base_props = {"hdfs.conf-dir": d}
+ base_props.update(props or {})
+ props = base_props
+ return HdfsNativeFileIO(root, Options(props or {}))
+
+ def test_viewfs_uses_viewfs_fsspec_class(self):
+ fio = self._make("viewfs://cluster1/")
+ fs_instance = fio.filesystem # trigger lazy
+ VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
+ HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+ VFs.assert_called_once()
+ HFs.assert_not_called()
+ _, kwargs = VFs.call_args
+ self.assertEqual(kwargs.get("host"), "cluster1")
+ self.assertIs(fs_instance, self.MockPyFs.return_value)
+
+ def test_hdfs_uses_hdfs_fsspec_class(self):
+ self._make("hdfs://ns1/").filesystem
+ HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+ VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
+ HFs.assert_called_once()
+ VFs.assert_not_called()
+ _, kwargs = HFs.call_args
+ self.assertEqual(kwargs.get("host"), "ns1")
+
+ def test_lazy_caches_after_first_access(self):
+ fio = self._make("hdfs://ns1/")
+ first = fio.filesystem
+ second = fio.filesystem
+ self.assertIs(first, second)
+ HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+ self.assertEqual(HFs.call_count, 1)
+
+ def test_xml_and_catalog_options_merged_into_fsspec_storage_options(self):
+ fio = self._make(
+ "hdfs://ns1/",
+ props={"dfs.client.read.shortcircuit": "true"},
+ xml_entries={"dfs.nameservices": "ns1"},
+ )
+ fio.filesystem # trigger
+ HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+ _, kwargs = HFs.call_args
+ # Both xml and option keys should land in the fsspec kwargs.
+ self.assertEqual(kwargs.get("dfs.nameservices"), "ns1")
+ self.assertEqual(kwargs.get("dfs.client.read.shortcircuit"), "true")
+
+ def test_catalog_option_overrides_xml(self):
+ fio = self._make(
+ "hdfs://ns1/",
+ props={"dfs.foo": "v_user"},
+ xml_entries={"dfs.foo": "v_xml"},
+ )
+ fio.filesystem
+ HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
+ _, kwargs = HFs.call_args
+ self.assertEqual(kwargs.get("dfs.foo"), "v_user")
+
+ def test_missing_fsspec_raises_clear_error(self):
+ fio = self._make("hdfs://ns1/")
+ # Remove the fsspec submodule but keep hdfs_native itself, to
+ # simulate an old/partial install.
+ sys.modules.pop("hdfs_native.fsspec", None)
+ sys.modules["hdfs_native"].fsspec = None
+ with self.assertRaises(RuntimeError) as ctx:
+ fio.filesystem
+ self.assertIn("hdfs-native fsspec adapter", str(ctx.exception))
+
+
+class PickleTest(unittest.TestCase):
+ """Cover __reduce__ so Ray / multiprocessing can ship FileIO."""
+
+ def setUp(self):
+ _install_fake_hdfs_native()
+ # Isolate from any HADOOP_CONF_DIR on the host so __reduce__'s
+ # env-derived config_dir pinning is deterministic across machines.
+ self._env_patcher = patch.dict(os.environ, {}, clear=True)
+ self._env_patcher.start()
+
+ def tearDown(self):
+ self._env_patcher.stop()
+ _uninstall_fake_hdfs_native()
+
+ def _make(self, path, props=None):
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ return HdfsNativeFileIO(path, Options(props or {}))
+
+ def test_reduce_returns_class_and_args(self):
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ fio = self._make("viewfs://cluster1/some/sub/path",
+ {"dfs.nameservices": "ns1"})
+ cls, args = fio.__reduce__()
+ self.assertIs(cls, HdfsNativeFileIO)
+ path, options = args
+ # Path is rebuilt from scheme+netloc (path segment dropped) — that
+ # is intentional because __init__ ignores path beyond scheme+netloc.
+ self.assertEqual(path, "viewfs://cluster1")
+ self.assertEqual(options.to_map(), {"dfs.nameservices": "ns1"})
+
+ def test_reduce_for_empty_netloc(self):
+ fio = self._make("hdfs://")
+ _, (path, _) = fio.__reduce__()
+ self.assertEqual(path, "hdfs://")
+
+ def test_reduce_pins_env_resolved_config_dir_into_options(self):
+ # config_dir resolved from $HADOOP_CONF_DIR should be carried into
+ # the pickled options so a worker on a host with a different env
+ # value still uses the driver's resolved directory.
+ with tempfile.TemporaryDirectory() as d:
+ with patch.dict(os.environ, {"HADOOP_CONF_DIR": d}, clear=True):
+ fio = self._make("hdfs://ns1/foo")
+ _, (_, options) = fio.__reduce__()
+ self.assertEqual(options.to_map().get("hdfs.conf-dir"), d)
+
+ def test_reduce_does_not_override_explicit_conf_dir_option(self):
+ with tempfile.TemporaryDirectory() as opt_dir:
+ with patch.dict(os.environ,
+ {"HADOOP_CONF_DIR": "/env/dir"}, clear=True):
+ fio = self._make("hdfs://ns1/foo",
+ {"hdfs.conf-dir": opt_dir})
+ _, (_, options) = fio.__reduce__()
+ self.assertEqual(options.to_map().get("hdfs.conf-dir"), opt_dir)
+
+ def test_pickle_roundtrip_preserves_type_and_options(self):
+ import pickle
+ fio = self._make("hdfs://ns1/foo",
+ {"dfs.foo": "bar", "fs.viewfs.x": "y"})
+ client_cls = sys.modules["hdfs_native"].Client
+ client_cls.reset_mock()
+ # Roundtrip via the highest pickle protocol.
+ blob = pickle.dumps(fio, protocol=pickle.HIGHEST_PROTOCOL)
+ restored = pickle.loads(blob)
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ self.assertIsInstance(restored, HdfsNativeFileIO)
+ self.assertEqual(restored.properties.to_map(),
+ {"dfs.foo": "bar", "fs.viewfs.x": "y"})
+ # The original __init__ ran once; the unpickle ran __init__ again.
+ self.assertEqual(client_cls.call_count, 1)
+
+ def test_pickle_with_viewfs_scheme(self):
+ import pickle
+ fio = self._make("viewfs://cluster1/")
+ restored = pickle.loads(pickle.dumps(fio))
+ self.assertEqual(restored._scheme, "viewfs")
+ self.assertEqual(restored._netloc, "cluster1")
+
+ def test_pickle_does_not_serialise_live_client(self):
+ # If the live _client were pickled, the call would fail (MagicMocks
+ # are picklable but the real RawClient would not be). This test
+ # documents the contract: __reduce__ MUST sidestep _client.
+ import pickle
+ fio = self._make("hdfs://ns1/")
+ blob = pickle.dumps(fio)
+ # The pickled blob should reference the constructor inputs only;
+ # specifically it should not embed the literal mock _client.
+ self.assertNotIn(b"_client", blob)
+
+
+class HdfsNativeWriteFormatTest(unittest.TestCase):
+ """HdfsNativeFileIO is the default hdfs:// backend, so it must keep the
+ same write surface as the pyarrow backend it replaces — including
+ lance/vortex, which otherwise fall through to FileIO's NotImplementedError.
+ """
+
+ def setUp(self):
+ self._fake, self._client_cls, _ = _install_fake_hdfs_native()
+ self._client_cls.return_value = MagicMock(name="ClientInstance")
+ from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
+ self.fio = HdfsNativeFileIO("hdfs://ns/", Options({}))
+
+ def tearDown(self):
+ _uninstall_fake_hdfs_native()
+
+ def test_lance_and_vortex_are_overridden(self):
+ # The regression this guards: with these unimplemented, an HDFS table
+ # using file.format=lance/vortex would hit FileIO's NotImplementedError.
+ from pypaimon.common.file_io import FileIO
+ self.assertIsNot(
+ type(self.fio).write_lance, FileIO.write_lance)
+ self.assertIsNot(
+ type(self.fio).write_vortex, FileIO.write_vortex)
+
+ def test_write_lance_delegates_to_lance_specified(self):
+ import pyarrow
+ table = pyarrow.table({"a": [1, 2]})
+ writer = MagicMock(name="LanceFileWriter")
+ fake_lance = types.ModuleType("lance")
+ fake_lance.file = types.SimpleNamespace(
+ LanceFileWriter=MagicMock(return_value=writer))
+ with patch.dict(sys.modules, {"lance": fake_lance}), \
+ patch("pypaimon.read.reader.lance_utils.to_lance_specified",
+ return_value=("hdfs://ns/x.lance", {"opt": "v"})) as spec:
+ self.fio.write_lance("hdfs://ns/x.lance", table)
+ spec.assert_called_once()
+ _, kwargs = fake_lance.file.LanceFileWriter.call_args
+ self.assertEqual(kwargs.get("storage_options"), {"opt": "v"})
+ writer.close.assert_called_once()
+
+ def test_write_vortex_delegates_to_vortex_specified(self):
+ import pyarrow
+ table = pyarrow.table({"a": [1, 2]})
+ fake_vortex = types.ModuleType("vortex")
+ fake_vortex.array = MagicMock(return_value="varr")
+ fake_vortex.store = types.SimpleNamespace(from_url=MagicMock())
+ fake_io = types.ModuleType("vortex._lib.io")
+ fake_io.write = MagicMock()
+ fake_lib = types.ModuleType("vortex._lib")
+ fake_lib.io = fake_io
+ fake_modules = {
+ "vortex": fake_vortex,
+ "vortex._lib": fake_lib,
+ "vortex._lib.io": fake_io,
+ }
+ with patch.dict(sys.modules, fake_modules), \
+ patch("pypaimon.read.reader.vortex_utils.to_vortex_specified",
+ return_value=("hdfs://ns/x.vortex", None)):
+ self.fio.write_vortex("hdfs://ns/x.vortex", table)
+ fake_io.write.assert_called_once_with("varr", "hdfs://ns/x.vortex")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/kerberos_test.py b/paimon-python/pypaimon/tests/kerberos_test.py
index 06d98d90d6..722043d778 100644
--- a/paimon-python/pypaimon/tests/kerberos_test.py
+++ b/paimon-python/pypaimon/tests/kerberos_test.py
@@ -235,7 +235,7 @@ class KerberosHdfsTest(unittest.TestCase):
@patch("pypaimon.filesystem.pyarrow_file_io.subprocess.run")
@patch("pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem")
def test_hdfs_with_fallback_keys(self, mock_hdfs_fs, mock_subprocess_run):
- """Verify that Java-compatible fallback keys security.principal / security.keytab work."""
+ """Verify that the secondary fallback keys security.principal / security.keytab work."""
mock_subprocess_run.return_value = MagicMock(stdout="/some/classpath")
with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index aaba800fec..7355e25bf9 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -181,6 +181,9 @@ setup(
'pypaimon-rust; python_version>="3.10"',
'datafusion>=52; python_version>="3.10"',
],
+ 'hdfs': [
+ 'hdfs-native>=0.13,<1; python_version >= "3.10" and platform_system != "Windows"',
+ ],
},
description="Apache Paimon Python API",
long_description=long_description,