This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a8c12067fbe Enable TLS certificate verification for K8s token exchange
in `DatabricksHook` (#63704)
a8c12067fbe is described below
commit a8c12067fbe984cd2a99b35f0a6909a570e87a1c
Author: Marcin Wojtyczka <[email protected]>
AuthorDate: Tue Mar 17 09:29:19 2026 +0100
Enable TLS certificate verification for K8s token exchange in
`DatabricksHook` (#63704)
* Enable TLS Certificate Verification for K8s Token Exchange
* added unit tests
* fixed tests
* refactor tests
* added change log
* fixed spelling typo
* updated changelog
---------
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
providers/databricks/docs/changelog.rst | 18 +++++
.../databricks/docs/connections/databricks.rst | 4 +-
.../providers/databricks/hooks/databricks_base.py | 7 +-
.../unit/databricks/hooks/test_databricks_base.py | 80 ++++++++++++++++++++--
4 files changed, 100 insertions(+), 9 deletions(-)
diff --git a/providers/databricks/docs/changelog.rst
b/providers/databricks/docs/changelog.rst
index 5734d4a8970..aac2f78b377 100644
--- a/providers/databricks/docs/changelog.rst
+++ b/providers/databricks/docs/changelog.rst
@@ -26,6 +26,24 @@
Changelog
---------
+.. note:: **Security fix — TLS verification for Kubernetes TokenRequest API
(affects Kubernetes OIDC token federation)**
+
+ The Kubernetes TokenRequest API call made during ``federated_k8s``
authentication now verifies the
+ Kubernetes API server's TLS certificate using the in-cluster CA bundle at
+ ``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``. Previously, TLS
verification was disabled
+ (``verify=False``), which exposed the token exchange to potential
man-in-the-middle attacks within the
+ cluster.
+
+ **This is a security fix that enforces what should always have been the
behaviour.**
+
+   For all standard Kubernetes deployments (EKS, AKS, GKE, vanilla
Kubernetes), the CA bundle is
+   automatically mounted at the standard path by the kubelet as part of the
projected service account
+   token volume — no action is required.
+
+ **Potentially impacted:** Non-compliant or highly customized Kubernetes
distributions that do not
+ mount ``ca.crt`` at
``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``. If you are affected,
+ please open an issue so support for a configurable CA path can be added.
+
7.11.0
......
diff --git a/providers/databricks/docs/connections/databricks.rst
b/providers/databricks/docs/connections/databricks.rst
index 084743b4983..634f61f2d4d 100644
--- a/providers/databricks/docs/connections/databricks.rst
+++ b/providers/databricks/docs/connections/databricks.rst
@@ -221,7 +221,7 @@ Extra (optional)
**Important Notes:**
* This method requires no secrets to be stored in the Airflow connection
- * For TokenRequest API method: The TokenRequest API is called to request a
token for the ``default`` service account, regardless of which service account
the pod is running with. The pod's service account must have appropriate RBAC
permissions to create TokenRequest resources for the ``default`` service
account.
+ * For TokenRequest API method: The TokenRequest API is called to request a
token for the ``default`` service account, regardless of which service account
the pod is running with. The pod's service account must have appropriate RBAC
permissions to create TokenRequest resources for the ``default`` service
account. The call to the Kubernetes API server is TLS-verified using the
in-cluster CA bundle at
``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``.
* For Projected Volume method: No special Kubernetes permissions needed,
just standard service account token projection.
* The Databricks workspace must have federation policy configured in
Databricks Account for the Kubernetes identity provider. **Only service
principal-level policies are supported** for Kubernetes OIDC token federation.
* ``client_id`` is required in the connection extra parameters. Service
principal-level Databricks federation must be used because Kubernetes service
account tokens do not support custom claims, which are required for
account-wide federation.
@@ -365,6 +365,8 @@ Extra (optional)
* **"Kubernetes service account token not found" error:** This
authentication method only works when Airflow is running inside a Kubernetes
cluster. Ensure your pods have the service account token mounted (default
behavior in Kubernetes).
+ * **TLS certificate verification errors when calling the Kubernetes API:**
The TokenRequest API call is verified against the in-cluster CA bundle at
``/var/run/secrets/kubernetes.io/serviceaccount/ca.crt``. This file is
automatically mounted in standard Kubernetes deployments.
+
* **Permission denied errors:** For the TokenRequest API method, verify
your pod's service account has RBAC permissions to create TokenRequest
resources for the ``default`` service account. By default, service accounts can
only create TokenRequest resources for themselves, not for other service
accounts. You must explicitly grant these permissions via RBAC policies
(ClusterRole and ClusterRoleBinding or Role and RoleBinding).
* **Token exchange failures:** Ensure your Databricks federation policy
configuration matches the JWT token properties:
diff --git
a/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
b/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
index ce07d2e67f3..fe9d2245705 100644
---
a/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
+++
b/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py
@@ -27,6 +27,7 @@ from __future__ import annotations
import copy
import platform
+import ssl
import time
from asyncio.exceptions import TimeoutError
from functools import cached_property
@@ -76,6 +77,7 @@ K8S_TOKEN_SERVICE_URL = "https://kubernetes.default.svc"
DEFAULT_K8S_AUDIENCE = "https://kubernetes.default.svc"
DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH =
"/var/run/secrets/kubernetes.io/serviceaccount/token"
DEFAULT_K8S_NAMESPACE_PATH =
"/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+K8S_CA_CERT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
# RFC 8693 token exchange data template
TOKEN_EXCHANGE_DATA = {
@@ -696,7 +698,7 @@ class BaseDatabricksHook(BaseHook):
"Content-Type": "application/json",
},
json=self._build_k8s_token_request_payload(audience,
expiration_seconds),
- verify=False, # K8s in-cluster uses self-signed certs
+ verify=K8S_CA_CERT_PATH,
timeout=self.token_timeout_seconds,
)
resp.raise_for_status()
@@ -751,6 +753,7 @@ class BaseDatabricksHook(BaseHook):
token_request_url = (
f"{K8S_TOKEN_SERVICE_URL}/api/v1/namespaces/{namespace}/serviceaccounts/default/token"
)
+ ssl_ctx = ssl.create_default_context(cafile=K8S_CA_CERT_PATH)
async for attempt in self._a_get_retry_object():
with attempt:
@@ -761,7 +764,7 @@ class BaseDatabricksHook(BaseHook):
"Content-Type": "application/json",
},
json=self._build_k8s_token_request_payload(audience,
expiration_seconds),
- ssl=False, # K8s in-cluster uses self-signed certs
+ ssl=ssl_ctx,
timeout=self.token_timeout_seconds,
) as resp:
resp.raise_for_status()
diff --git
a/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
b/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
index 4c764cb2256..8c8794ebda9 100644
--- a/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
+++ b/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py
@@ -33,6 +33,7 @@ from airflow.providers.common.compat.sdk import
AirflowException
from airflow.providers.databricks.hooks.databricks_base import (
DEFAULT_AZURE_CREDENTIAL_SETTING_KEY,
DEFAULT_DATABRICKS_SCOPE,
+ K8S_CA_CERT_PATH,
TOKEN_REFRESH_LEAD_TIME,
BaseDatabricksHook,
)
@@ -977,8 +978,8 @@ class TestBaseDatabricksHook:
assert k8s_json["spec"]["audiences"] ==
["https://kubernetes.default.svc"]
assert k8s_json["spec"]["expirationSeconds"] == 3600
- # Verify SSL verification is disabled for in-cluster self-signed certs
- assert k8s_call_args[1]["verify"] is False
+ # Verify TLS is verified using the in-cluster CA bundle
+ assert k8s_call_args[1]["verify"] == K8S_CA_CERT_PATH
# Verify Databricks token exchange call
db_call_args = mock_post.call_args_list[1]
@@ -1137,6 +1138,29 @@ class TestBaseDatabricksHook:
# Verify Authorization header uses custom token
assert call_args[1]["headers"]["Authorization"] == "Bearer
custom_in_cluster_token"
+ @mock.patch("builtins.open")
+ @mock.patch("requests.post")
+ def test_get_k8s_token_request_api_uses_ca_cert_for_tls(self, mock_post,
mock_open):
+ """Verify TokenRequest API call uses the in-cluster CA bundle for TLS
verification."""
+ mock_open.side_effect = [
+ mock.mock_open(read_data="in_cluster_token").return_value,
+ mock.mock_open(read_data="default").return_value,
+ ]
+ mock_response = mock.Mock()
+ mock_response.json.return_value = {"status": {"token": "jwt_token"}}
+ mock_response.raise_for_status.return_value = None
+ mock_post.return_value = mock_response
+
+ mock_conn = mock.Mock()
+ mock_conn.extra_dejson = {}
+ hook = BaseDatabricksHook()
+ hook.databricks_conn = mock_conn
+
+ hook._get_k8s_token_request_api()
+
+ call_kwargs = mock_post.call_args[1]
+ assert call_kwargs["verify"] == K8S_CA_CERT_PATH
+
@mock.patch("builtins.open",
mock.mock_open(read_data="projected_token_content"))
def test_get_k8s_projected_volume_token_success(self):
"""Test successfully reading token from Kubernetes projected volume."""
@@ -1358,7 +1382,8 @@ class TestBaseDatabricksHook:
@pytest.mark.asyncio
@mock.patch("aiohttp.ClientSession.post")
@time_machine.travel("2025-07-12 12:00:00")
- async def test_a_get_federated_token(self, mock_post):
+ @mock.patch("ssl.create_default_context")
+ async def test_a_get_federated_token(self, _mock_ssl_ctx, mock_post):
"""Test async version of federated token exchange."""
expiry_date = int((datetime(2025, 7, 12, 12, 0, 0) +
timedelta(minutes=60)).timestamp())
@@ -1478,7 +1503,8 @@ class TestBaseDatabricksHook:
@pytest.mark.asyncio
@mock.patch("aiohttp.ClientSession.post")
- async def test_a_get_federated_token_databricks_error(self, mock_post):
+ @mock.patch("ssl.create_default_context")
+ async def test_a_get_federated_token_databricks_error(self, _mock_ssl_ctx,
mock_post):
"""Test async error handling when Databricks token exchange fails."""
# Mock aiofiles.open for reading K8s token and namespace
mock_token_file = mock.AsyncMock()
@@ -1653,6 +1679,46 @@ class TestBaseDatabricksHook:
mock_projected.assert_not_called()
mock_token_request.assert_called_once()
+ @pytest.mark.asyncio
+ @mock.patch("ssl.create_default_context")
+ async def test_a_get_k8s_token_request_api_uses_ca_cert_for_tls(self,
mock_ssl_ctx):
+ """Verify async TokenRequest API call uses the in-cluster CA bundle
for TLS verification."""
+ import ssl
+
+ fake_ctx = mock.MagicMock(spec=ssl.SSLContext)
+ mock_ssl_ctx.return_value = fake_ctx
+
+ mock_conn = mock.Mock()
+ mock_conn.extra_dejson = {}
+ hook = BaseDatabricksHook()
+ hook.databricks_conn = mock_conn
+
+ mock_response = mock.AsyncMock()
+ mock_response.json = mock.AsyncMock(return_value={"status": {"token":
"jwt_token"}})
+ mock_response.raise_for_status = mock.Mock()
+ mock_session = mock.MagicMock()
+ mock_session.post.return_value.__aenter__ =
mock.AsyncMock(return_value=mock_response)
+ mock_session.post.return_value.__aexit__ =
mock.AsyncMock(return_value=None)
+
+ mock_token_file = mock.AsyncMock()
+ mock_token_file.read = mock.AsyncMock(return_value="in_cluster_token")
+ mock_namespace_file = mock.AsyncMock()
+ mock_namespace_file.read = mock.AsyncMock(return_value="default")
+
+ hook._session = mock_session
+ with mock.patch(
+ "aiofiles.open",
+ side_effect=[
+
mock.MagicMock(__aenter__=mock.AsyncMock(return_value=mock_token_file)),
+
mock.MagicMock(__aenter__=mock.AsyncMock(return_value=mock_namespace_file)),
+ ],
+ ):
+ await hook._a_get_k8s_token_request_api()
+
+ mock_ssl_ctx.assert_called_once_with(cafile=K8S_CA_CERT_PATH)
+ call_kwargs = mock_session.post.call_args[1]
+ assert call_kwargs["ssl"] is fake_ctx
+
@pytest.mark.asyncio
@time_machine.travel("2025-07-12 12:00:00")
async def test_a_get_federated_token_with_projected_volume(self):
@@ -1716,7 +1782,8 @@ class TestBaseDatabricksHook:
@pytest.mark.asyncio
@mock.patch("aiohttp.ClientSession.post")
- async def test_a_get_token_with_federated_k8s_login(self, mock_post):
+ @mock.patch("ssl.create_default_context")
+ async def test_a_get_token_with_federated_k8s_login(self, _mock_ssl_ctx,
mock_post):
"""Test async _a_get_token with login='federated_k8s'."""
# Mock aiofiles.open for reading K8s token and namespace
mock_token_file = mock.AsyncMock()
@@ -1769,7 +1836,8 @@ class TestBaseDatabricksHook:
@pytest.mark.asyncio
@mock.patch("aiohttp.ClientSession.post")
- async def test_a_get_token_with_federated_k8s_extra(self, mock_post):
+ @mock.patch("ssl.create_default_context")
+ async def test_a_get_token_with_federated_k8s_extra(self, _mock_ssl_ctx,
mock_post):
"""Test async _a_get_token with federated_k8s in extras."""
# Mock aiofiles.open for reading K8s token and namespace
mock_token_file = mock.AsyncMock()