This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a8c12067fbe Enable TLS certificate verification for K8s token exchange 
in `DatabricksHook` (#63704)
a8c12067fbe is described below

commit a8c12067fbe984cd2a99b35f0a6909a570e87a1c
Author: Marcin Wojtyczka <[email protected]>
AuthorDate: Tue Mar 17 09:29:19 2026 +0100

    Enable TLS certificate verification for K8s token exchange in 
`DatabricksHook` (#63704)
    
    * Enable TLS Certificate Verification for K8s Token Exchange
    
    * added unit tests
    
    * fixed tests
    
    * refactor tests
    
    * added change log
    
    * fixed spelling typo
    
    * updated changelog
    
    ---------
    
    Co-authored-by: Jens Scheffler <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 providers/databricks/docs/changelog.rst            | 18 +++++
 .../databricks/docs/connections/databricks.rst     |  4 +-
 .../providers/databricks/hooks/databricks_base.py  |  7 +-
 .../unit/databricks/hooks/test_databricks_base.py  | 80 ++++++++++++++++++++--
 4 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/providers/databricks/docs/changelog.rst 
b/providers/databricks/docs/changelog.rst
index 5734d4a8970..aac2f78b377 100644
--- a/providers/databricks/docs/changelog.rst
+++ b/providers/databricks/docs/changelog.rst
@@ -26,6 +26,24 @@
 Changelog
 ---------
 
+.. note:: **Security fix — TLS verification for Kubernetes TokenRequest API 
(affects Kubernetes OIDC token federation)**
+
+   The Kubernetes TokenRequest API call made during ``federated_k8s`` 
authentication now verifies the
+   Kubernetes API server's TLS certificate using the in-cluster CA bundle at
+   ``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``. Previously, TLS 
verification was disabled
+   (``verify=False``), which exposed the token exchange to potential 
man-in-the-middle attacks within the
+   cluster.
+
+   **This is a security fix that enforces what should always have been the 
behaviour.**
+
+   For all standard Kubernetes deployments (EKS, AKS, GKE, vanilla 
Kubernetes), the CA bundle is
+   automatically mounted at the standard path by the Kubernetes API server as 
part of service account
+   token projection — no action is required.
+
+   **Potentially impacted:** Non-compliant or highly customized Kubernetes 
distributions that do not
+   mount ``ca.crt`` at 
``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``. If you are affected,
+   please open an issue so support for a configurable CA path can be added.
+
 7.11.0
 ......
 
diff --git a/providers/databricks/docs/connections/databricks.rst 
b/providers/databricks/docs/connections/databricks.rst
index 084743b4983..634f61f2d4d 100644
--- a/providers/databricks/docs/connections/databricks.rst
+++ b/providers/databricks/docs/connections/databricks.rst
@@ -221,7 +221,7 @@ Extra (optional)
     **Important Notes:**
 
     * This method requires no secrets to be stored in the Airflow connection
-    * For TokenRequest API method: The TokenRequest API is called to request a 
token for the ``default`` service account, regardless of which service account 
the pod is running with. The pod's service account must have appropriate RBAC 
permissions to create TokenRequest resources for the ``default`` service 
account.
+    * For TokenRequest API method: The TokenRequest API is called to request a 
token for the ``default`` service account, regardless of which service account 
the pod is running with. The pod's service account must have appropriate RBAC 
permissions to create TokenRequest resources for the ``default`` service 
account. The call to the Kubernetes API server is TLS-verified using the 
in-cluster CA bundle at 
``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``.
     * For Projected Volume method: No special Kubernetes permissions needed, 
just standard service account token projection.
     * The Databricks workspace must have federation policy configured in 
Databricks Account for the Kubernetes identity provider. **Only service 
principal-level policies are supported** for Kubernetes OIDC token federation.
     * ``client_id`` is required in the connection extra parameters. Service 
principal-level Databricks federation must be used because Kubernetes service 
account tokens do not support custom claims, which are required for 
account-wide federation.
@@ -365,6 +365,8 @@ Extra (optional)
 
     * **"Kubernetes service account token not found" error:** This 
authentication method only works when Airflow is running inside a Kubernetes 
cluster. Ensure your pods have the service account token mounted (default 
behavior in Kubernetes).
 
+    * **TLS certificate verification errors when calling the Kubernetes API:** 
The TokenRequest API call is verified against the in-cluster CA bundle at 
``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``. This file is 
automatically mounted in standard Kubernetes deployments.
+
     * **Permission denied errors:** For the TokenRequest API method, verify 
your pod's service account has RBAC permissions to create TokenRequest 
resources for the ``default`` service account. By default, service accounts can 
only create TokenRequest resources for themselves, not for other service 
accounts. You must explicitly grant these permissions via RBAC policies 
(ClusterRole and ClusterRoleBinding or Role and RoleBinding).
 
     * **Token exchange failures:** Ensure your Databricks federation policy 
configuration matches the JWT token properties:
diff --git 
a/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
 
b/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
index ce07d2e67f3..fe9d2245705 100644
--- 
a/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
+++ 
b/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
@@ -27,6 +27,7 @@ from __future__ import annotations
 
 import copy
 import platform
+import ssl
 import time
 from asyncio.exceptions import TimeoutError
 from functools import cached_property
@@ -76,6 +77,7 @@ K8S_TOKEN_SERVICE_URL = "https://kubernetes.default.svc"
 DEFAULT_K8S_AUDIENCE = "https://kubernetes.default.svc"
 DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH = 
"/var/run/secrets/kubernetes.io/serviceaccount/token"
 DEFAULT_K8S_NAMESPACE_PATH = 
"/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+K8S_CA_CERT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
 
 # RFC 8693 token exchange data template
 TOKEN_EXCHANGE_DATA = {
@@ -696,7 +698,7 @@ class BaseDatabricksHook(BaseHook):
                             "Content-Type": "application/json",
                         },
                         json=self._build_k8s_token_request_payload(audience, 
expiration_seconds),
-                        verify=False,  # K8s in-cluster uses self-signed certs
+                        verify=K8S_CA_CERT_PATH,
                         timeout=self.token_timeout_seconds,
                     )
                     resp.raise_for_status()
@@ -751,6 +753,7 @@ class BaseDatabricksHook(BaseHook):
             token_request_url = (
                 
f"{K8S_TOKEN_SERVICE_URL}/api/v1/namespaces/{namespace}/serviceaccounts/default/token"
             )
+            ssl_ctx = ssl.create_default_context(cafile=K8S_CA_CERT_PATH)
 
             async for attempt in self._a_get_retry_object():
                 with attempt:
@@ -761,7 +764,7 @@ class BaseDatabricksHook(BaseHook):
                             "Content-Type": "application/json",
                         },
                         json=self._build_k8s_token_request_payload(audience, 
expiration_seconds),
-                        ssl=False,  # K8s in-cluster uses self-signed certs
+                        ssl=ssl_ctx,
                         timeout=self.token_timeout_seconds,
                     ) as resp:
                         resp.raise_for_status()
