This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 88ec07321fa Propagate OTel trace headers from the client to Execution
API server-side spans (#67904)
88ec07321fa is described below
commit 88ec07321fa307b7ded6e4e196b99995e792a75a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Jun 2 21:47:15 2026 +0100
Propagate OTel trace headers from the client to Execution API server-side
spans (#67904)
Trace propagation is a useful tool for tracing execution across
distributed
systems -- exactly what we have with the API server and the workers. We
already propagated the trace context all the way from the task code in
#66151;
this change continues it to any spans emitted on the API server side.
The mode of trace propagation is set to "only-authenticated" by default to
defend against data pollution (a cost and noise concern rather than a security risk):
- Sampling-flag manipulation: an attacker sets the sampled flag (`-01` trailer) on
every request to force your tracing backend to record all of their
reconnaissance/probe traffic. If you pay per ingested span or have
constrained trace storage, this has a real cost.
- Trace ID pollution: attacker-controlled trace IDs appear in your backend.
With 128-bit random IDs, collision with a legitimate trace is negligible,
but it clutters dashboards.
---
.../src/airflow/api_fastapi/execution_api/app.py | 51 ++++++++-
.../src/airflow/config_templates/config.yml | 19 ++++
.../unit/api_fastapi/execution_api/test_app.py | 122 ++++++++++++++++++++-
3 files changed, 187 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index 8dbdf25f7e0..f10c772e03f 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import contextlib
import json
import time
from contextlib import AsyncExitStack
@@ -27,9 +28,12 @@ import attrs
import svcs
from cadwyn import (
Cadwyn,
+ current_dependency_solver,
)
-from fastapi import FastAPI, Request, Response
+from fastapi import Depends, FastAPI, Request, Response
from fastapi.responses import JSONResponse
+from fastapi.routing import APIRoute
+from opentelemetry import context as otel_context, propagate as otel_propagate
from starlette.middleware.base import BaseHTTPMiddleware
from airflow.api_fastapi.auth.tokens import (
@@ -41,7 +45,6 @@ from airflow.api_fastapi.auth.tokens import (
if TYPE_CHECKING:
import httpx
- from fastapi.routing import APIRoute
import structlog
from structlog.contextvars import bind_contextvars
@@ -236,10 +239,51 @@ class CadwynWithOpenAPICustomization(Cadwyn):
return openapi_schema
async def _extract_w3c_trace_context(
    request: Request,
    dependency_solver=Depends(current_dependency_solver),
):
    """
    Attach the caller's W3C trace context for the duration of the request.

    Runs as a FastAPI yield-dependency. The context is read from the request
    headers with ``opentelemetry.propagate.extract``. It is attached before the
    endpoint runs and detached again when the dependency is torn down.

    :param request: The incoming request whose headers may carry
        ``traceparent``/``tracestate``.
    :param dependency_solver: Identifies which dependency-solving pass is running;
        only the ``"fastapi"`` pass attaches anything.
    """
    if dependency_solver == "fastapi":
        # This is the pass the endpoint actually executes in, so the attach and
        # the matching detach both happen exactly once, in that context.
        token = otel_context.attach(otel_propagate.extract(request.headers))
        try:
            yield
        finally:
            # A detach failure must never mask whatever the endpoint raised.
            with contextlib.suppress(Exception):
                otel_context.detach(token)
    else:
        # Cadwyn solves dependencies a second time to migrate the request body;
        # that pass must not touch the OTel context.
        yield
+
+
+def _inject_trace_context_dep(routes, mode: str) -> None:
+ dep = Depends(_extract_w3c_trace_context)
+ for route in routes:
+ if not isinstance(route, APIRoute):
+ continue
+ # Idempotent: create_task_execution_api_app() runs more than once per
process
+ # (cached_app + InProcessExecutionAPI), and execution_api_router is
shared
+ # module state, so strip any prior injection first.
+ route.dependencies[:] = [
+ d for d in route.dependencies if getattr(d, "dependency", None) is
not _extract_w3c_trace_context
+ ]
+ match mode:
+ case "unsafe-always":
+ route.dependencies.insert(0, dep)
+ case "only-authenticated":
+ from airflow.api_fastapi.execution_api.security import
require_auth
+
+ if any(getattr(d, "dependency", None) is require_auth for d in
route.dependencies):
+ route.dependencies.append(dep)
+
+
def create_task_execution_api_app() -> FastAPI:
"""Create FastAPI app for task execution API."""
from airflow.api_fastapi.execution_api.routes import execution_api_router
from airflow.api_fastapi.execution_api.versions import bundle
+ from airflow.configuration import conf
def custom_generate_unique_id(route: APIRoute):
# This is called only if the route doesn't provide an explicit
operation ID
@@ -260,6 +304,9 @@ def create_task_execution_api_app() -> FastAPI:
app.add_middleware(CorrelationIdMiddleware)
app.add_middleware(JWTReissueMiddleware)
+ mode = conf.get("execution_api", "otel_trace_propagation",
fallback="only-authenticated")
+ _inject_trace_context_dep(execution_api_router.routes, mode)
+
app.generate_and_include_versioned_routers(execution_api_router)
# As we are mounted as a sub app, we don't get any logs for unhandled
exceptions without this!
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 1ab0caec6da..6f97eebbeb0 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2208,6 +2208,25 @@ execution_api:
type: integer
example: ~
default: "600"
+ otel_trace_propagation:
+ version_added: 3.3.0
+ description: |
+ Controls when W3C trace context (``traceparent``/``tracestate``) is
extracted from
+ incoming Execution API requests and attached to the OpenTelemetry
context.
+
+ ``only-authenticated`` (default): trace context is extracted as a
FastAPI dependency
+ on authenticated routes, after JWT verification succeeds.
Unauthenticated requests
+ (including probes and attack traffic) cannot inject trace context.
+
+ ``unsafe-always``: trace context is extracted on all routes (including
unauthenticated ones
+ such as health checks), before JWT verification. Use this when you
want trace context
+ propagated even on auth failures. If untrusted traffic can reach your
API server this can
+ result in data pollution or load/cost in your OTel trace store.
+
+ ``never``: trace context is never extracted. Use this to disable the
feature entirely.
+ default: "only-authenticated"
+ example: ~
+ type: string
jwt_audience:
version_added: 3.0.0
description: |
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
index b0cb1d85c2e..b34a7e775c7 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
@@ -16,11 +16,24 @@
# under the License.
from __future__ import annotations
+from unittest import mock
+from uuid import UUID
+
import pytest
+from fastapi import Request
+from fastapi.params import Security as SecurityParam
+from fastapi.routing import APIRoute
+from fastapi.testclient import TestClient
+from opentelemetry import context as otel_context, propagate as otel_propagate
+from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
from airflow.api_fastapi.execution_api.datamodels.taskinstance import
TaskInstance
+from airflow.api_fastapi.execution_api.datamodels.token import TIClaims,
TIToken
+from airflow.api_fastapi.execution_api.security import require_auth
from airflow.api_fastapi.execution_api.versions import bundle
+from tests_common.test_utils.config import conf_vars
+
pytestmark = pytest.mark.db_test
@@ -45,9 +58,6 @@ def test_access_api_contract(client):
def test_ti_self_routes_have_task_instance_id_param(client):
"""Every route with ti:self scope must have a {task_instance_id} path
parameter."""
- from fastapi.params import Security as SecurityParam
- from fastapi.routing import APIRoute
-
app = client.app
for route in app.routes:
@@ -111,3 +121,109 @@ class TestCorrelationIdMiddleware:
# Verify they didn't interfere with each other
assert correlation_id_1 != correlation_id_2
+
+
class TestTraceContextPropagation:
    """Exercise ``execution_api.otel_trace_propagation`` on the real Execution API app."""

    @pytest.fixture(autouse=True)
    def _restore_router_dependencies(self):
        """Restore route dependencies after each test; the app factory mutates the shared router."""
        from airflow.api_fastapi.execution_api.routes import execution_api_router

        snapshot = {
            id(route): list(route.dependencies)
            for route in execution_api_router.routes
            if isinstance(route, APIRoute)
        }
        yield
        for route in execution_api_router.routes:
            if isinstance(route, APIRoute):
                route.dependencies[:] = snapshot[id(route)]

    @staticmethod
    def _build_app(mode: str):
        """Create the Execution API app with ``otel_trace_propagation`` set to ``mode``."""
        with conf_vars({("execution_api", "otel_trace_propagation"): mode}):
            return create_task_execution_api_app()

    @staticmethod
    def _override_auth(app) -> None:
        """Override ``require_auth`` so requests are treated as authenticated without a real JWT."""

        async def mock_require_auth(request: Request) -> TIToken:
            ti_id = UUID(request.path_params.get("task_instance_id", "00000000-0000-0000-0000-000000000000"))
            return TIToken(id=ti_id, claims=TIClaims(scope="execution"))

        app.dependency_overrides[require_auth] = mock_require_auth

    @pytest.mark.parametrize(
        ("mode", "path", "valid_auth", "expect_extract", "expect_status"),
        [
            pytest.param("unsafe-always", "/health", False, True, 200, id="always-unauthenticated"),
            pytest.param("unsafe-always", "/variables/k", False, True, 401, id="always-auth-failure"),
            pytest.param("unsafe-always", "/variables/k", True, True, None, id="always-authenticated"),
            pytest.param("only-authenticated", "/health", False, False, 200, id="onlyauth-unauthenticated"),
            pytest.param("only-authenticated", "/variables/k", False, False, 401, id="onlyauth-auth-failure"),
            pytest.param("only-authenticated", "/variables/k", True, True, None, id="onlyauth-authenticated"),
            pytest.param("never", "/health", False, False, 200, id="never-unauthenticated"),
            pytest.param("never", "/variables/k", False, False, 401, id="never-auth-failure"),
            pytest.param("never", "/variables/k", True, False, None, id="never-authenticated"),
        ],
    )
    def test_trace_context_extraction(self, mode, path, valid_auth, expect_extract, expect_status):
        app = self._build_app(mode)
        if valid_auth:
            self._override_auth(app)

        headers = {"Authorization": "Bearer fake"} if valid_auth else {}
        real_extract = otel_propagate.extract
        with (
            mock.patch.object(otel_propagate, "extract", wraps=real_extract) as spy,
            TestClient(app) as test_client,
        ):
            response = test_client.get(path, headers=headers)

        assert spy.called is expect_extract
        # For authenticated requests the route handler's own status is not under test.
        if expect_status is not None:
            assert response.status_code == expect_status

    def test_trace_context_dep_cleans_up_on_route_exception(self):
        """Verify extract and detach both run when a route handler raises."""
        app = self._build_app("unsafe-always")
        self._override_auth(app)

        real_extract = otel_propagate.extract
        real_detach = otel_context.detach
        # raise_server_exceptions=False lets the app's @exception_handler(Exception)
        # return a 500 response rather than re-raising, matching production behaviour
        # where AsyncExitStack unwinds the generator in the correct asyncio context.
        with (
            mock.patch.object(otel_propagate, "extract", wraps=real_extract) as extract_spy,
            mock.patch.object(otel_context, "detach", wraps=real_detach) as detach_spy,
            mock.patch("airflow.models.variable.Variable.get", side_effect=RuntimeError("boom")),
            TestClient(app, raise_server_exceptions=False) as test_client,
        ):
            response = test_client.get("/variables/k", headers={"Authorization": "Bearer fake"})

        assert extract_spy.called
        assert detach_spy.called
        assert response.status_code == 500

    def test_route_exception_not_masked_by_detach_error(self):
        """A detach failure during cleanup must not replace the original route exception."""
        app = self._build_app("unsafe-always")
        self._override_auth(app)

        with (
            mock.patch.object(
                otel_context, "detach", side_effect=ValueError("token from another context")
            ) as detach_mock,
            mock.patch("airflow.models.variable.Variable.get", side_effect=RuntimeError("boom")),
            TestClient(app) as test_client,
        ):
            with pytest.raises(RuntimeError, match="boom"):
                test_client.get("/variables/k", headers={"Authorization": "Bearer fake"})

        # Without this, the test would pass vacuously if cleanup never reached detach().
        assert detach_mock.called