TheR1sing3un commented on code in PR #8031:
URL: https://github.com/apache/paimon/pull/8031#discussion_r3340092839


##########
paimon-python/pypaimon/filesystem/hdfs_native_file_io.py:
##########
@@ -0,0 +1,638 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""HDFS FileIO backed by the hdfs-native protocol client (no JVM, no 
libhdfs)."""
+
+import logging
+import os
+import xml.etree.ElementTree as ET
+from datetime import datetime, timezone
+from pathlib import PurePosixPath
+from typing import Dict, Optional
+from urllib.parse import urlparse
+
+import pyarrow
+import pyarrow.fs as pafs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import HdfsOptions, SecurityOptions
+from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem import _kerberos
+from pypaimon.schema.data_types import AtomicType, DataField, 
PyarrowFieldParser
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+
+class _HdfsFileInfo:
+    """pafs.FileInfo-shaped adapter built from hdfs_native.FileStatus."""
+    __slots__ = ('path', 'size', 'type', 'mtime', 'base_name')
+
+    def __init__(self, path: str, size: Optional[int], file_type, mtime):
+        self.path = path
+        self.size = size
+        self.type = file_type
+        self.mtime = mtime
+        self.base_name = path.rsplit('/', 1)[-1] if path else ''
+
+
+class _HdfsWriterAdapter:
+    """File-like wrapper over hdfs_native.FileWriter."""
+
+    def __init__(self, fw):
+        self._fw = fw
+        self._pos = 0
+        self._closed = False
+
+    def write(self, buf) -> int:
+        n = self._fw.write(buf)
+        if n is None:
+            n = len(buf) if hasattr(buf, '__len__') else 0
+        self._pos += n
+        return n
+
+    def tell(self) -> int:
+        return self._pos
+
+    def flush(self):
+        pass
+
+    def close(self):
+        if not self._closed:
+            try:
+                self._fw.close()
+            finally:
+                self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def writable(self) -> bool:
+        return True
+
+    def readable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return False
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+        return False
+
+
+class _HdfsReaderAdapter:
+    """File-like wrapper over hdfs_native.FileReader.
+
+    Delegates read/seek/tell straight to the underlying reader (which is an
+    io.RawIOBase subclass with full seek/tell support). The wrapper only
+    exists so that exiting a `with` block guarantees the underlying handle
+    is closed — hdfs-native's own FileReader.__exit__ is a no-op.
+    """
+
+    def __init__(self, fr):
+        self._fr = fr
+        self._closed = False
+
+    def read(self, size: int = -1) -> bytes:
+        return self._fr.read(-1 if size is None else size)
+
+    def read1(self, size: int = -1) -> bytes:
+        return self.read(size)
+
+    def seek(self, pos: int, whence: int = 0) -> int:
+        self._fr.seek(pos, whence)
+        return self._fr.tell()
+
+    def tell(self) -> int:
+        return self._fr.tell()
+
+    def close(self):
+        if self._closed:
+            return
+        try:
+            close = getattr(self._fr, 'close', None)
+            if close is not None:
+                close()
+        finally:
+            self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def readable(self) -> bool:
+        return True
+
+    def writable(self) -> bool:
+        return False
+
+    def seekable(self) -> bool:
+        return True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+        return False
+
+
+class HdfsNativeFileIO(FileIO):
+    """HDFS FileIO that speaks the HDFS RPC protocol directly.
+
+    No JVM, no libhdfs, no Hadoop install required. Hadoop xml is still
+    consumed if present (HADOOP_CONF_DIR or `hdfs.conf-dir` option) for
+    viewfs mount tables and HA NameNode lists; alternatively the same
+    key/values can be delivered via the catalog options channel (a REST
+    catalog can therefore push the cluster wiring with the response).
+    """
+
+    NATIVE_KEY_PREFIXES = HdfsOptions.HDFS_NATIVE_CONFIG_KEY_PREFIXES
+    NS_PREFIX = HdfsOptions.HDFS_CONFIG_PREFIX
+
+    def __init__(self, path: str, catalog_options: Options):
+        self.properties = catalog_options or Options({})
+        self.logger = logging.getLogger(__name__)
+        self.uri_reader_factory = UriReaderFactory(self.properties)
+
+        scheme, netloc, _ = self.parse_location(path)
+        if scheme not in {"hdfs", "viewfs"}:
+            raise ValueError(
+                f"HdfsNativeFileIO does not support scheme '{scheme}'"
+            )
+        self._scheme = scheme
+        self._netloc = netloc
+
+        try:
+            from hdfs_native import Client, WriteOptions
+        except ImportError as e:
+            raise ImportError(
+                "hdfs-native is not installed. "
+                "Install with: pip install 'pypaimon[hdfs]'"
+            ) from e
+        self._WriteOptions = WriteOptions
+
+        self._setup_kerberos()
+
+        config_dir = (
+            self.properties.get(HdfsOptions.HDFS_CONF_DIR)
+            or os.environ.get("HADOOP_CONF_DIR")
+        )
+        hadoop_xml = self._load_hadoop_xml(config_dir)
+
+        config = self._build_config_dict()
+        self._maybe_inject_viewfs_fallback(scheme, netloc, config, hadoop_xml)
+
+        # Stash for the lazy `filesystem` property (the fsspec/pyarrow facade
+        # is only built if a caller asks for it).
+        self._config = config
+        self._hadoop_xml = hadoop_xml
+        self._config_dir = config_dir
+        self._filesystem = None
+
+        client_kwargs = {}
+        url = self._build_url(scheme, netloc)
+        if url:
+            client_kwargs["url"] = url
+        if config:
+            client_kwargs["config"] = config
+        if config_dir:
+            client_kwargs["config_dir"] = config_dir
+
+        self._client = Client(**client_kwargs)
+
+    def __reduce__(self):
+        """Pickle support for Ray / multiprocessing.
+
+        hdfs_native.Client is a Rust binding that can't be pickled; rather
+        than try to serialise live handles, we serialise the constructor
+        inputs and let workers re-init their own Client. Same pattern
+        pyarrow.fs.HadoopFileSystem uses.
+
+        Pin the resolved config_dir into the carried options. If the
+        driver resolved it from $HADOOP_CONF_DIR, a worker on a host with
+        a different env var would otherwise pick up the worker's value
+        and silently talk to a different cluster.
+        """
+        netloc = self._netloc or ""
+        path = f"{self._scheme}://{netloc}"
+        props_map = dict(self.properties.to_map())
+        if self._config_dir and not props_map.get(
+            HdfsOptions.HDFS_CONF_DIR.key()
+        ):
+            props_map[HdfsOptions.HDFS_CONF_DIR.key()] = self._config_dir
+        return (type(self), (path, Options(props_map)))
+
+    @property
+    def filesystem(self):
+        """pyarrow.fs.FileSystem facade backed by hdfs_native.fsspec.
+
+        Lazily constructed: FileIO-only call paths
+        (exists/list_status/new_input_stream/...) never pay the fsspec init
+        cost; only ds.dataset / open_input_file callers do.
+        """
+        if self._filesystem is None:
+            import pyarrow.fs as pafs
+            try:
+                from hdfs_native.fsspec import (
+                    HdfsFileSystem,
+                    ViewfsFileSystem,
+                )
+            except ImportError as e:
+                raise RuntimeError(
+                    "hdfs-native fsspec adapter is required to bridge "
+                    "HdfsNativeFileIO to a pyarrow.fs filesystem; upgrade "
+                    "hdfs-native (>=0.13)."
+                ) from e
+            cls = (ViewfsFileSystem if self._scheme == "viewfs"
+                   else HdfsFileSystem)
+            # Merge xml + overrides so the fsspec instance can connect
+            # without relying on HADOOP_CONF_DIR (BaseFileSystem.__init__
+            # only forwards storage_options to Client, not config_dir).
+            merged_config = {**self._hadoop_xml, **self._config}
+            fsspec_fs = cls(host=self._netloc, **merged_config)
+            self._filesystem = pafs.PyFileSystem(
+                pafs.FSSpecHandler(fsspec_fs))
+        return self._filesystem
+
+    @staticmethod
+    def parse_location(location: str):
+        uri = urlparse(location)
+        if not uri.scheme:
+            return "file", uri.netloc, os.path.abspath(location)
+        return uri.scheme, uri.netloc, uri.path
+
+    @staticmethod
+    def _build_url(scheme: str, netloc: Optional[str]) -> Optional[str]:
+        if not netloc:
+            return None
+        return f"{scheme}://{netloc}"
+
+    @staticmethod
+    def _load_hadoop_xml(config_dir: Optional[str]) -> Dict[str, str]:
+        """Parse core-site.xml + hdfs-site.xml from a Hadoop config dir into a
+        flat {name: value} dict. Returns empty dict if the dir is missing or
+        unreadable.
+
+        Used only to discover viewfs mount-table state so we can polyfill the
+        linkFallback mount that hdfs-native requires but libhdfs tolerates.
+        The final config dir is still handed to hdfs-native for its own
+        (more complete) xml parsing.
+        """
+        result: Dict[str, str] = {}
+        if not config_dir or not os.path.isdir(config_dir):
+            return result
+        for fname in ("core-site.xml", "hdfs-site.xml"):
+            path = os.path.join(config_dir, fname)
+            if not os.path.isfile(path):
+                continue
+            try:
+                tree = ET.parse(path)
+            except (ET.ParseError, OSError):
+                continue
+            for prop in tree.getroot().findall("property"):
+                name_el = prop.find("name")
+                value_el = prop.find("value")
+                if name_el is None or name_el.text is None:
+                    continue
+                value = (
+                    value_el.text.strip()
+                    if value_el is not None and value_el.text
+                    else ""
+                )
+                result[name_el.text.strip()] = value
+        return result
+
+    @staticmethod
+    def _maybe_inject_viewfs_fallback(
+        scheme: str,
+        netloc: Optional[str],
+        overrides: Dict[str, str],
+        hadoop_xml: Dict[str, str],
+    ) -> None:
+        """If we're opening a viewfs URI and no linkFallback is configured for
+        the cluster, pick a usable nameservice URI from existing link.*
+        targets or dfs.nameservices and inject one into `overrides`.
+
+        hdfs-native rejects viewfs init without a fallback mount; libhdfs
+        tolerates it. This bridges the gap without touching cluster xml.
+        """
+        if scheme != "viewfs" or not netloc:
+            return
+        cluster = netloc
+        fallback_key = f"fs.viewfs.mounttable.{cluster}.linkFallback"
+        if fallback_key in overrides or fallback_key in hadoop_xml:
+            return
+
+        link_prefix = f"fs.viewfs.mounttable.{cluster}.link."
+        for key, value in hadoop_xml.items():
+            if key.startswith(link_prefix) and value:
+                parsed = urlparse(value)
+                if parsed.scheme == "hdfs" and parsed.netloc:
+                    overrides[fallback_key] = f"hdfs://{parsed.netloc}/"
+                    return
+
+        nameservices = [
+            ns.strip()
+            for ns in hadoop_xml.get("dfs.nameservices", "").split(",")
+            if ns.strip()
+        ]
+        if nameservices:
+            overrides[fallback_key] = f"hdfs://{nameservices[0]}/"
+
+    def _setup_kerberos(self):
+        principal = (
+            self.properties.get(SecurityOptions.KERBEROS_PRINCIPAL)
+            or self.properties.to_map().get("security.principal")
+        )
+        keytab = (
+            self.properties.get(SecurityOptions.KERBEROS_KEYTAB)
+            or self.properties.to_map().get("security.keytab")
+        )
+        if bool(principal) != bool(keytab):
+            raise ValueError(
+                "security.kerberos.login.principal and "
+                "security.kerberos.login.keytab "
+                "must be both set or both unset"
+            )
+        if principal and keytab:
+            _kerberos.kerberos_login_from_keytab(principal, keytab)
+            cache_path = _kerberos.get_ticket_cache_path()
+            if not cache_path:
+                raise RuntimeError(
+                    "kinit succeeded but no ticket cache path could be "
+                    "determined. Set the KRB5CCNAME environment variable "
+                    "to specify the cache location."
+                )
+            # hdfs-native's GSSAPI layer reads KRB5CCNAME from the process
+            # env, which is global state. If a different cache was already
+            # configured (typically because another HdfsNativeFileIO with
+            # a different principal lives in the same process), warn — the
+            # last writer wins and earlier instances will start using the
+            # new ticket, which is almost certainly not what the caller
+            # wanted.
+            existing = os.environ.get("KRB5CCNAME")
+            existing_stripped = (
+                existing[5:] if existing and existing.startswith("FILE:")
+                else existing
+            )
+            if existing_stripped and existing_stripped != cache_path:
+                self.logger.warning(
+                    "Overwriting process-global KRB5CCNAME from %r to %r; "
+                    "concurrent HdfsNativeFileIO instances with different "
+                    "Kerberos principals share env state and will clobber "
+                    "each other's ticket caches.",
+                    existing, cache_path,
+                )
+            # Preserve the `FILE:` qualifier if the existing value carried
+            # it — some GSSAPI tooling distinguishes cache types by prefix.
+            os.environ["KRB5CCNAME"] = (
+                f"FILE:{cache_path}"
+                if existing and existing.startswith("FILE:")
+                else cache_path
+            )
+
+    def _build_config_dict(self) -> Dict[str, str]:
+        config: Dict[str, str] = {}
+        for key, value in self.properties.to_map().items():
+            if value is None:
+                continue
+            if any(key.startswith(p) for p in self.NATIVE_KEY_PREFIXES):
+                config[key] = str(value)
+            elif key.startswith(self.NS_PREFIX):
+                config[key[len(self.NS_PREFIX):]] = str(value)
+        return config
+
+    def to_filesystem_path(self, path: str) -> str:
+        # hdfs-native expects an absolute path within the cluster the Client is
+        # bound to; passing a full URI makes its Rust-side MountTable::resolve
+        # treat the string as a relative path (since it doesn't start with '/')
+        # and prepend the user's home dir, producing nonsense like
+        # `/user/foo/viewfs://cluster/...`. Strip the matching scheme+authority
+        # so a plain absolute path reaches the client.
+        parsed = urlparse(path)
+        if parsed.scheme in ("hdfs", "viewfs"):
+            if parsed.scheme == self._scheme and (
+                not parsed.netloc or parsed.netloc == self._netloc
+            ):
+                return parsed.path or "/"
+        return path
+
+    def _adapt_status(self, status, fallback_path: str = '') -> _HdfsFileInfo:
+        path = getattr(status, 'path', None) or fallback_path
+        is_dir = bool(getattr(status, 'isdir', False))
+        length = getattr(status, 'length', 0)
+        mtime_ms = getattr(status, 'modification_time', None)
+        mtime = (
+            datetime.fromtimestamp(mtime_ms / 1000.0, tz=timezone.utc)
+            if mtime_ms else None
+        )
+        size = None if is_dir else int(length or 0)
+        ftype = pafs.FileType.Directory if is_dir else pafs.FileType.File
+        return _HdfsFileInfo(path, size, ftype, mtime)
+
+    def new_input_stream(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        reader = self._client.read(path_str)
+        return _HdfsReaderAdapter(reader)
+
+    def new_output_stream(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        writer = self._client.create(
+            path_str,
+            self._WriteOptions(create_parent=True, overwrite=True),
+        )
+        return _HdfsWriterAdapter(writer)
+
+    def get_file_status(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        try:
+            status = self._client.get_file_info(path_str)
+        except FileNotFoundError:
+            raise FileNotFoundError(f"File {path} does not exist")
+        return self._adapt_status(status, path_str)
+
+    def list_status(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        return [self._adapt_status(s) for s in 
self._client.list_status(path_str)]
+
+    def exists(self, path: str) -> bool:
+        path_str = self.to_filesystem_path(path)
+        try:
+            self._client.get_file_info(path_str)
+            return True
+        except FileNotFoundError:
+            return False
+
+    def delete(self, path: str, recursive: bool = False) -> bool:
+        path_str = self.to_filesystem_path(path)
+        try:
+            status = self._client.get_file_info(path_str)
+        except FileNotFoundError:
+            return False
+        if bool(getattr(status, 'isdir', False)) and not recursive:
+            if next(iter(self._client.list_status(path_str)), None) is not 
None:
+                raise OSError(f"Directory {path} is not empty")
+        return bool(self._client.delete(path_str, recursive))
+
+    def mkdirs(self, path: str) -> bool:
+        path_str = self.to_filesystem_path(path)
+        try:
+            status = self._client.get_file_info(path_str)
+        except FileNotFoundError:
+            self._client.mkdirs(path_str, create_parent=True)
+            return True
+        if bool(getattr(status, 'isdir', False)):
+            return True
+        raise FileExistsError(f"Path exists but is not a directory: {path}")
+
+    def rename(self, src: str, dst: str) -> bool:
+        src_str = self.to_filesystem_path(src)
+        dst_str = self.to_filesystem_path(dst)
+        dst_parent = str(PurePosixPath(dst_str).parent)
+        if dst_parent and dst_parent != '.':
+            try:
+                self._client.get_file_info(dst_parent)
+            except FileNotFoundError:
+                self._client.mkdirs(dst_parent, create_parent=True)
+        try:
+            dst_status = self._client.get_file_info(dst_str)
+            if not getattr(dst_status, 'isdir', False):
+                return False
+            src_name = PurePosixPath(src_str).name
+            dst_str = str(PurePosixPath(dst_str) / src_name)
+            try:
+                self._client.get_file_info(dst_str)
+                return False
+            except FileNotFoundError:
+                pass
+        except FileNotFoundError:
+            pass
+        try:
+            self._client.rename(src_str, dst_str)
+            return True
+        except FileNotFoundError:
+            return False
+        except (PermissionError, OSError):
+            return False
+
+    def write_parquet(self, path: str, data: pyarrow.Table,
+                      compression: str = 'zstd', zstd_level: int = 1, 
**kwargs):
+        try:
+            import pyarrow.parquet as pq
+            if compression.lower() == 'zstd':
+                kwargs['compression_level'] = zstd_level
+            with self.new_output_stream(path) as raw_stream:
+                stream = pyarrow.PythonFile(raw_stream, mode='wb')
+                try:
+                    pq.write_table(
+                        data, stream, compression=compression, **kwargs)
+                finally:
+                    stream.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
+
+    def write_orc(self, path: str, data: pyarrow.Table,
+                  compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        try:
+            import sys
+            import pyarrow.orc as orc
+            data = self._cast_time_columns_for_orc(data)
+            with self.new_output_stream(path) as raw_stream:
+                stream = pyarrow.PythonFile(raw_stream, mode='wb')
+                try:
+                    if sys.version_info[:2] == (3, 6):
+                        orc.write_table(data, stream, **kwargs)
+                    else:
+                        orc.write_table(
+                            data, stream, compression=compression, **kwargs)
+                finally:
+                    stream.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
+
+    def write_avro(self, path: str, data: pyarrow.Table,
+                   avro_schema=None, compression: str = 'zstd',
+                   zstd_level: int = 1, **kwargs):
+        import fastavro
+        if avro_schema is None:
+            avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
+
+        records_dict = data.to_pydict()
+
+        def record_generator():
+            num_rows = len(list(records_dict.values())[0])
+            for i in range(num_rows):
+                record = {}
+                for col in records_dict.keys():
+                    value = records_dict[col][i]
+                    if isinstance(value, datetime) and value.tzinfo is None:
+                        value = value.replace(tzinfo=timezone.utc)
+                    record[col] = value
+                yield record
+
+        codec_map = {
+            'null': 'null', 'deflate': 'deflate', 'snappy': 'snappy',
+            'bzip2': 'bzip2', 'xz': 'xz', 'zstandard': 'zstandard',
+            'zstd': 'zstandard',
+        }
+        codec = codec_map.get(compression.lower())
+        if codec is None:
+            raise ValueError(
+                f"Unsupported compression '{compression}' for Avro format. "
+                f"Supported compressions: {', 
'.join(sorted(codec_map.keys()))}."
+            )
+        if codec == 'zstandard':
+            kwargs['codec_compression_level'] = zstd_level
+        with self.new_output_stream(path) as output_stream:
+            fastavro.writer(output_stream, avro_schema,
+                            record_generator(), codec=codec, **kwargs)
+
+    def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            if data.num_columns != 1:
+                raise RuntimeError(
+                    f"Blob format only supports a single column, "
+                    f"got {data.num_columns} columns")
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                paimon_type = PyarrowFieldParser.to_paimon_type(
+                    field.type, field.nullable)
+                fields = [DataField(0, field.name, paimon_type)]
+            records_dict = data.to_pydict()
+            num_rows = data.num_rows
+            field_name = fields[0].name
+            with self.new_output_stream(path) as output_stream:
+                writer = BlobFormatWriter(output_stream)
+                for i in range(num_rows):
+                    writer.write_value(records_dict[field_name][i],
+                                       fields, self.uri_reader_factory)
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
+
+    def close(self):

Review Comment:
   nice suggestion! done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to