This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f481758f04c Propagate task OTel trace context through IPC and into
execution API requests (#66151)
f481758f04c is described below
commit f481758f04cc28d1a417c4671b8705cd6e5f83e8
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue May 5 13:14:18 2026 +0100
Propagate task OTel trace context through IPC and into execution API
requests (#66151)
The supervisor makes HTTP calls (XCom pushes, RTIF writes, connection
lookups) on behalf of the task process via a Unix socket IPC channel.
Without explicit propagation, those calls either float under the
supervisor's own span or are unparented entirely — the task's trace
context never crosses the process boundary.
This commit wires the full chain:
IPC leg (task process → supervisor):
- Add context_carrier: dict[str, str] | None = None field to _RequestFrame
- _make_frame() injects the task runner's active W3C trace context
(traceparent and tracestate) into every outgoing IPC frame's
context_carrier via TraceContextTextMapPropagator.inject()
- handle_requests() extracts the trace context from context_carrier and
calls otel_context.attach() before dispatching each request, restoring
the task's trace context in the supervisor process for that request's
lifetime, and detaches it again once the request has been handled
- TriggerCommsDecoder.asend() now calls _make_frame() instead of
constructing _RequestFrame directly, so trigger IPC frames carry
the active span's traceparent too
HTTP leg (supervisor → execution API):
- inject_trace_context event hook on the httpx Client propagates the
currently-active span's traceparent header on every outgoing request,
linking server-side spans to the correct task span
- _log_and_trace_retry records http.retry events on the active span
alongside the existing log warning
Dependency cleanup:
- Move opentelemetry-api>=1.27.0 from [otel] optional extras to base
[dependencies] in shared/observability — it flows unconditionally into
task-sdk via shared_distributions, the same way airflow-core already
has it unconditionally
- Replace try/except ImportError guards and _NoOpTracer fallbacks in
comms.py, supervisor.py, and client.py with direct imports; inject()
and get_current_span() are no-ops when no TracerProvider is configured,
so the guards were only testing "is OTel installed?" not "is it enabled?"
- Introduce _FrameMixin (plain Python mixin, not a msgspec.Struct) to
share _encoder and as_bytes() between _RequestFrame and _ResponseFrame
---
shared/observability/pyproject.toml | 3 +-
task-sdk/pyproject.toml | 3 ++
task-sdk/src/airflow/sdk/api/client.py | 30 ++++++++++++-
task-sdk/src/airflow/sdk/execution_time/comms.py | 50 ++++++++++++++--------
.../src/airflow/sdk/execution_time/supervisor.py | 17 ++++++++
.../task_sdk/execution_time/test_supervisor.py | 46 ++++++++++++++++++++
uv.lock | 13 +++---
7 files changed, 134 insertions(+), 28 deletions(-)
diff --git a/shared/observability/pyproject.toml
b/shared/observability/pyproject.toml
index 54b902157df..724ff21e750 100644
--- a/shared/observability/pyproject.toml
+++ b/shared/observability/pyproject.toml
@@ -24,15 +24,14 @@ classifiers = [
]
dependencies = [
+ "opentelemetry-api>=1.27.0",
"pendulum>=3.1.0",
- "pygtrie>=2.5.0",
"structlog>=25.4.0",
"methodtools>=0.4.7",
]
[project.optional-dependencies]
"otel" = [
- "opentelemetry-api>=1.27.0",
"opentelemetry-exporter-otlp>=1.27.0",
"opentelemetry-proto<9999,>=1.27.0",
]
diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index a33748911af..6e4a0b1017d 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -87,6 +87,9 @@ dependencies = [
# Start of shared providers-discovery dependencies
"jsonschema>=4.19.1",
# End of shared providers-discovery dependencies
+ # Start of shared observability dependencies
+ "opentelemetry-api>=1.27.0",
+ # End of shared observability dependencies
]
[project.optional-dependencies]
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index 54927794bf1..15513010062 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -29,6 +29,8 @@ import certifi
import httpx
import msgspec
import structlog
+from opentelemetry import trace
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from pydantic import BaseModel
from tenacity import (
before_log,
@@ -163,6 +165,9 @@ def getuser() -> str:
log = structlog.get_logger(logger_name=__name__)
+_trace_propagator = TraceContextTextMapPropagator()
+_log_retry_warning = before_log(log, logging.WARNING)
+
__all__ = [
"Client",
"ConnectionOperations",
@@ -206,6 +211,24 @@ def add_correlation_id(request: httpx.Request):
request.headers["correlation-id"] = str(uuid7())
+def inject_trace_context(request: httpx.Request) -> None:
+ _trace_propagator.inject(request.headers)
+
+
+def _log_and_trace_retry(retry_state) -> None:
+ _log_retry_warning(retry_state)
+ span = trace.get_current_span()
+ if span.is_recording():
+ exc = retry_state.outcome.exception() if retry_state.outcome else None
+ span.add_event(
+ "http.retry",
+ attributes={
+ "attempt_number": retry_state.attempt_number,
+ "error": str(exc) if exc else "",
+ },
+ )
+
+
class TaskInstanceOperations:
__slots__ = ("client",)
@@ -980,7 +1003,10 @@ class Client(httpx.Client):
"user-agent": f"apache-airflow-task-sdk/{__version__}
(Python/{pyver})",
"airflow-api-version": API_VERSION,
},
- event_hooks={"response": [self._update_auth, raise_on_4xx_5xx],
"request": [add_correlation_id]},
+ event_hooks={
+ "response": [self._update_auth, raise_on_4xx_5xx],
+ "request": [add_correlation_id, inject_trace_context],
+ },
**kwargs,
)
@@ -993,7 +1019,7 @@ class Client(httpx.Client):
retry=retry_if_exception(_should_retry_api_request),
stop=stop_after_attempt(API_RETRIES),
wait=wait_random_exponential(min=API_RETRY_WAIT_MIN,
max=API_RETRY_WAIT_MAX),
- before_sleep=before_log(log, logging.WARNING),
+ before_sleep=_log_and_trace_retry,
reraise=True,
)
def request(self, *args, **kwargs):
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 87c7881333a..1e11e9636e5 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -103,6 +103,10 @@ except ImportError:
# Available on Unix and Windows (so "everywhere") but lets be safe
recv_fds = None # type: ignore[assignment]
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+_trace_propagator = TraceContextTextMapPropagator()
+
if TYPE_CHECKING:
from structlog.typing import FilteringBoundLogger as Logger
@@ -133,23 +137,14 @@ def _new_encoder() -> msgspec.msgpack.Encoder:
return msgspec.msgpack.Encoder(enc_hook=_msgpack_enc_hook)
-class _RequestFrame(msgspec.Struct, array_like=True, frozen=True,
omit_defaults=True): # type: ignore[call-arg]
- id: int
- """
- The request id, set by the sender.
-
- This is used to allow "pipeling" of requests and to be able to tie
response to requests, which is
- particularly useful in the Triggerer where multiple async tasks can send a
requests concurrently.
- """
- body: dict[str, Any] | None
-
- req_encoder: ClassVar[msgspec.msgpack.Encoder] = _new_encoder()
+class _FrameMixin:
+ _encoder: ClassVar[msgspec.msgpack.Encoder] = _new_encoder()
def as_bytes(self) -> bytearray:
# https://jcristharif.com/msgspec/perf-tips.html#length-prefix-framing
for inspiration
buffer = bytearray(256)
- self.req_encoder.encode_into(self, buffer, 4)
+ self._encoder.encode_into(self, buffer, 4) # type: ignore[arg-type]
n = len(buffer) - 4
if n >= 2**32:
@@ -159,7 +154,25 @@ class _RequestFrame(msgspec.Struct, array_like=True,
frozen=True, omit_defaults=
return buffer
-class _ResponseFrame(_RequestFrame, frozen=True): # type: ignore[call-arg]
+class _RequestFrame(_FrameMixin, msgspec.Struct, array_like=True, frozen=True,
omit_defaults=True): # type: ignore[call-arg]
+ id: int
+ """
+ The request id, set by the sender.
+
+ This is used to allow "pipelining" of requests and to be able to tie
responses to requests, which is
+ particularly useful in the Triggerer where multiple async tasks can send
requests concurrently.
+ """
+ body: dict[str, Any] | None
+ context_carrier: dict[str, str] | None = None
+ """W3C trace context carrier (traceparent + tracestate) of the task
runner's active span.
+
+ The supervisor extracts this to restore the task runner's trace context
before making outbound HTTP
+ calls, so that server-side spans (e.g. POST /xcoms/…) appear as children
of the correct task span
+ rather than under the supervisor's own span.
+ """
+
+
+class _ResponseFrame(_FrameMixin, msgspec.Struct, array_like=True,
frozen=True, omit_defaults=True): # type: ignore[call-arg]
id: int
"""
The id of the request this is a response to
@@ -193,10 +206,14 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
# Async lock for async operations
_async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+ def _make_frame(self, msg: SendMsgType) -> _RequestFrame:
+ carrier: dict[str, str] = {}
+ _trace_propagator.inject(carrier)
+ return _RequestFrame(id=next(self.id_counter), body=msg.model_dump(),
context_carrier=carrier or None)
+
def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
"""Send a request to the parent and block until the response is
received."""
- frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
- frame_bytes = frame.as_bytes()
+ frame_bytes = self._make_frame(msg).as_bytes()
# We must make sure sockets aren't intermixed between sync and async
calls,
# thus we need a dual locking mechanism to ensure that.
@@ -224,8 +241,7 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
Uses async lock for coroutine safety and thread lock for socket safety.
"""
- frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
- frame_bytes = frame.as_bytes()
+ frame_bytes = self._make_frame(msg).as_bytes()
async with self._async_lock:
# Acquire the threading lock without blocking the event loop
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 375c5a9e30b..cd25d927957 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -132,6 +132,11 @@ try:
except ImportError:
send_fds = None # type: ignore[assignment]
+from opentelemetry import context as otel_context, trace
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+_trace_propagator = TraceContextTextMapPropagator()
+
if TYPE_CHECKING:
from structlog.typing import FilteringBoundLogger, WrappedLogger
from typing_extensions import Self
@@ -738,7 +743,13 @@ class WatchedSubprocess:
log.exception("Unable to decode message", body=request.body)
continue
+ # Restore the task runner's trace context so that any outbound
HTTP calls made while
+ # handling this request are linked to the correct task span, not
the supervisor's own span.
+ token = None
try:
+ if request.context_carrier:
+ ctx = _trace_propagator.extract(request.context_carrier)
+ token = otel_context.attach(ctx)
self._handle_request(msg, log, request.id)
except ServerResponseError as e:
error_details = e.response.json() if e.response else None
@@ -762,6 +773,9 @@ class WatchedSubprocess:
),
request_id=request.id,
)
+ finally:
+ if token is not None:
+ otel_context.detach(token)
def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) ->
None:
raise NotImplementedError()
@@ -2246,6 +2260,9 @@ def supervise_task(
finally:
if log_path and log_file_descriptor:
log_file_descriptor.close()
+ provider = trace.get_tracer_provider()
+ if hasattr(provider, "force_flush"):
+ provider.force_flush(timeout_millis=5000) # upper bound, not
a fixed wait
def supervise(**kwargs) -> int:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 3695af1fff5..f0d8e1a0b65 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -44,6 +44,10 @@ import msgspec
import psutil
import pytest
import structlog
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
+from opentelemetry.trace import get_current_span
from pytest_unordered import unordered
from task_sdk import FAKE_BUNDLE, make_client
from uuid6 import uuid7
@@ -3515,3 +3519,45 @@ class TestChildExecMain:
os.close(saved_2)
for s in [req_a, req_b, out_a, out_b, err_a, err_b]:
s.close()
+
+
+def test_ipc_trace_context_propagation(mocker):
+ """Full IPC propagation chain: _make_frame injects the active span;
handle_requests restores it."""
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(InMemorySpanExporter()))
+ tracer = provider.get_tracer("test")
+
+ # Task-runner side: _make_frame injects the active span via the real
propagator.
+ with tracer.start_as_current_span("task_span") as span:
+ frame = CommsDecoder(socket=None)._make_frame(GetVariable(key="k")) #
type: ignore[arg-type]
+ expected_span_id = span.get_span_context().span_id
+
+ assert frame.context_carrier is not None
+ assert f"{expected_span_id:016x}" in
frame.context_carrier.get("traceparent", "")
+
+ # Supervisor side: handle_requests extracts and restores the context
before dispatch.
+ # Capture the active span from inside the client call — exercises the full
dispatch path.
+ _, write_end = socket.socketpair()
+ proc = ActivitySubprocess(
+ process_log=mocker.MagicMock(),
+ id=TI_ID,
+ pid=12345,
+ stdin=write_end,
+ client=mocker.Mock(),
+ process=mocker.Mock(),
+ )
+ captured: list[int] = []
+
+ def capture_on_get(key):
+ captured.append(get_current_span().get_span_context().span_id)
+ return VariableResult(key=key, value="v")
+
+ proc.client.variables.get.side_effect = capture_on_get
+
+ generator = proc.handle_requests(log=mocker.Mock())
+ next(generator)
+ generator.send(frame)
+
+ assert captured == [expected_span_id]
+ # Context is detached after dispatch — no leak.
+ assert get_current_span().get_span_context().span_id != expected_span_id
diff --git a/uv.lock b/uv.lock
index b31aab16b3b..8a0ba2fa354 100644
--- a/uv.lock
+++ b/uv.lock
@@ -8161,15 +8161,14 @@ version = "0.0"
source = { editable = "shared/observability" }
dependencies = [
{ name = "methodtools" },
+ { name = "opentelemetry-api" },
{ name = "pendulum" },
- { name = "pygtrie" },
{ name = "structlog" },
]
[package.optional-dependencies]
all = [
{ name = "datadog" },
- { name = "opentelemetry-api" },
{ name = "opentelemetry-exporter-otlp" },
{ name = "opentelemetry-proto" },
{ name = "statsd" },
@@ -8178,7 +8177,6 @@ datadog = [
{ name = "datadog" },
]
otel = [
- { name = "opentelemetry-api" },
{ name = "opentelemetry-exporter-otlp" },
{ name = "opentelemetry-proto" },
]
@@ -8200,11 +8198,10 @@ requires-dist = [
{ name = "apache-airflow-shared-observability", extras = ["otel",
"statsd", "datadog"], marker = "extra == 'all'", editable =
"shared/observability" },
{ name = "datadog", marker = "extra == 'datadog'", specifier = ">=0.50.0"
},
{ name = "methodtools", specifier = ">=0.4.7" },
- { name = "opentelemetry-api", marker = "extra == 'otel'", specifier =
">=1.27.0" },
+ { name = "opentelemetry-api", specifier = ">=1.27.0" },
{ name = "opentelemetry-exporter-otlp", marker = "extra == 'otel'",
specifier = ">=1.27.0" },
{ name = "opentelemetry-proto", marker = "extra == 'otel'", specifier =
">=1.27.0,<9999" },
{ name = "pendulum", specifier = ">=3.1.0" },
- { name = "pygtrie", specifier = ">=2.5.0" },
{ name = "statsd", marker = "extra == 'statsd'", specifier = ">=3.3.0" },
{ name = "structlog", specifier = ">=25.4.0" },
]
@@ -8446,6 +8443,7 @@ dependencies = [
{ name = "jsonschema" },
{ name = "methodtools" },
{ name = "msgspec" },
+ { name = "opentelemetry-api" },
{ name = "packaging" },
{ name = "pathspec" },
{ name = "pendulum" },
@@ -8521,6 +8519,7 @@ requires-dist = [
{ name = "jsonschema", specifier = ">=4.19.1" },
{ name = "methodtools", specifier = ">=0.4.7" },
{ name = "msgspec", specifier = ">=0.19.0" },
+ { name = "opentelemetry-api", specifier = ">=1.27.0" },
{ name = "opentelemetry-api", marker = "extra == 'all'", specifier =
">=1.27.0" },
{ name = "opentelemetry-api", marker = "extra == 'otel'", specifier =
">=1.27.0" },
{ name = "opentelemetry-exporter-otlp", marker = "extra == 'all'",
specifier = ">=1.27.0" },
@@ -20298,8 +20297,8 @@ name = "secretstorage"
version = "3.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
- { name = "cryptography", marker = "python_full_version >= '3.14' or
platform_machine != 'arm64' or sys_platform != 'darwin'" },
- { name = "jeepney", marker = "python_full_version >= '3.14' or
platform_machine != 'arm64' or sys_platform != 'darwin'" },
+ { name = "cryptography", marker = "(python_full_version >= '3.14' and
sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform ==
'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or
(platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform !=
'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" },
+ { name = "jeepney", marker = "(python_full_version >= '3.14' and
sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform ==
'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or
(platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform !=
'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" },
]
sdist = { url =
"https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz",
hash =
"sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size
= 19884, upload-time = "2025-11-23T19:02:53.191Z" }
wheels = [