diff --git 
a/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py 
b/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
index 4c764cb2256..8c8794ebda9 100644
--- a/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
+++ b/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
@@ -33,6 +33,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 from airflow.providers.databricks.hooks.databricks_base import (
     DEFAULT_AZURE_CREDENTIAL_SETTING_KEY,
     DEFAULT_DATABRICKS_SCOPE,
+    K8S_CA_CERT_PATH,
     TOKEN_REFRESH_LEAD_TIME,
     BaseDatabricksHook,
 )
@@ -977,8 +978,8 @@ class TestBaseDatabricksHook:
         assert k8s_json["spec"]["audiences"] == 
["https://kubernetes.default.svc"]
         assert k8s_json["spec"]["expirationSeconds"] == 3600
 
-        # Verify SSL verification is disabled for in-cluster self-signed certs
-        assert k8s_call_args[1]["verify"] is False
+        # Verify TLS is verified using the in-cluster CA bundle
+        assert k8s_call_args[1]["verify"] == K8S_CA_CERT_PATH
 
         # Verify Databricks token exchange call
         db_call_args = mock_post.call_args_list[1]
@@ -1137,6 +1138,29 @@ class TestBaseDatabricksHook:
         # Verify Authorization header uses custom token
         assert call_args[1]["headers"]["Authorization"] == "Bearer 
custom_in_cluster_token"
 
+    @mock.patch("builtins.open")
+    @mock.patch("requests.post")
+    def test_get_k8s_token_request_api_uses_ca_cert_for_tls(self, mock_post, 
mock_open):
+        """Verify TokenRequest API call uses the in-cluster CA bundle for TLS 
verification."""
+        mock_open.side_effect = [
+            mock.mock_open(read_data="in_cluster_token").return_value,
+            mock.mock_open(read_data="default").return_value,
+        ]
+        mock_response = mock.Mock()
+        mock_response.json.return_value = {"status": {"token": "jwt_token"}}
+        mock_response.raise_for_status.return_value = None
+        mock_post.return_value = mock_response
+
+        mock_conn = mock.Mock()
+        mock_conn.extra_dejson = {}
+        hook = BaseDatabricksHook()
+        hook.databricks_conn = mock_conn
+
+        hook._get_k8s_token_request_api()
+
+        call_kwargs = mock_post.call_args[1]
+        assert call_kwargs["verify"] == K8S_CA_CERT_PATH
+
     @mock.patch("builtins.open", 
mock.mock_open(read_data="projected_token_content"))
     def test_get_k8s_projected_volume_token_success(self):
         """Test successfully reading token from Kubernetes projected volume."""
