This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 97a35ff2b1 [python] Fix HDFS HA and ViewFS URI handling in
PyArrowFileIO (#7731)
97a35ff2b1 is described below
commit 97a35ff2b1ba11dd9b7c8b9c433cb4fb656d3f22
Author: chaoyang <[email protected]>
AuthorDate: Fri May 8 22:37:21 2026 +0800
[python] Fix HDFS HA and ViewFS URI handling in PyArrowFileIO (#7731)
`PyArrowFileIO._initialize_hdfs_fs` calls `splitport(netloc)` followed
by `int(port_str)`. ViewFS and HDFS HA URIs have no port, so `port_str`
is `None` and we hit `TypeError: int() argument must be a string ... not
'NoneType'` before reaching `pafs.HadoopFileSystem`.
This PR resolves `(host, port)` up front, before constructing
`pafs.HadoopFileSystem`, so all three URI shapes are handled predictably:
- `viewfs://...` (with or without netloc) → `host='default', port=0` so
libhdfs reads `fs.defaultFS` and resolves the ViewFS mount table from
`core-site.xml`.
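- `hdfs://nameservice/...` (HA, no port) or `hdfs:///...` (no netloc) →
also `host='default', port=0` to delegate to `fs.defaultFS`. Note that the
nameservice named in the URI is not passed to libhdfs, so this assumes it
is the nameservice configured as `fs.defaultFS`.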
- `hdfs://host:port/...` → connect directly with the parsed host/port.
The existing Kerberos branch, which reuses the resolved `host`/`port`
variables, is unchanged.
---
.../pypaimon/filesystem/pyarrow_file_io.py | 16 ++++-
paimon-python/pypaimon/tests/file_io_test.py | 73 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index d6af3ca543..9488809628 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -267,8 +267,20 @@ class PyArrowFileIO(FileIO):
"security.kerberos.login.principal and
security.kerberos.login.keytab "
"must be both set or both unset")
- host, port_str = splitport(netloc)
- port = int(port_str) if port_str else 0
+ # Resolve (host, port) for pafs.HadoopFileSystem.
+ # - ViewFS URIs delegate to fs.defaultFS (host='default') so libhdfs
+ # resolves the mount table from core-site.xml.
+ # - HDFS HA URIs carry a nameservice without a port; also delegate to
+ # fs.defaultFS to avoid int(None) on the missing port.
+ # - Explicit "host:port" URIs connect directly.
+ if scheme == 'viewfs' or not netloc:
+ host, port = 'default', 0
+ else:
+ parsed_host, port_str = splitport(netloc)
+ if port_str is None:
+ host, port = 'default', 0
+ else:
+ host, port = parsed_host, int(port_str)
kerb_ticket = None
if principal and keytab:
diff --git a/paimon-python/pypaimon/tests/file_io_test.py
b/paimon-python/pypaimon/tests/file_io_test.py
index 86c4c28f82..57e0bd3d4d 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -489,5 +489,78 @@ class FileIOTest(unittest.TestCase):
self.assertNotIn("\\", call[0][0], f"backslash in path:
{call[0][0]}")
+class HdfsFileIOTest(unittest.TestCase):
+ """Cases for HDFS / ViewFS URI handling in
PyArrowFileIO._initialize_hdfs_fs."""
+
+ def _make_hdfs_env(self, env_patch):
+ env_patch.setdefault('HADOOP_HOME', '/opt/hadoop')
+ env_patch.setdefault('HADOOP_CONF_DIR', '/opt/hadoop/etc/hadoop')
+ env_patch.setdefault('CLASSPATH', '')
+ env_patch.setdefault('LD_LIBRARY_PATH', '')
+ return env_patch
+
+ def _make_file_io(self):
+ file_io = PyArrowFileIO.__new__(PyArrowFileIO)
+ file_io.properties = Options({})
+ return file_io
+
+ @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+ @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+ def test_viewfs_uses_default_host(self, mock_hadoop_fs, mock_run):
+ mock_run.return_value =
MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+ mock_hadoop_fs.return_value = MagicMock()
+ with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+ self._make_file_io()._initialize_hdfs_fs('viewfs', 'clusterName')
+ mock_hadoop_fs.assert_called_once_with(host='default', port=0,
user='hadoop')
+
+ @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+ @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+ def test_viewfs_without_netloc_uses_default_host(self, mock_hadoop_fs,
mock_run):
+ mock_run.return_value =
MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+ mock_hadoop_fs.return_value = MagicMock()
+ with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+ self._make_file_io()._initialize_hdfs_fs('viewfs', '')
+ mock_hadoop_fs.assert_called_once_with(host='default', port=0,
user='hadoop')
+
+ @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+ @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+ def test_hdfs_with_port_uses_explicit_host(self, mock_hadoop_fs, mock_run):
+ mock_run.return_value =
MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+ mock_hadoop_fs.return_value = MagicMock()
+ with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+ self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020')
+ mock_hadoop_fs.assert_called_once_with(host='namenode', port=8020,
user='hadoop')
+
+ @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+ @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+ def test_hdfs_ha_nameservice_without_port_uses_default_host(self,
mock_hadoop_fs, mock_run):
+ mock_run.return_value =
MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+ mock_hadoop_fs.return_value = MagicMock()
+ with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+ self._make_file_io()._initialize_hdfs_fs('hdfs', 'nameservice1')
+ mock_hadoop_fs.assert_called_once_with(host='default', port=0,
user='hadoop')
+
+ @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run')
+ @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem')
+ def test_hdfs_without_netloc_uses_default_host(self, mock_hadoop_fs,
mock_run):
+ mock_run.return_value =
MagicMock(stdout='/opt/hadoop/share/hadoop/common/*')
+ mock_hadoop_fs.return_value = MagicMock()
+ with patch.dict(os.environ, self._make_hdfs_env({}), clear=True):
+ self._make_file_io()._initialize_hdfs_fs('hdfs', '')
+ mock_hadoop_fs.assert_called_once_with(host='default', port=0,
user='hadoop')
+
+ def test_hdfs_missing_hadoop_home_raises(self):
+ with patch.dict(os.environ, {'HADOOP_CONF_DIR':
'/opt/hadoop/etc/hadoop'}, clear=True):
+ with self.assertRaises(RuntimeError) as ctx:
+ self._make_file_io()._initialize_hdfs_fs('hdfs',
'namenode:8020')
+ self.assertIn('HADOOP_HOME', str(ctx.exception))
+
+ def test_hdfs_missing_hadoop_conf_dir_raises(self):
+ with patch.dict(os.environ, {'HADOOP_HOME': '/opt/hadoop'},
clear=True):
+ with self.assertRaises(RuntimeError) as ctx:
+ self._make_file_io()._initialize_hdfs_fs('hdfs',
'namenode:8020')
+ self.assertIn('HADOOP_CONF_DIR', str(ctx.exception))
+
+
if __name__ == '__main__':
unittest.main()