This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a35ff2b1 [python] Fix HDFS HA and ViewFS URI handling in PyArrowFileIO (#7731)
97a35ff2b1 is described below

commit 97a35ff2b1ba11dd9b7c8b9c433cb4fb656d3f22
Author: chaoyang <[email protected]>
AuthorDate: Fri May 8 22:37:21 2026 +0800

    [python] Fix HDFS HA and ViewFS URI handling in PyArrowFileIO (#7731)
    
    `PyArrowFileIO._initialize_hdfs_fs` calls `splitport(netloc)` followed
    by `int(port_str)`. ViewFS and HDFS HA URIs have no port, so `port_str`
    is `None` and we hit `TypeError: int() argument must be a string ... not
    'NoneType'` before reaching `pafs.HadoopFileSystem`.
    
    This PR resolves `(host, port)` up-front so all three URI shapes work
    without surprising the user:
    
    - `viewfs://...` (with or without netloc) → `host='default', port=0` so
    libhdfs reads `fs.defaultFS` and resolves the ViewFS mount table from
    `core-site.xml`.
    - `hdfs://nameservice/...` (HA, no port) or `hdfs:///...` (no netloc) →
    also `host='default', port=0` to delegate to `fs.defaultFS`.
    - `hdfs://host:port/...` → connect directly with the parsed host/port.
    
    The `host`/`port` variables are reused by the existing Kerberos branch
    unchanged.
---
 .../pypaimon/filesystem/pyarrow_file_io.py         | 16 ++++-
 paimon-python/pypaimon/tests/file_io_test.py       | 73 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index d6af3ca543..9488809628 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -267,8 +267,20 @@ class PyArrowFileIO(FileIO):
                 "security.kerberos.login.principal and security.kerberos.login.keytab "
                 "must be both set or both unset")
 
-        host, port_str = splitport(netloc)
-        port = int(port_str) if port_str else 0
+        # Resolve (host, port) for pafs.HadoopFileSystem.
+        # - ViewFS URIs delegate to fs.defaultFS (host='default') so libhdfs
+        #   resolves the mount table from core-site.xml.
+        # - HDFS HA URIs carry a nameservice without a port; also delegate to
+        #   fs.defaultFS to avoid int(None) on the missing port.
+        # - Explicit "host:port" URIs connect directly.
+        if scheme == 'viewfs' or not netloc:
+            host, port = 'default', 0
+        else:
+            parsed_host, port_str = splitport(netloc)
+            if port_str is None:
+                host, port = 'default', 0
+            else:
+                host, port = parsed_host, int(port_str)
 
         kerb_ticket = None
         if principal and keytab:
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index 86c4c28f82..57e0bd3d4d 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -489,5 +489,78 @@ class FileIOTest(unittest.TestCase):
             self.assertNotIn("\\", call[0][0], f"backslash in path: {call[0][0]}")
 
 
+class HdfsFileIOTest(unittest.TestCase):
+    """Cases for HDFS / ViewFS URI handling in PyArrowFileIO._initialize_hdfs_fs."""
+
+    def _make_hdfs_env(self, env_patch):
+        env_patch.setdefault('HADOOP_HOME', '/opt/hadoop')
+        env_patch.setdefault('HADOOP_CONF_DIR', '/opt/hadoop/etc/hadoop')
+        env_patch.setdefault('CLASSPATH', '')
+        env_patch.setdefault('LD_LIBRARY_PATH', '')
+        return env_patch
+
+    def _make_file_io(self):
+        file_io = PyArrowFileIO.__new__(PyArrowFileIO)
+        file_io.properties = Options({})
+        return file_io
+
+    @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+    @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+    def test_viewfs_uses_default_host(self, mock_hadoop_fs, mock_run):
+        mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+        mock_hadoop_fs.return_value = MagicMock()
+        with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+            self._make_file_io()._initialize_hdfs_fs('viewfs', 'clusterName')
+        mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop')
+
+    @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+    @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+    def test_viewfs_without_netloc_uses_default_host(self, mock_hadoop_fs, mock_run):
+        mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+        mock_hadoop_fs.return_value = MagicMock()
+        with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+            self._make_file_io()._initialize_hdfs_fs('viewfs', '')
+        mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop')
+
+    @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+    @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+    def test_hdfs_with_port_uses_explicit_host(self, mock_hadoop_fs, mock_run):
+        mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+        mock_hadoop_fs.return_value = MagicMock()
+        with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+            self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020')
+        mock_hadoop_fs.assert_called_once_with(host='namenode', port=8020, user='hadoop')
+
+    @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+    @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+    def test_hdfs_ha_nameservice_without_port_uses_default_host(self, mock_hadoop_fs, mock_run):
+        mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+        mock_hadoop_fs.return_value = MagicMock()
+        with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+            self._make_file_io()._initialize_hdfs_fs('hdfs', 'nameservice1')
+        mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop')
+
+    @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+    @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+    def test_hdfs_without_netloc_uses_default_host(self, mock_hadoop_fs, mock_run):
+        mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+        mock_hadoop_fs.return_value = MagicMock()
+        with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+            self._make_file_io()._initialize_hdfs_fs('hdfs', '')
+        mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop')
+
+    def test_hdfs_missing_hadoop_home_raises(self):
+        with patch.dict(os.environ, {'HADOOP_CONF_DIR': '/opt/hadoop/etc/hadoop'}, clear=True):
+            with self.assertRaises(RuntimeError) as ctx:
+                self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020')
+            self.assertIn('HADOOP_HOME', str(ctx.exception))
+
+    def test_hdfs_missing_hadoop_conf_dir_raises(self):
+        with patch.dict(os.environ, {'HADOOP_HOME': '/opt/hadoop'}, clear=True):
+            with self.assertRaises(RuntimeError) as ctx:
+                self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020')
+            self.assertIn('HADOOP_CONF_DIR', str(ctx.exception))
+
+
 if __name__ == '__main__':
     unittest.main()

Reply via email to