This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5b898ad63cb Add mTLS support to WebHDFSHook (#44561)
5b898ad63cb is described below
commit 5b898ad63cb0e59308082bcc998b2947ac2422b6
Author: Mark H <[email protected]>
AuthorDate: Tue Dec 3 07:55:07 2024 +0800
Add mTLS support to WebHDFSHook (#44561)
* WebHDFS Hook - mTLS Support
* Static check fixes
---
.../connections.rst | 2 +
docs/spelling_wordlist.txt | 2 +
.../airflow/providers/apache/hdfs/hooks/webhdfs.py | 20 +++++++++
providers/tests/apache/hdfs/hooks/test_webhdfs.py | 48 ++++++++++++++++++++++
4 files changed, 72 insertions(+)
diff --git a/docs/apache-airflow-providers-apache-hdfs/connections.rst b/docs/apache-airflow-providers-apache-hdfs/connections.rst
index c67331aaedf..28b557c4637 100644
--- a/docs/apache-airflow-providers-apache-hdfs/connections.rst
+++ b/docs/apache-airflow-providers-apache-hdfs/connections.rst
@@ -43,3 +43,5 @@ Extra (optional, connection parameters)
* ``use_ssl`` - If SSL should be used. By default is set to `false`.
* ``verify`` - How to verify SSL. For more information refer to
https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification.
+ * ``cert`` - Client certificate path for mTLS, can be combined cert or used with ``key``
+ * ``key`` - Client key path for mTLS with ``cert``
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index debbd43c896..e02e6cee380 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1058,6 +1058,8 @@ moto
mouseover
msg
mssql
+mTLS
+mtls
muldelete
Multinamespace
mutex
diff --git a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 3a996dddc77..47ab97db4e8 100644
--- a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -105,8 +105,18 @@ class WebHDFSHook(BaseHook):
def _get_client(
self, namenode: str, port: int, login: str, password: str | None, schema: str, extra_dejson: dict
) -> Any:
+ """
+ Get WebHDFS client.
+
+ Additional options via ``extra``:
+ - use_ssl: enable SSL connection (default: False)
+ - verify: CA certificate path or boolean for SSL verification (default: False)
+ - cert: client certificate path for mTLS, can be combined cert or used with ``key``
+ - key: client key path for mTLS with ``cert``
+ """
connection_str = f"http://{namenode}"
session = requests.Session()
+
if password is not None:
session.auth = (login, password)
@@ -114,6 +124,16 @@ class WebHDFSHook(BaseHook):
connection_str = f"https://{namenode}"
session.verify = extra_dejson.get("verify", False)
+ # Handle mTLS certificates
+ cert = extra_dejson.get("cert")
+ key = extra_dejson.get("key")
+
+ if cert:
+ if key:
+ session.cert = (cert, key)
+ else:
+ session.cert = cert
+
if port is not None:
connection_str += f":{port}"
diff --git a/providers/tests/apache/hdfs/hooks/test_webhdfs.py b/providers/tests/apache/hdfs/hooks/test_webhdfs.py
index 6eb32328cdc..80b4cef76ca 100644
--- a/providers/tests/apache/hdfs/hooks/test_webhdfs.py
+++ b/providers/tests/apache/hdfs/hooks/test_webhdfs.py
@@ -261,3 +261,51 @@ class TestWebHDFSHook:
assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
assert not mock_insecure_client.call_args.kwargs["session"].verify
+
+ @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
+ @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+ def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client):
+ """Test mTLS configuration with client cert and key"""
+ with patch(
+ "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
+ return_value=Connection(
+ host="host_1",
+ port=123,
+ extra={
+ "use_ssl": "True",
+ "cert": "/path/to/cert.pem",
+ "key": "/path/to/key.pem",
+ },
+ ),
+ ) as mock_get_connection:
+ socket_mock.socket.return_value.connect_ex.return_value = 0
+ self.webhdfs_hook.get_conn()
+ connection = mock_get_connection.return_value
+
+ assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
+ assert mock_insecure_client.call_args.kwargs["session"].cert == (
+ "/path/to/cert.pem",
+ "/path/to/key.pem",
+ )
+
+ @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
+ @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+ def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client):
+ """Test mTLS configuration with combined client cert and key"""
+ with patch(
+ "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
+ return_value=Connection(
+ host="host_1",
+ port=123,
+ extra={
+ "use_ssl": "True",
+ "cert": "/path/to/combined.pem",
+ },
+ ),
+ ) as mock_get_connection:
+ socket_mock.socket.return_value.connect_ex.return_value = 0
+ self.webhdfs_hook.get_conn()
+ connection = mock_get_connection.return_value
+
+ assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
+ assert mock_insecure_client.call_args.kwargs["session"].cert == ("/path/to/combined.pem")