This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a35ab8b8daa Support for client-side certificates using task-sdk (#62105)
a35ab8b8daa is described below
commit a35ab8b8daa3c6f671900d9cb2637baf52036624
Author: jj.lee <[email protected]>
AuthorDate: Fri Feb 20 07:34:33 2026 +0900
Support for client-side certificates using task-sdk (#62105)
* support client cert
* testcode
---
.../src/airflow/config_templates/config.yml | 17 ++++++++++++++
task-sdk/src/airflow/sdk/api/client.py | 8 +++++++
task-sdk/tests/task_sdk/api/test_client.py | 27 ++++++++++++++++++++++
3 files changed, 52 insertions(+)
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 743ec9de893..122e0ece78d 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1758,6 +1758,23 @@ api:
type: boolean
example: ~
default: "False"
+ client_ssl_cert:
+ description: |
+ Path to a PEM-encoded client SSL certificate to use
+ when the Task SDK connects to the Airflow Execution API.
+ Must be set together with ``client_ssl_key``.
+ version_added: ~
+ type: string
+ example: "/etc/airflow/certs/client.crt"
+ default: ~
+ client_ssl_key:
+ description: |
+ Path to the PEM-encoded private key corresponding to ``client_ssl_cert``.
+ Must be set together with ``client_ssl_cert``.
+ version_added: ~
+ type: string
+ example: "/etc/airflow/certs/client.key"
+ default: ~
workers:
description: Configuration related to workers that run Airflow tasks.
options:
diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index 09cd6520ebb..3fc023e6c78 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -878,6 +878,8 @@ API_RETRY_WAIT_MIN = conf.getfloat("workers",
"execution_api_retry_wait_min")
API_RETRY_WAIT_MAX = conf.getfloat("workers", "execution_api_retry_wait_max")
API_SSL_CERT_PATH = conf.get("api", "ssl_cert")
API_TIMEOUT = conf.getfloat("workers", "execution_api_timeout")
+API_CLIENT_SSL_CERT = conf.get("api", "client_ssl_cert", fallback=None)
+API_CLIENT_SSL_KEY = conf.get("api", "client_ssl_key", fallback=None)
def _should_retry_api_request(exception: BaseException) -> bool:
@@ -913,6 +915,12 @@ class Client(httpx.Client):
# Call via the class to avoid binding lru_cache wires to this
instance.
kwargs["verify"] =
type(self)._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH)
+ if API_CLIENT_SSL_CERT or API_CLIENT_SSL_KEY:
+ if not (API_CLIENT_SSL_CERT and API_CLIENT_SSL_KEY):
+ raise ValueError("Both client_ssl_cert and client_ssl_key
must be set.")
+
+ kwargs["cert"] = (API_CLIENT_SSL_CERT, API_CLIENT_SSL_KEY)
+
# Set timeout if not explicitly provided
kwargs.setdefault("timeout", API_TIMEOUT)
diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py
index 1889fabcd5e..6347d611a6a 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -262,6 +262,33 @@ class TestClient:
response = httpx.Response(status_code, json={"detail": f"Test
{description}"})
assert ServerResponseError.from_response(response) is None
+ @mock.patch("airflow.sdk.api.client.API_CLIENT_SSL_CERT",
"/etc/airflow/certs/client.crt")
+ @mock.patch("airflow.sdk.api.client.API_CLIENT_SSL_KEY",
"/etc/airflow/certs/client.key")
+ def test_sets_cert_tuple(self):
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ return httpx.Response(status_code=200)
+
+ captured: dict[str, object] = {}
+ real_init = httpx.Client.__init__
+
+ def spy_init(self, *args, **kwargs):
+ captured["cert"] = kwargs.get("cert")
+ return real_init(self, *args, **kwargs)
+
+ with mock.patch.object(httpx.Client, "__init__", spy_init):
+ make_client(httpx.MockTransport(handle_request))
+
+ assert captured["cert"] == ("/etc/airflow/certs/client.crt",
"/etc/airflow/certs/client.key")
+
+ @mock.patch("airflow.sdk.api.client.API_CLIENT_SSL_CERT",
"/etc/airflow/certs/client.crt")
+ @mock.patch("airflow.sdk.api.client.API_CLIENT_SSL_KEY", None)
+ def test_requires_both_cert_and_key(self):
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ return httpx.Response(status_code=200)
+
+ with pytest.raises(ValueError, match="Both client_ssl_cert and
client_ssl_key must be set"):
+ make_client(httpx.MockTransport(handle_request))
+
class TestTaskInstanceOperations:
"""