This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b898ad63cb Add mTLS support to WebHDFSHook (#44561)
5b898ad63cb is described below

commit 5b898ad63cb0e59308082bcc998b2947ac2422b6
Author: Mark H <[email protected]>
AuthorDate: Tue Dec 3 07:55:07 2024 +0800

    Add mTLS support to WebHDFSHook (#44561)
    
    * WebHDFS Hook - mTLS Support
    
    * Static check fixes
---
 .../connections.rst                                |  2 +
 docs/spelling_wordlist.txt                         |  2 +
 .../airflow/providers/apache/hdfs/hooks/webhdfs.py | 20 +++++++++
 providers/tests/apache/hdfs/hooks/test_webhdfs.py  | 48 ++++++++++++++++++++++
 4 files changed, 72 insertions(+)

diff --git a/docs/apache-airflow-providers-apache-hdfs/connections.rst b/docs/apache-airflow-providers-apache-hdfs/connections.rst
index c67331aaedf..28b557c4637 100644
--- a/docs/apache-airflow-providers-apache-hdfs/connections.rst
+++ b/docs/apache-airflow-providers-apache-hdfs/connections.rst
@@ -43,3 +43,5 @@ Extra (optional, connection parameters)
 
     * ``use_ssl`` - If SSL should be used. By default is set to `false`.
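     * ``verify`` - How to verify SSL. For more information refer to https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification.
+    * ``cert`` - Client certificate path for mTLS (only applied when ``use_ssl`` is enabled). Either a single file containing both the certificate and the private key, or a certificate file used together with ``key``.
+    * ``key`` - Client private key path for mTLS, used together with ``cert``.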
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index debbd43c896..e02e6cee380 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1058,6 +1058,8 @@ moto
 mouseover
 msg
 mssql
+mTLS
+mtls
 muldelete
 Multinamespace
 mutex
diff --git a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 3a996dddc77..47ab97db4e8 100644
--- a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -105,8 +105,18 @@ class WebHDFSHook(BaseHook):
     def _get_client(
         self, namenode: str, port: int, login: str, password: str | None, 
schema: str, extra_dejson: dict
     ) -> Any:
+        """
+        Get WebHDFS client.
+
+        Additional options via ``extra``:
+        - use_ssl: enable SSL connection (default: False)
+        - verify: CA certificate path or boolean for SSL verification 
(default: False)
+        - cert: client certificate path for mTLS, can be combined cert or used 
with ``key``
+        - key: client key path for mTLS with ``cert``
+        """
         connection_str = f"http://{namenode}";
         session = requests.Session()
+
         if password is not None:
             session.auth = (login, password)
 
@@ -114,6 +124,16 @@ class WebHDFSHook(BaseHook):
             connection_str = f"https://{namenode}";
             session.verify = extra_dejson.get("verify", False)
 
+            # Handle mTLS certificates
+            cert = extra_dejson.get("cert")
+            key = extra_dejson.get("key")
+
+            if cert:
+                if key:
+                    session.cert = (cert, key)
+                else:
+                    session.cert = cert
+
         if port is not None:
             connection_str += f":{port}"
 
diff --git a/providers/tests/apache/hdfs/hooks/test_webhdfs.py b/providers/tests/apache/hdfs/hooks/test_webhdfs.py
index 6eb32328cdc..80b4cef76ca 100644
--- a/providers/tests/apache/hdfs/hooks/test_webhdfs.py
+++ b/providers/tests/apache/hdfs/hooks/test_webhdfs.py
@@ -261,3 +261,51 @@ class TestWebHDFSHook:
 
             assert f"https://{connection.host}:{connection.port}"; == 
mock_insecure_client.call_args.args[0]
             assert not mock_insecure_client.call_args.kwargs["session"].verify
+
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+    def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client):
+        """Test mTLS with separate files: ``cert`` + ``key`` become a (cert, key) tuple on the session."""
+        with patch(
+            "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
+            return_value=Connection(
+                host="host_1",
+                port=123,
+                extra={
+                    "use_ssl": "True",
+                    "cert": "/path/to/cert.pem",
+                    "key": "/path/to/key.pem",
+                },
+            ),
+        ) as mock_get_connection:
+            socket_mock.socket.return_value.connect_ex.return_value = 0
+            self.webhdfs_hook.get_conn()
+            connection = mock_get_connection.return_value
+
+            assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
+            assert mock_insecure_client.call_args.kwargs["session"].cert == (
+                "/path/to/cert.pem",
+                "/path/to/key.pem",
+            )
+
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+    def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client):
+        """Test mTLS with a combined PEM: ``cert`` without ``key`` is set on the session as a plain str."""
+        with patch(
+            "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
+            return_value=Connection(
+                host="host_1",
+                port=123,
+                extra={
+                    "use_ssl": "True",
+                    "cert": "/path/to/combined.pem",
+                },
+            ),
+        ) as mock_get_connection:
+            socket_mock.socket.return_value.connect_ex.return_value = 0
+            self.webhdfs_hook.get_conn()
+            connection = mock_get_connection.return_value
+
+            assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
+            assert mock_insecure_client.call_args.kwargs["session"].cert == "/path/to/combined.pem"

Reply via email to