This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2e9cbc25ebb Add configurable task span detail level for OTel tracing
(#63568)
2e9cbc25ebb is described below
commit 2e9cbc25ebbe317f7c6ccb6d92f1b7462335ce29
Author: Daniel Standish <[email protected]>
AuthorDate: Thu May 21 18:55:40 2026 -0700
Add configurable task span detail level for OTel tracing (#63568)
Adds an opt-in detail level for OpenTelemetry task spans so users can
get a finer-grained breakdown of where time is spent inside a task
without paying the cost on every run.
Set the level on a dag run via `dag_run.conf`:
{"airflow/task_span_detail_level": 2}
At level >= 2, the task SDK emits child spans around the key task
runner phases — parse, startup, _prepare, _execute_task,
get_template_context, render_templates, _serialize_rendered_fields,
_validate_task_inlets_and_outlets, finalize, etc. — using a new
`@detail_span` decorator/context manager. At the default level (1),
behavior is unchanged and no extra spans are produced.
The level is stamped onto the W3C `tracestate` of the dag run's
context carrier (key `airflow/task_span_detail_level`) so it is
propagated from the scheduler to workers without a separate
config channel; the SDK reads it from the current span's trace
state at runtime.
Also:
- Records exceptions and ERROR status on the top-level task span in
task_runner.main() (KeyboardInterrupt, top-level errors,
AirflowRescheduleException).
- Wraps the calls to `_emit_task_span` and `_emit_dagrun_span` in try/except
so tracing failures are logged as warnings and cannot fail a task or dag run.
- Adjusts the otel integration test to pick the correct Jaeger
trace by run_id.
---
.../execution_api/routes/task_instances.py | 5 +-
airflow-core/src/airflow/models/dagrun.py | 25 ++++-
airflow-core/src/airflow/models/taskinstance.py | 10 +-
airflow-core/tests/integration/otel/test_otel.py | 78 ++++++++++++---
airflow-core/tests/unit/models/test_dagrun.py | 20 ++++
.../tests/unit/models/test_taskinstance.py | 30 +++++-
scripts/ci/docker-compose/integration-otel.yml | 2 +-
.../observability/traces/__init__.py | 35 ++++++-
.../tests/observability/test_traces.py | 104 ++++++++++++++++++++
.../src/airflow/sdk/execution_time/task_runner.py | 66 ++++++++++++-
.../task_sdk/execution_time/test_task_runner.py | 107 ++++++++++++++++++++-
11 files changed, 448 insertions(+), 34 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 577b2200e4c..18b5842f77b 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -585,7 +585,10 @@ def _create_ti_state_update_query_and_update_state(
ti_patch_payload.outlet_events,
session,
)
- _emit_task_span(ti, state=updated_state)
+ try:
+ _emit_task_span(ti, state=updated_state)
+ except Exception:
+ log.warning("Failed to emit task span", exc_info=True)
elif isinstance(ti_patch_payload, TIDeferredStatePayload):
# Calculate timeout if it was passed
timeout = None
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index ca2b68f34be..36ed309feb0 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -28,7 +28,8 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar,
cast, overload
from uuid import UUID
import structlog
-from opentelemetry import context, trace
+from opentelemetry import trace
+from opentelemetry.context import context
from opentelemetry.trace import StatusCode
from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from sqlalchemy import (
@@ -62,7 +63,11 @@ from sqlalchemy.sql.expression import false, select
from sqlalchemy.sql.functions import coalesce
from airflow._shared.observability.metrics import stats
-from airflow._shared.observability.traces import new_dagrun_trace_carrier,
override_ids
+from airflow._shared.observability.traces import (
+ TASK_SPAN_DETAIL_LEVEL_KEY,
+ new_dagrun_trace_carrier,
+ override_ids,
+)
from airflow._shared.timezones import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest,
DagRunContext
from airflow.configuration import conf as airflow_conf
@@ -376,7 +381,9 @@ class DagRun(Base, LoggingMixin):
self.triggered_by = triggered_by
self.triggering_user_name = triggering_user_name
self.scheduled_by_job_id = None
- self.context_carrier: dict[str, str] = new_dagrun_trace_carrier()
+ self.context_carrier: dict[str, str] = new_dagrun_trace_carrier(
+ task_span_detail_level=self.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY,
None)
+ )
if not isinstance(partition_key, str | None):
raise ValueError(
@@ -1074,6 +1081,13 @@ class DagRun(Base, LoggingMixin):
attributes["airflow.dag_run.logical_date"] =
str(self.logical_date)
if self.partition_key:
attributes["airflow.dag_run.partition_key"] =
str(self.partition_key)
+ # TODO: make the empty parent context optional. Default should be
to
+ # nest the dag run span under the currently active parent span (by
+ # omitting `context` here); only use the empty `context.Context()`
to
+ # force a root span when Airflow itself initiates the run (e.g. dag
+ # triggered via API, scheduler, or backfill). Today this forces a
+ # root span unconditionally.
+ # Tracked at https://github.com/apache/airflow/issues/67210
span = tracer.start_span(
name=f"dag_run.{self.dag_id}",
start_time=int((self.queued_at or self.start_date or
timezone.utcnow()).timestamp() * 1e9),
@@ -1267,7 +1281,10 @@ class DagRun(Base, LoggingMixin):
self.data_interval_end,
)
session.flush()
- self._emit_dagrun_span(state=self.state)
+ try:
+ self._emit_dagrun_span(state=self.state)
+ except Exception:
+ self.log.warning("Failed to emit dag run span", exc_info=True)
self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
self._emit_duration_stats_for_finished_state()
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index ea8656237fd..9cc8d6a6271 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -69,7 +69,11 @@ from sqlalchemy.orm.attributes import NO_VALUE,
set_committed_value
from airflow import settings
from airflow._shared.observability.metrics import stats
-from airflow._shared.observability.traces import new_dagrun_trace_carrier,
new_task_run_carrier
+from airflow._shared.observability.traces import (
+ TASK_SPAN_DETAIL_LEVEL_KEY,
+ new_dagrun_trace_carrier,
+ new_task_run_carrier,
+)
from airflow._shared.timezones import timezone
from airflow.assets.manager import asset_manager
from airflow.configuration import conf
@@ -422,7 +426,9 @@ def clear_task_instances(
# Always update clear_number and queued_at when clearing tasks,
regardless of state
dr.clear_number += 1
dr.queued_at = timezone.utcnow()
- dr.context_carrier = new_dagrun_trace_carrier()
+ dr.context_carrier = new_dagrun_trace_carrier(
+ task_span_detail_level=dr.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY)
if dr.conf else None
+ )
_recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index 4f46b37e851..05c7ea5638e 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -86,7 +86,7 @@ def wait_for_otel_collector(host: str, port: int, timeout:
int = 120) -> None:
)
-def unpause_trigger_dag_and_get_run_id(dag_id: str) -> str:
+def unpause_trigger_dag_and_get_run_id(dag_id: str, conf: dict | None = None)
-> str:
unpause_command = ["airflow", "dags", "unpause", dag_id]
# Unpause the dag using the cli.
@@ -106,6 +106,11 @@ def unpause_trigger_dag_and_get_run_id(dag_id: str) -> str:
execution_date.isoformat(),
]
+ if conf:
+ import json
+
+ trigger_command += ["--conf", json.dumps(conf)]
+
# Trigger the dag using the cli.
subprocess.run(trigger_command, check=True, env=os.environ.copy())
@@ -438,7 +443,46 @@ class TestOtelIntegration:
assert set(metrics_to_check).issubset(metrics_dict.keys())
@pytest.mark.execution_timeout(90)
- def test_dag_execution_succeeds(self, capfd):
+ @pytest.mark.parametrize(
+ ("task_span_detail_level", "expected_hierarchy"),
+ [
+ pytest.param(
+ None,
+ {
+ "dag_run.otel_test_dag": None,
+ "sub_span1": "worker.task1",
+ "task_run.task1": "dag_run.otel_test_dag",
+ "worker.task1": "task_run.task1",
+ },
+ id="default_spans",
+ ),
+ pytest.param(
+ 2,
+ # Additional detail spans are deferred to follow-up PRs;
tracked
+ # at https://linear.app/astronomer/issue/ACD-157.
+ {
+ "hook.on_starting": "startup",
+ "_verify_bundle_access": "parse",
+ "parse": "startup",
+ "get_template_context": "startup",
+ "startup": "worker.task1",
+ "render_templates": "_prepare",
+ "_serialize_rendered_fields": "_prepare",
+ "_validate_task_inlets_and_outlets": "_prepare",
+ "_prepare": "run",
+ "_execute_task": "run",
+ "finalize": "worker.task1",
+ "run": "worker.task1",
+ "sub_span1": "_execute_task",
+ "dag_run.otel_test_dag": None,
+ "task_run.task1": "dag_run.otel_test_dag",
+ "worker.task1": "task_run.task1",
+ },
+ id="detail_spans",
+ ),
+ ],
+ )
+ def test_dag_execution_succeeds(self, capfd, task_span_detail_level,
expected_hierarchy):
"""The same scheduler will start and finish the dag processing."""
scheduler_process = None
apiserver_process = None
@@ -454,7 +498,13 @@ class TestOtelIntegration:
assert dag is not None
- run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
+ conf = None
+ if task_span_detail_level is not None:
+ from airflow_shared.observability.traces import
TASK_SPAN_DETAIL_LEVEL_KEY
+
+ conf = {TASK_SPAN_DETAIL_LEVEL_KEY: task_span_detail_level}
+
+ run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id,
conf=conf)
# Skip the span_status check.
wait_for_dag_run(dag_id=dag_id, run_id=run_id, max_wait_time=90)
@@ -492,8 +542,19 @@ class TestOtelIntegration:
service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
r =
requests.get(f"http://{host}:16686/api/traces?service={service_name}")
data = r.json()
-
- trace = data["data"][-1]
+ # Find the trace for *this* dag run; selecting by position in the
+ # response is flaky because earlier-test traces accumulate in Jaeger.
+ matching = [
+ t
+ for t in data["data"]
+ if any(
+ tag.get("key") == "airflow.dag_run.run_id" and
tag.get("value") == run_id
+ for span in t["spans"]
+ for tag in span.get("tags", [])
+ )
+ ]
+ assert len(matching) == 1, f"expected exactly one trace for
run_id={run_id}, got {len(matching)}"
+ trace = matching[0]
spans = trace["spans"]
def get_span_hierarchy():
@@ -509,12 +570,7 @@ class TestOtelIntegration:
return nested
nested = get_span_hierarchy()
- assert nested == {
- "dag_run.otel_test_dag": None,
- "sub_span1": "worker.task1",
- "task_run.task1": "dag_run.otel_test_dag",
- "worker.task1": "task_run.task1",
- }
+ assert nested == expected_hierarchy
def start_scheduler(self, capture_output: bool = False):
stdout = None if capture_output else subprocess.DEVNULL
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index a9f9dfb136c..5f555ba69d0 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -3751,3 +3751,23 @@ class TestDagRunTracing:
assert spans[0].name == f"dag_run.{dr.dag_id}"
else:
assert len(spans) == 0
+
+ @pytest.mark.db_test
+ def test_context_carrier_includes_detail_level_from_conf(self, dag_maker):
+ """DagRun created with TASK_SPAN_DETAIL_LEVEL_KEY in conf should
encode the level in trace state."""
+ from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+ from airflow._shared.observability.traces import (
+ TASK_SPAN_DETAIL_LEVEL_KEY,
+ get_task_span_detail_level,
+ )
+
+ with dag_maker("test_tracing_detail_level"):
+ EmptyOperator(task_id="t1")
+ dr = dag_maker.create_dagrun(conf={TASK_SPAN_DETAIL_LEVEL_KEY: 2})
+
+ ctx = TraceContextTextMapPropagator().extract(dr.context_carrier)
+ from opentelemetry import trace
+
+ span = trace.get_current_span(ctx)
+ assert get_task_span_detail_level(span) == 2
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index bea68206183..3b4dceb628d 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -30,7 +30,7 @@ import pendulum
import pytest
import time_machine
import uuid6
-from opentelemetry import trace as otel_trace
+from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from sqlalchemy import delete, func, select
@@ -3802,12 +3802,10 @@ class TestMakeTaskCarrier:
propagator = TraceContextTextMapPropagator()
parent_trace_id = (
-
otel_trace.get_current_span(context=propagator.extract(parent_carrier))
- .get_span_context()
- .trace_id
+
trace.get_current_span(context=propagator.extract(parent_carrier)).get_span_context().trace_id
)
child_trace_id = (
-
otel_trace.get_current_span(context=propagator.extract(child_carrier)).get_span_context().trace_id
+
trace.get_current_span(context=propagator.extract(child_carrier)).get_span_context().trace_id
)
assert child_trace_id == parent_trace_id
assert child_trace_id != 0
@@ -3876,3 +3874,25 @@ def
test_clear_task_instances_resets_context_carrier(dag_maker, session):
assert ti.context_carrier["traceparent"] != original_ti_traceparent
assert dag_run.context_carrier["traceparent"] != original_dr_traceparent
+
+
[email protected]_test
+def test_clear_task_instances_preserves_detail_level(dag_maker, session):
+ """clear_task_instances should produce a new context_carrier that keeps
the detail level from dag run conf."""
+ from airflow._shared.observability.traces import (
+ TASK_SPAN_DETAIL_LEVEL_KEY,
+ get_task_span_detail_level,
+ )
+
+ with dag_maker("test_clear_preserves_level"):
+ EmptyOperator(task_id="t1")
+ dag_run = dag_maker.create_dagrun(conf={TASK_SPAN_DETAIL_LEVEL_KEY: 2})
+ ti = dag_run.get_task_instance("t1", session=session)
+ ti.state = TaskInstanceState.SUCCESS
+ session.flush()
+
+ clear_task_instances([ti], session)
+
+ new_ctx = TraceContextTextMapPropagator().extract(dag_run.context_carrier)
+ span = trace.get_current_span(new_ctx)
+ assert get_task_span_detail_level(span) == 2
diff --git a/scripts/ci/docker-compose/integration-otel.yml
b/scripts/ci/docker-compose/integration-otel.yml
index 3c1b83f4480..8b98699fccb 100644
--- a/scripts/ci/docker-compose/integration-otel.yml
+++ b/scripts/ci/docker-compose/integration-otel.yml
@@ -81,7 +81,7 @@ services:
- OTEL_METRIC_EXPORT_INTERVAL=30000
- AIRFLOW__METRICS__OTEL_ON=True
- AIRFLOW__TRACES__OTEL_ON=True
- - AIRFLOW__TRACES__OTEL_TASK_LOG_EVENT=True
+ - AIRFLOW__TRACES__OTEL_TASK_LOG_EVENT=True # todo: deprecate
depends_on:
- otel-collector
diff --git
a/shared/observability/src/airflow_shared/observability/traces/__init__.py
b/shared/observability/src/airflow_shared/observability/traces/__init__.py
index dc3532262d1..8fd4e127b77 100644
--- a/shared/observability/src/airflow_shared/observability/traces/__init__.py
+++ b/shared/observability/src/airflow_shared/observability/traces/__init__.py
@@ -28,7 +28,7 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
-from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
+from opentelemetry.trace import NonRecordingSpan, Span, SpanContext,
TraceFlags, TraceState
from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
if TYPE_CHECKING:
@@ -56,14 +56,20 @@ class OverrideableRandomIdGenerator(RandomIdGenerator):
return super().generate_trace_id()
-def new_dagrun_trace_carrier() -> dict[str, str]:
+TASK_SPAN_DETAIL_LEVEL_KEY = "airflow/task_span_detail_level"
+DEFAULT_TASK_SPAN_DETAIL_LEVEL = 1
+
+
+def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
"""Generate a fresh W3C traceparent carrier without creating a recordable
span."""
gen = RandomIdGenerator()
+ trace_state_entries = build_trace_state_entries(task_span_detail_level)
span_ctx = SpanContext(
trace_id=gen.generate_trace_id(),
span_id=gen.generate_span_id(),
is_remote=False,
trace_flags=TraceFlags(TraceFlags.SAMPLED),
+ trace_state=TraceState(entries=trace_state_entries),
)
ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
carrier: dict[str, str] = {}
@@ -82,6 +88,31 @@ def new_task_run_carrier(dag_run_context_carrier):
return carrier
+def build_trace_state_entries(task_span_detail_level) -> list[tuple[str, str]]:
+ trace_state_entries = []
+ if task_span_detail_level is not None:
+ try:
+ level = int(task_span_detail_level)
+ except (TypeError, ValueError):
+ level = None
+ if level:
+ trace_state_entries.append((TASK_SPAN_DETAIL_LEVEL_KEY,
str(level)))
+ return trace_state_entries
+
+
+def get_task_span_detail_level(span: Span):
+ span_ctx = span.get_span_context()
+ trace_state = span_ctx.trace_state
+ raw = trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY)
+ if raw is None:
+ return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+ try:
+ return int(raw)
+ except (TypeError, ValueError):
+ log.warning("%s config in dag run conf must be integer.",
TASK_SPAN_DETAIL_LEVEL_KEY)
+ return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+
@contextmanager
def override_ids(trace_id, span_id, ctx=None):
ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
diff --git a/shared/observability/tests/observability/test_traces.py
b/shared/observability/tests/observability/test_traces.py
new file mode 100644
index 00000000000..b21cc3c8761
--- /dev/null
+++ b/shared/observability/tests/observability/test_traces.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags,
TraceState
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+from airflow_shared.observability.traces import (
+ DEFAULT_TASK_SPAN_DETAIL_LEVEL,
+ TASK_SPAN_DETAIL_LEVEL_KEY,
+ build_trace_state_entries,
+ get_task_span_detail_level,
+ new_dagrun_trace_carrier,
+)
+
+
+class TestBuildTraceStateEntries:
+ def test_with_integer_level(self):
+ entries = build_trace_state_entries(2)
+ assert entries == [(TASK_SPAN_DETAIL_LEVEL_KEY, "2")]
+
+ def test_with_string_level(self):
+ entries = build_trace_state_entries("3")
+ assert entries == [(TASK_SPAN_DETAIL_LEVEL_KEY, "3")]
+
+ def test_with_none(self):
+ assert build_trace_state_entries(None) == []
+
+ def test_with_zero(self):
+ # 0 is falsy — treated as no detail level
+ assert build_trace_state_entries(0) == []
+
+ def test_with_invalid_string(self):
+ # Non-integer string should not raise; returns empty
+ assert build_trace_state_entries("not-a-number") == []
+
+
+class TestNewDagrunTraceCarrier:
+ def test_with_detail_level_embeds_level_in_trace_state(self):
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ from opentelemetry import trace
+
+ span_ctx = trace.get_current_span(ctx).get_span_context()
+ assert span_ctx.trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY) == "2"
+
+ def test_without_detail_level_has_empty_trace_state(self):
+ carrier = new_dagrun_trace_carrier()
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ from opentelemetry import trace
+
+ span_ctx = trace.get_current_span(ctx).get_span_context()
+ assert span_ctx.trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY) is None
+
+
+class TestGetTaskSpanDetailLevel:
+ def _make_span_with_trace_state(self, entries: list[tuple[str, str]]) ->
NonRecordingSpan:
+ from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+
+ gen = RandomIdGenerator()
+ span_ctx = SpanContext(
+ trace_id=gen.generate_trace_id(),
+ span_id=gen.generate_span_id(),
+ is_remote=False,
+ trace_flags=TraceFlags(TraceFlags.SAMPLED),
+ trace_state=TraceState(entries=entries),
+ )
+ return NonRecordingSpan(span_ctx)
+
+ def test_returns_default_when_no_trace_state(self):
+ span = self._make_span_with_trace_state([])
+ assert get_task_span_detail_level(span) ==
DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+ def test_reads_level_from_trace_state(self):
+ span = self._make_span_with_trace_state([(TASK_SPAN_DETAIL_LEVEL_KEY,
"2")])
+ assert get_task_span_detail_level(span) == 2
+
+ def test_fallback_on_invalid_value(self):
+ span = self._make_span_with_trace_state([(TASK_SPAN_DETAIL_LEVEL_KEY,
"bad")])
+ assert get_task_span_detail_level(span) ==
DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+ def test_roundtrip_via_carrier(self):
+ """Level set in new_dagrun_trace_carrier is readable by
get_task_span_detail_level."""
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=3)
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ from opentelemetry import trace
+
+ span = trace.get_current_span(ctx)
+ assert get_task_span_detail_level(span) == 3
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 10977fb011b..313309fa401 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -21,6 +21,7 @@ from __future__ import annotations
import contextvars
import functools
+import inspect
import os
import sys
import time
@@ -36,6 +37,7 @@ import attrs
import lazy_object_proxy
import structlog
from opentelemetry import trace
+from opentelemetry.trace import INVALID_SPAN, Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from pydantic import AwareDatetime, ConfigDict, Field, JsonValue, TypeAdapter
from structlog.contextvars import bind_contextvars
@@ -43,6 +45,7 @@ from structlog.contextvars import bind_contextvars
from airflow.dag_processing.bundles.base import BaseDagBundle,
BundleVersionLock
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.sdk._shared.observability.metrics import stats
+from airflow.sdk._shared.observability.traces import get_task_span_detail_level
from airflow.sdk._shared.template_rendering import truncate_rendered_value
from airflow.sdk.api.client import get_hostname, getuser
from airflow.sdk.api.datamodels._generated import (
@@ -153,6 +156,38 @@ log = structlog.get_logger("task")
tracer = trace.get_tracer(__name__)
+class detail_span:
+ """Context manager and decorator that creates a child span when detail
level > 1."""
+
+ def __init__(self, *args, **kwargs):
+ self._args = args
+ self._kwargs = kwargs
+ self._ctx = None
+
+ def _make_ctx(self):
+ parent_span = trace.get_current_span()
+ config_level = get_task_span_detail_level(span=parent_span)
+ if config_level > 1:
+ return tracer.start_as_current_span(*self._args, **self._kwargs)
+ return trace.INVALID_SPAN
+
+ def __enter__(self):
+ self._ctx = self._make_ctx()
+ return self._ctx.__enter__()
+
+ def __exit__(self, *exc_info):
+ return self._ctx.__exit__(*exc_info)
+
+ def __call__(self, f):
+ @functools.wraps(f)
+ def wrapper(*inner_args, **inner_kwargs):
+ with self._make_ctx():
+ return f(*inner_args, **inner_kwargs)
+
+ wrapper.__signature__ = inspect.signature(f)
+ return wrapper
+
+
@contextmanager
def _make_task_span(msg: StartupDetails):
parent_context = (
@@ -224,6 +259,7 @@ class RuntimeTaskInstance(TaskInstance):
__rich_repr__.angular = True # type: ignore[attr-defined]
+ @detail_span("get_template_context")
def get_template_context(self) -> Context:
# TODO: Move this to `airflow.sdk.execution_time.context`
# once we port the entire context logic from
airflow/utils/context.py ?
@@ -336,6 +372,7 @@ class RuntimeTaskInstance(TaskInstance):
return self._cached_template_context
+ @detail_span("render_templates")
def render_templates(
self, context: Context | None = None, jinja_env: jinja2.Environment |
None = None
) -> BaseOperator:
@@ -796,6 +833,7 @@ def _maybe_reschedule_startup_failure(
)
+@detail_span("parse")
def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
# Using BundleDagBag here is about 98% wrong, but it'll do for now
@@ -898,6 +936,7 @@ SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor]
# 3. Shutdown and report status
+@detail_span("_verify_bundle_access")
def _verify_bundle_access(bundle_instance: BaseDagBundle, log: Logger) -> None:
"""
Verify bundle is accessible by the current user.
@@ -960,6 +999,7 @@ def get_startup_details() -> StartupDetails:
return msg
+@detail_span("startup")
def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context,
Logger]:
# setproctitle causes issue on Mac OS:
https://github.com/benoitc/gunicorn/issues/3021
os_type = sys.platform
@@ -982,7 +1022,8 @@ def startup(msg: StartupDetails) ->
tuple[RuntimeTaskInstance, Context, Logger]:
)
try:
- get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
+ with detail_span("hook.on_starting"):
+
get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
except Exception:
log.exception("error calling listener")
@@ -1111,6 +1152,7 @@ def _serialize_template_field(
return serialized
+@detail_span("_serialize_rendered_fields")
def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]:
from airflow.sdk._shared.secrets_masker import redact
@@ -1187,6 +1229,7 @@ def _serialize_outlet_events(events:
OutletEventAccessorsProtocol) -> Iterator[d
yield attrs.asdict(alias_event)
+@detail_span("_prepare")
def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) ->
ToSupervisor | None:
ti.hostname = get_hostname()
ti.task = ti.task.prepare_for_execution()
@@ -1228,6 +1271,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger,
context: Context) -> ToSuperv
return None
+@detail_span("_validate_task_inlets_and_outlets")
def _validate_task_inlets_and_outlets(*, ti: RuntimeTaskInstance, log: Logger)
-> None:
if not ti.task.inlets and not ti.task.outlets:
return
@@ -1279,6 +1323,7 @@ def _defer_task(
@Sentry.enrich_errors
+@detail_span("run")
def run(
ti: RuntimeTaskInstance,
context: Context,
@@ -1814,6 +1859,7 @@ def _send_error_email_notification(
log.exception("Failed to send email notification")
+@detail_span("_execute_task")
def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
"""Execute Task (optionally with a Timeout) and push Xcom results."""
task = ti.task
@@ -1937,6 +1983,7 @@ def _push_xcom_if_needed(result: Any, ti:
RuntimeTaskInstance, log: Logger):
_xcom_push(ti, BaseXCom.XCOM_RETURN_KEY, result,
mapped_length=mapped_length)
+@detail_span("finalize")
def finalize(
ti: RuntimeTaskInstance,
state: TaskInstanceState,
@@ -2046,6 +2093,7 @@ def main():
)
stack = ExitStack()
+ span = INVALID_SPAN
with stack:
try:
try:
@@ -2059,8 +2107,8 @@ def main():
# startup message as a ResendLoggingFD response.
if os.environ.pop("_AIRFLOW_FORK_EXEC", None) == "1":
reinit_supervisor_comms()
- span = _make_task_span(msg=startup_details)
- stack.enter_context(span)
+ span_ctx_mgr = _make_task_span(msg=startup_details)
+ span = stack.enter_context(span_ctx_mgr)
ti, context, log = startup(msg=startup_details)
except AirflowRescheduleException as reschedule:
log.warning("Rescheduling task during startup, marking task as
UP_FOR_RESCHEDULE")
@@ -2070,6 +2118,10 @@ def main():
end_date=datetime.now(tz=timezone.utc),
)
)
+ span.record_exception(reschedule)
+ span.set_status(
+ Status(StatusCode.ERROR, description=f"Exception:
{type(reschedule).__name__}")
+ )
sys.exit(0)
with BundleVersionLock(
bundle_name=ti.bundle_instance.name,
@@ -2083,11 +2135,15 @@ def main():
# already run, so callbacks and listeners observed the state.
if getattr(ti, "_terminal_state_send_failed", False):
sys.exit(1)
- except KeyboardInterrupt:
+ except KeyboardInterrupt as e:
log.exception("Ctrl-c hit")
+ span.record_exception(e)
+ span.set_status(Status(StatusCode.ERROR, description=f"Exception:
{type(e).__name__}"))
sys.exit(2)
- except Exception:
+ except Exception as e:
log.exception("Top level error")
+ span.record_exception(e)
+ span.set_status(Status(StatusCode.ERROR, description=f"Exception:
{type(e).__name__}"))
sys.exit(1)
finally:
# Ensure the request socket is closed on the child side in all
circumstances
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index b37d2569ea4..56900fbadab 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -32,7 +32,7 @@ from unittest.mock import call, patch
import pandas as pd
import pytest
-from opentelemetry import trace as otel_trace
+from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
@@ -40,7 +40,11 @@ from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapProp
from task_sdk import FAKE_BUNDLE
from uuid6 import uuid7
-from airflow._shared.observability.traces import
OverrideableRandomIdGenerator, new_task_run_carrier
+from airflow._shared.observability.traces import (
+ OverrideableRandomIdGenerator,
+ new_dagrun_trace_carrier,
+ new_task_run_carrier,
+)
from airflow.api_fastapi.execution_api.routes.task_instances import
_emit_task_span
from airflow.listeners import hookimpl
from airflow.providers.standard.operators.python import PythonOperator
@@ -159,6 +163,7 @@ from airflow.sdk.execution_time.task_runner import (
_push_xcom_if_needed,
_serialize_outlet_events,
_xcom_push,
+ detail_span,
finalize,
get_startup_details,
parse,
@@ -577,7 +582,7 @@ def
test_task_span_is_child_of_dag_run_span(make_ti_context):
ti_carrier = new_task_run_carrier(dag_run_carrier)
# Extract the parent task span context (the stable span ID stored in
ti_carrier).
- parent_task_span_ctx = otel_trace.get_current_span(
+ parent_task_span_ctx = trace.get_current_span(
context=TraceContextTextMapPropagator().extract(ti_carrier)
).get_span_context()
@@ -5113,6 +5118,102 @@ class TestTaskInstanceMetrics:
backend.incr.assert_any_call("ti_failures", tags=stats_tags)
+class TestDetailSpan:
+ """Tests for the detail_span decorator / context manager."""
+
+ def test_level_1_no_child_span_as_context_manager(self):
+ """At detail level 1, entering detail_span should not create a real
recorded span."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ t = provider.get_tracer("test")
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=1)
+ parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+ with t.start_as_current_span("parent", context=parent_ctx):
+ with detail_span("child") as span:
+ assert span is trace.INVALID_SPAN
+
+ # Only the "parent" span should be recorded; no "child".
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "child" not in names
+
+ def test_level_2_creates_child_span_as_context_manager(self):
+ """At detail level 2, detail_span should create a real recorded child
span."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ t = provider.get_tracer("test")
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+ parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+ with t.start_as_current_span("parent", context=parent_ctx):
+ with detail_span("child"):
+ pass
+
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "child" in names
+
+ def test_decorator_at_level_1_does_not_create_span(self):
+ """@detail_span at level 1 should not produce a recorded span."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ t = provider.get_tracer("test")
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=1)
+ parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+ @detail_span("decorated")
+ def my_func():
+ return 42
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+ with t.start_as_current_span("parent", context=parent_ctx):
+ result = my_func()
+
+ assert result == 42
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "decorated" not in names
+
+ def
test_decorator_at_level_2_creates_span_and_preserves_return_value(self):
+ """@detail_span at level 2 creates a span and the wrapped function's
return value is preserved."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ t = provider.get_tracer("test")
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+ parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+ @detail_span("decorated")
+ def my_func(x):
+ return x * 2
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+ with t.start_as_current_span("parent", context=parent_ctx):
+ result = my_func(7)
+
+ assert result == 14
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "decorated" in names
+
+ def test_exception_in_context_manager_propagates(self):
+ """Exceptions inside `with detail_span(...)` propagate normally."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ t = provider.get_tracer("test")
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+ parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+ with t.start_as_current_span("parent", context=parent_ctx):
+ with pytest.raises(ValueError, match="boom"):
+ with detail_span("child"):
+ raise ValueError("boom")
+
+
def test_dag_add_result(create_runtime_ti, mock_supervisor_comms):
with DAG(dag_id="test_dag_add_result") as dag:
task = PythonOperator(task_id="t", python_callable=lambda: 123)