@@ -1358,7 +1382,8 @@ class TestBaseDatabricksHook:
     @pytest.mark.asyncio
     @mock.patch("aiohttp.ClientSession.post")
     @time_machine.travel("2025-07-12 12:00:00")
-    async def test_a_get_federated_token(self, mock_post):
+    @mock.patch("ssl.create_default_context")
+    async def test_a_get_federated_token(self, _mock_ssl_ctx, mock_post):
         """Test async version of federated token exchange."""
         expiry_date = int((datetime(2025, 7, 12, 12, 0, 0) + 
timedelta(minutes=60)).timestamp())
 
@@ -1478,7 +1503,8 @@ class TestBaseDatabricksHook:
 
     @pytest.mark.asyncio
     @mock.patch("aiohttp.ClientSession.post")
-    async def test_a_get_federated_token_databricks_error(self, mock_post):
+    @mock.patch("ssl.create_default_context")
+    async def test_a_get_federated_token_databricks_error(self, _mock_ssl_ctx, 
mock_post):
         """Test async error handling when Databricks token exchange fails."""
         # Mock aiofiles.open for reading K8s token and namespace
         mock_token_file = mock.AsyncMock()
@@ -1653,6 +1679,46 @@ class TestBaseDatabricksHook:
                 mock_projected.assert_not_called()
                 mock_token_request.assert_called_once()
 
+    @pytest.mark.asyncio
+    @mock.patch("ssl.create_default_context")
+    async def test_a_get_k8s_token_request_api_uses_ca_cert_for_tls(self, 
mock_ssl_ctx):
+        """Verify async TokenRequest API call uses the in-cluster CA bundle 
for TLS verification."""
+        import ssl
+
+        fake_ctx = mock.MagicMock(spec=ssl.SSLContext)
+        mock_ssl_ctx.return_value = fake_ctx
+
+        mock_conn = mock.Mock()
+        mock_conn.extra_dejson = {}
+        hook = BaseDatabricksHook()
+        hook.databricks_conn = mock_conn
+
+        mock_response = mock.AsyncMock()
+        mock_response.json = mock.AsyncMock(return_value={"status": {"token": 
"jwt_token"}})
+        mock_response.raise_for_status = mock.Mock()
+        mock_session = mock.MagicMock()
+        mock_session.post.return_value.__aenter__ = 
mock.AsyncMock(return_value=mock_response)
+        mock_session.post.return_value.__aexit__ = 
mock.AsyncMock(return_value=None)
+
+        mock_token_file = mock.AsyncMock()
+        mock_token_file.read = mock.AsyncMock(return_value="in_cluster_token")
+        mock_namespace_file = mock.AsyncMock()
+        mock_namespace_file.read = mock.AsyncMock(return_value="default")
+
+        hook._session = mock_session
+        with mock.patch(
+            "aiofiles.open",
+            side_effect=[
+                
mock.MagicMock(__aenter__=mock.AsyncMock(return_value=mock_token_file)),
+                
mock.MagicMock(__aenter__=mock.AsyncMock(return_value=mock_namespace_file)),
+            ],
+        ):
+            await hook._a_get_k8s_token_request_api()
+
+        mock_ssl_ctx.assert_called_once_with(cafile=K8S_CA_CERT_PATH)
+        call_kwargs = mock_session.post.call_args[1]
+        assert call_kwargs["ssl"] is fake_ctx
+
     @pytest.mark.asyncio
     @time_machine.travel("2025-07-12 12:00:00")
     async def test_a_get_federated_token_with_projected_volume(self):
@@ -1716,7 +1782,8 @@ class TestBaseDatabricksHook:
 
     @pytest.mark.asyncio
     @mock.patch("aiohttp.ClientSession.post")
-    async def test_a_get_token_with_federated_k8s_login(self, mock_post):
+    @mock.patch("ssl.create_default_context")
+    async def test_a_get_token_with_federated_k8s_login(self, _mock_ssl_ctx, 
mock_post):
         """Test async _a_get_token with login='federated_k8s'."""
         # Mock aiofiles.open for reading K8s token and namespace
         mock_token_file = mock.AsyncMock()
@@ -1769,7 +1836,8 @@ class TestBaseDatabricksHook:
 
     @pytest.mark.asyncio
     @mock.patch("aiohttp.ClientSession.post")
-    async def test_a_get_token_with_federated_k8s_extra(self, mock_post):
+    @mock.patch("ssl.create_default_context")
+    async def test_a_get_token_with_federated_k8s_extra(self, _mock_ssl_ctx, 
mock_post):
         """Test async _a_get_token with federated_k8s in extras."""
         # Mock aiofiles.open for reading K8s token and namespace
         mock_token_file = mock.AsyncMock()

Reply via email to