This is an automated email from the ASF dual-hosted git repository.

ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 88ec07321fa Propagate OTel trace headers from the client to Execution 
API server-side spans (#67904)
88ec07321fa is described below

commit 88ec07321fa307b7ded6e4e196b99995e792a75a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Jun 2 21:47:15 2026 +0100

    Propagate OTel trace headers from the client to Execution API server-side 
spans (#67904)
    
    Trace propagation is a useful tool to let us trace execution across 
distributed
    systems -- exactly what we have with the API server and the workers. We
    already propagated the trace context all the way from the task code in 
#66151;
    this continues it to any spans emitted on the API server side.
    
    The mode of trace propagation is set to "only-authenticated" by default to
    defend against data pollution (i.e. it's not a security risk):
    
    - Sampling-flag manipulation: an attacker sets the sampled flag (`-01` trailer) on
      every request to force your tracing backend to record all their
      reconnaissance/probe traffic. If you pay per ingested span or have
      constrained trace storage, this has a real cost.
    - Trace ID pollution: attacker-controlled trace IDs appear in your backend.
      With 128-bit random IDs, collision with a legitimate trace is negligible,
      but it clutters dashboards.
---
 .../src/airflow/api_fastapi/execution_api/app.py   |  51 ++++++++-
 .../src/airflow/config_templates/config.yml        |  19 ++++
 .../unit/api_fastapi/execution_api/test_app.py     | 122 ++++++++++++++++++++-
 3 files changed, 187 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index 8dbdf25f7e0..f10c772e03f 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import contextlib
 import json
 import time
 from contextlib import AsyncExitStack
@@ -27,9 +28,12 @@ import attrs
 import svcs
 from cadwyn import (
     Cadwyn,
+    current_dependency_solver,
 )
-from fastapi import FastAPI, Request, Response
+from fastapi import Depends, FastAPI, Request, Response
 from fastapi.responses import JSONResponse
+from fastapi.routing import APIRoute
+from opentelemetry import context as otel_context, propagate as otel_propagate
 from starlette.middleware.base import BaseHTTPMiddleware
 
 from airflow.api_fastapi.auth.tokens import (
@@ -41,7 +45,6 @@ from airflow.api_fastapi.auth.tokens import (
 
 if TYPE_CHECKING:
     import httpx
-    from fastapi.routing import APIRoute
 
 import structlog
 from structlog.contextvars import bind_contextvars
@@ -236,10 +239,51 @@ class CadwynWithOpenAPICustomization(Cadwyn):
         return openapi_schema
 
 
+async def _extract_w3c_trace_context(
+    request: Request,
+    dependency_solver=Depends(current_dependency_solver),
+):
+    # Cadwyn solves dependencies twice (the real request, then again to 
migrate the
+    # request body). Only act in the real "fastapi" pass so we attach/detach 
exactly
+    # once, in the context the endpoint runs in.
+    if dependency_solver != "fastapi":
+        yield
+        return
+    ctx = otel_propagate.extract(request.headers)
+    token = otel_context.attach(ctx)
+    try:
+        yield
+    finally:
+        with contextlib.suppress(Exception):
+            otel_context.detach(token)
+
+
+def _inject_trace_context_dep(routes, mode: str) -> None:
+    dep = Depends(_extract_w3c_trace_context)
+    for route in routes:
+        if not isinstance(route, APIRoute):
+            continue
+        # Idempotent: create_task_execution_api_app() runs more than once per 
process
+        # (cached_app + InProcessExecutionAPI), and execution_api_router is 
shared
+        # module state, so strip any prior injection first.
+        route.dependencies[:] = [
+            d for d in route.dependencies if getattr(d, "dependency", None) is 
not _extract_w3c_trace_context
+        ]
+        match mode:
+            case "unsafe-always":
+                route.dependencies.insert(0, dep)
+            case "only-authenticated":
+                from airflow.api_fastapi.execution_api.security import 
require_auth
+
+                if any(getattr(d, "dependency", None) is require_auth for d in 
route.dependencies):
+                    route.dependencies.append(dep)
+
+
 def create_task_execution_api_app() -> FastAPI:
     """Create FastAPI app for task execution API."""
     from airflow.api_fastapi.execution_api.routes import execution_api_router
     from airflow.api_fastapi.execution_api.versions import bundle
+    from airflow.configuration import conf
 
     def custom_generate_unique_id(route: APIRoute):
         # This is called only if the route doesn't provide an explicit 
operation ID
@@ -260,6 +304,9 @@ def create_task_execution_api_app() -> FastAPI:
     app.add_middleware(CorrelationIdMiddleware)
     app.add_middleware(JWTReissueMiddleware)
 
+    mode = conf.get("execution_api", "otel_trace_propagation", 
fallback="only-authenticated")
+    _inject_trace_context_dep(execution_api_router.routes, mode)
+
     app.generate_and_include_versioned_routers(execution_api_router)
 
     # As we are mounted as a sub app, we don't get any logs for unhandled 
exceptions without this!
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 1ab0caec6da..6f97eebbeb0 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2208,6 +2208,25 @@ execution_api:
       type: integer
       example: ~
       default: "600"
+    otel_trace_propagation:
+      version_added: 3.3.0
+      description: |
+        Controls when W3C trace context (``traceparent``/``tracestate``) is 
extracted from
+        incoming Execution API requests and attached to the OpenTelemetry 
context.
+
+        ``only-authenticated`` (default): trace context is extracted as a 
FastAPI dependency
+        on authenticated routes, after JWT verification succeeds. 
Unauthenticated requests
+        (including probes and attack traffic) cannot inject trace context.
+
+        ``unsafe-always``: trace context is extracted on all routes (including 
unauthenticated ones
+        such as health checks), before JWT verification. Use this when you 
want trace context
+        propagated even on auth failures. If untrusted traffic can reach your 
API server this can
+        result in data pollution or load/cost in your OTel trace store.
+
+        ``never``: trace context is never extracted. Use this to disable the 
feature entirely.
+      default: "only-authenticated"
+      example: ~
+      type: string
     jwt_audience:
       version_added: 3.0.0
       description: |
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py 
b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
index b0cb1d85c2e..b34a7e775c7 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
@@ -16,11 +16,24 @@
 # under the License.
 from __future__ import annotations
 
+from unittest import mock
+from uuid import UUID
+
 import pytest
+from fastapi import Request
+from fastapi.params import Security as SecurityParam
+from fastapi.routing import APIRoute
+from fastapi.testclient import TestClient
+from opentelemetry import context as otel_context, propagate as otel_propagate
 
+from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import 
TaskInstance
+from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, 
TIToken
+from airflow.api_fastapi.execution_api.security import require_auth
 from airflow.api_fastapi.execution_api.versions import bundle
 
+from tests_common.test_utils.config import conf_vars
+
 pytestmark = pytest.mark.db_test
 
 
@@ -45,9 +58,6 @@ def test_access_api_contract(client):
 
 def test_ti_self_routes_have_task_instance_id_param(client):
     """Every route with ti:self scope must have a {task_instance_id} path 
parameter."""
-    from fastapi.params import Security as SecurityParam
-    from fastapi.routing import APIRoute
-
     app = client.app
 
     for route in app.routes:
@@ -111,3 +121,109 @@ class TestCorrelationIdMiddleware:
 
         # Verify they didn't interfere with each other
         assert correlation_id_1 != correlation_id_2
+
+
+class TestTraceContextPropagation:
+    """Exercise ``execution_api.otel_trace_propagation`` on the real Execution 
API app."""
+
+    @pytest.fixture(autouse=True)
+    def _restore_router_dependencies(self):
+        from airflow.api_fastapi.execution_api.routes import 
execution_api_router
+
+        snapshot = {
+            id(route): list(route.dependencies)
+            for route in execution_api_router.routes
+            if isinstance(route, APIRoute)
+        }
+        yield
+        for route in execution_api_router.routes:
+            if isinstance(route, APIRoute):
+                route.dependencies[:] = snapshot[id(route)]
+
+    @staticmethod
+    def _build_app(mode: str):
+        with conf_vars({("execution_api", "otel_trace_propagation"): mode}):
+            return create_task_execution_api_app()
+
+    @pytest.mark.parametrize(
+        ("mode", "path", "valid_auth", "expect_extract", "expect_status"),
+        [
+            pytest.param("unsafe-always", "/health", False, True, 200, 
id="always-unauthenticated"),
+            pytest.param("unsafe-always", "/variables/k", False, True, 401, 
id="always-auth-failure"),
+            pytest.param("unsafe-always", "/variables/k", True, True, None, 
id="always-authenticated"),
+            pytest.param("only-authenticated", "/health", False, False, 200, 
id="onlyauth-unauthenticated"),
+            pytest.param("only-authenticated", "/variables/k", False, False, 
401, id="onlyauth-auth-failure"),
+            pytest.param("only-authenticated", "/variables/k", True, True, 
None, id="onlyauth-authenticated"),
+            pytest.param("never", "/health", False, False, 200, 
id="never-unauthenticated"),
+            pytest.param("never", "/variables/k", False, False, 401, 
id="never-auth-failure"),
+            pytest.param("never", "/variables/k", True, False, None, 
id="never-authenticated"),
+        ],
+    )
+    def test_trace_context_extraction(self, mode, path, valid_auth, 
expect_extract, expect_status):
+        app = self._build_app(mode)
+
+        if valid_auth:
+
+            async def mock_require_auth(request: Request) -> TIToken:
+                ti_id = UUID(
+                    request.path_params.get("task_instance_id", 
"00000000-0000-0000-0000-000000000000")
+                )
+                return TIToken(id=ti_id, claims=TIClaims(scope="execution"))
+
+            app.dependency_overrides[require_auth] = mock_require_auth
+
+        headers = {"Authorization": "Bearer fake"} if valid_auth else {}
+        real_extract = otel_propagate.extract
+        with (
+            mock.patch.object(otel_propagate, "extract", wraps=real_extract) 
as spy,
+            TestClient(app) as test_client,
+        ):
+            response = test_client.get(path, headers=headers)
+
+        assert spy.called is expect_extract
+        if expect_status is not None:
+            assert response.status_code == expect_status
+
+    def test_trace_context_dep_cleans_up_on_route_exception(self):
+        """Verify extract and cleanup run correctly when a route handler 
raises."""
+        app = self._build_app("unsafe-always")
+
+        async def mock_require_auth(request: Request) -> TIToken:
+            ti_id = UUID(request.path_params.get("task_instance_id", 
"00000000-0000-0000-0000-000000000000"))
+            return TIToken(id=ti_id, claims=TIClaims(scope="execution"))
+
+        app.dependency_overrides[require_auth] = mock_require_auth
+
+        real_extract = otel_propagate.extract
+        # raise_server_exceptions=False lets the app's 
@exception_handler(Exception)
+        # return a 500 response rather than re-raising, matching production 
behaviour
+        # where AsyncExitStack unwinds the generator in the correct asyncio 
context.
+        with (
+            mock.patch.object(otel_propagate, "extract", wraps=real_extract) 
as extract_spy,
+            mock.patch("airflow.models.variable.Variable.get", 
side_effect=RuntimeError("boom")),
+            TestClient(app, raise_server_exceptions=False) as test_client,
+        ):
+            response = test_client.get("/variables/k", 
headers={"Authorization": "Bearer fake"})
+
+        assert extract_spy.called
+        assert response.status_code == 500
+
+    def test_route_exception_not_masked_by_detach_error(self):
+        """A detach failure during cleanup must not replace the original route 
exception."""
+        app = self._build_app("unsafe-always")
+
+        async def mock_require_auth(request: Request) -> TIToken:
+            ti_id = UUID(request.path_params.get("task_instance_id", 
"00000000-0000-0000-0000-000000000000"))
+            return TIToken(id=ti_id, claims=TIClaims(scope="execution"))
+
+        app.dependency_overrides[require_auth] = mock_require_auth
+
+        real_extract = otel_propagate.extract
+        with (
+            mock.patch.object(otel_propagate, "extract", wraps=real_extract),
+            mock.patch.object(otel_context, "detach", 
side_effect=ValueError("token from another context")),
+            mock.patch("airflow.models.variable.Variable.get", 
side_effect=RuntimeError("boom")),
+            TestClient(app) as test_client,
+        ):
+            with pytest.raises(RuntimeError, match="boom"):
+                test_client.get("/variables/k", headers={"Authorization": 
"Bearer fake"})

Reply via email to