This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e9cbc25ebb Add configurable task span detail level for OTel tracing (#63568)
2e9cbc25ebb is described below

commit 2e9cbc25ebbe317f7c6ccb6d92f1b7462335ce29
Author: Daniel Standish <[email protected]>
AuthorDate: Thu May 21 18:55:40 2026 -0700

    Add configurable task span detail level for OTel tracing (#63568)
    
    Adds an opt-in detail level for OpenTelemetry task spans so users can
      get a finer-grained breakdown of where time is spent inside a task
      without paying the cost on every run.
    
      Set the level on a dag run via `dag_run.conf`:
    
          {"airflow/task_span_detail_level": 2}
    
      At level >= 2, the task SDK emits child spans around the key task
      runner phases — parse, startup, _prepare, _execute_task,
      get_template_context, render_templates, _serialize_rendered_fields,
      _validate_task_inlets_and_outlets, finalize, etc. — using a new
      `@detail_span` decorator/context manager. At the default level (1),
      behavior is unchanged and no extra spans are produced.
    
      The level is stamped onto the W3C `tracestate` of the dag run's
      context carrier (key `airflow/task_span_detail_level`) so it is
      propagated from the scheduler to workers without a separate
      config channel; the SDK reads it from the current span's trace
      state at runtime.
    
      Also:
    
      - Records the exception and sets ERROR status on the top-level task
        span in task_runner.main() when the task runner exits via
        KeyboardInterrupt, an unhandled top-level error, or an
        AirflowRescheduleException raised during startup.
      - Wraps the `_emit_task_span` and `_emit_dagrun_span` calls in
        try/except, logging a warning on failure, so tracing errors cannot
        fail a task or dag run.
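      - Makes the OTel integration test select the Jaeger trace for the
        triggered dag run by its run_id tag rather than by position (traces
        from earlier tests accumulate in Jaeger), and parametrizes the test
        over the default and detailed span levels.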
---
 .../execution_api/routes/task_instances.py         |   5 +-
 airflow-core/src/airflow/models/dagrun.py          |  25 ++++-
 airflow-core/src/airflow/models/taskinstance.py    |  10 +-
 airflow-core/tests/integration/otel/test_otel.py   |  78 ++++++++++++---
 airflow-core/tests/unit/models/test_dagrun.py      |  20 ++++
 .../tests/unit/models/test_taskinstance.py         |  30 +++++-
 scripts/ci/docker-compose/integration-otel.yml     |   2 +-
 .../observability/traces/__init__.py               |  35 ++++++-
 .../tests/observability/test_traces.py             | 104 ++++++++++++++++++++
 .../src/airflow/sdk/execution_time/task_runner.py  |  66 ++++++++++++-
 .../task_sdk/execution_time/test_task_runner.py    | 107 ++++++++++++++++++++-
 11 files changed, 448 insertions(+), 34 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 577b2200e4c..18b5842f77b 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -585,7 +585,10 @@ def _create_ti_state_update_query_and_update_state(
                     ti_patch_payload.outlet_events,
                     session,
                 )
-        _emit_task_span(ti, state=updated_state)
+        try:
+            _emit_task_span(ti, state=updated_state)
+        except Exception:
+            log.warning("Failed to emit task span", exc_info=True)
     elif isinstance(ti_patch_payload, TIDeferredStatePayload):
         # Calculate timeout if it was passed
         timeout = None
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index ca2b68f34be..36ed309feb0 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -28,7 +28,8 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, 
cast, overload
 from uuid import UUID
 
 import structlog
-from opentelemetry import context, trace
+from opentelemetry import trace
+from opentelemetry.context import context
 from opentelemetry.trace import StatusCode
 from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
 from sqlalchemy import (
@@ -62,7 +63,11 @@ from sqlalchemy.sql.expression import false, select
 from sqlalchemy.sql.functions import coalesce
 
 from airflow._shared.observability.metrics import stats
-from airflow._shared.observability.traces import new_dagrun_trace_carrier, 
override_ids
+from airflow._shared.observability.traces import (
+    TASK_SPAN_DETAIL_LEVEL_KEY,
+    new_dagrun_trace_carrier,
+    override_ids,
+)
 from airflow._shared.timezones import timezone
 from airflow.callbacks.callback_requests import DagCallbackRequest, 
DagRunContext
 from airflow.configuration import conf as airflow_conf
@@ -376,7 +381,9 @@ class DagRun(Base, LoggingMixin):
         self.triggered_by = triggered_by
         self.triggering_user_name = triggering_user_name
         self.scheduled_by_job_id = None
-        self.context_carrier: dict[str, str] = new_dagrun_trace_carrier()
+        self.context_carrier: dict[str, str] = new_dagrun_trace_carrier(
+            task_span_detail_level=self.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY, 
None)
+        )
 
         if not isinstance(partition_key, str | None):
             raise ValueError(
@@ -1074,6 +1081,13 @@ class DagRun(Base, LoggingMixin):
                 attributes["airflow.dag_run.logical_date"] = 
str(self.logical_date)
             if self.partition_key:
                 attributes["airflow.dag_run.partition_key"] = 
str(self.partition_key)
+            # TODO: make the empty parent context optional. Default should be 
to
+            # nest the dag run span under the currently active parent span (by
+            # omitting `context` here); only use the empty `context.Context()` 
to
+            # force a root span when Airflow itself initiates the run (e.g. dag
+            # triggered via API, scheduler, or backfill). Today this forces a
+            # root span unconditionally.
+            # Tracked at https://github.com/apache/airflow/issues/67210
             span = tracer.start_span(
                 name=f"dag_run.{self.dag_id}",
                 start_time=int((self.queued_at or self.start_date or 
timezone.utcnow()).timestamp() * 1e9),
@@ -1267,7 +1281,10 @@ class DagRun(Base, LoggingMixin):
                 self.data_interval_end,
             )
             session.flush()
-            self._emit_dagrun_span(state=self.state)
+            try:
+                self._emit_dagrun_span(state=self.state)
+            except Exception:
+                self.log.warning("Failed to emit dag run span", exc_info=True)
 
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
         self._emit_duration_stats_for_finished_state()
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index ea8656237fd..9cc8d6a6271 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -69,7 +69,11 @@ from sqlalchemy.orm.attributes import NO_VALUE, 
set_committed_value
 
 from airflow import settings
 from airflow._shared.observability.metrics import stats
-from airflow._shared.observability.traces import new_dagrun_trace_carrier, 
new_task_run_carrier
+from airflow._shared.observability.traces import (
+    TASK_SPAN_DETAIL_LEVEL_KEY,
+    new_dagrun_trace_carrier,
+    new_task_run_carrier,
+)
 from airflow._shared.timezones import timezone
 from airflow.assets.manager import asset_manager
 from airflow.configuration import conf
@@ -422,7 +426,9 @@ def clear_task_instances(
             # Always update clear_number and queued_at when clearing tasks, 
regardless of state
             dr.clear_number += 1
             dr.queued_at = timezone.utcnow()
-            dr.context_carrier = new_dagrun_trace_carrier()
+            dr.context_carrier = new_dagrun_trace_carrier(
+                task_span_detail_level=dr.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY) 
if dr.conf else None
+            )
 
             _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)
 
diff --git a/airflow-core/tests/integration/otel/test_otel.py 
b/airflow-core/tests/integration/otel/test_otel.py
index 4f46b37e851..05c7ea5638e 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -86,7 +86,7 @@ def wait_for_otel_collector(host: str, port: int, timeout: 
int = 120) -> None:
     )
 
 
-def unpause_trigger_dag_and_get_run_id(dag_id: str) -> str:
+def unpause_trigger_dag_and_get_run_id(dag_id: str, conf: dict | None = None) 
-> str:
     unpause_command = ["airflow", "dags", "unpause", dag_id]
 
     # Unpause the dag using the cli.
@@ -106,6 +106,11 @@ def unpause_trigger_dag_and_get_run_id(dag_id: str) -> str:
         execution_date.isoformat(),
     ]
 
+    if conf:
+        import json
+
+        trigger_command += ["--conf", json.dumps(conf)]
+
     # Trigger the dag using the cli.
     subprocess.run(trigger_command, check=True, env=os.environ.copy())
 
@@ -438,7 +443,46 @@ class TestOtelIntegration:
             assert set(metrics_to_check).issubset(metrics_dict.keys())
 
     @pytest.mark.execution_timeout(90)
-    def test_dag_execution_succeeds(self, capfd):
+    @pytest.mark.parametrize(
+        ("task_span_detail_level", "expected_hierarchy"),
+        [
+            pytest.param(
+                None,
+                {
+                    "dag_run.otel_test_dag": None,
+                    "sub_span1": "worker.task1",
+                    "task_run.task1": "dag_run.otel_test_dag",
+                    "worker.task1": "task_run.task1",
+                },
+                id="default_spans",
+            ),
+            pytest.param(
+                2,
+                # Additional detail spans are deferred to follow-up PRs; 
tracked
+                # at https://linear.app/astronomer/issue/ACD-157.
+                {
+                    "hook.on_starting": "startup",
+                    "_verify_bundle_access": "parse",
+                    "parse": "startup",
+                    "get_template_context": "startup",
+                    "startup": "worker.task1",
+                    "render_templates": "_prepare",
+                    "_serialize_rendered_fields": "_prepare",
+                    "_validate_task_inlets_and_outlets": "_prepare",
+                    "_prepare": "run",
+                    "_execute_task": "run",
+                    "finalize": "worker.task1",
+                    "run": "worker.task1",
+                    "sub_span1": "_execute_task",
+                    "dag_run.otel_test_dag": None,
+                    "task_run.task1": "dag_run.otel_test_dag",
+                    "worker.task1": "task_run.task1",
+                },
+                id="detail_spans",
+            ),
+        ],
+    )
+    def test_dag_execution_succeeds(self, capfd, task_span_detail_level, 
expected_hierarchy):
         """The same scheduler will start and finish the dag processing."""
         scheduler_process = None
         apiserver_process = None
@@ -454,7 +498,13 @@ class TestOtelIntegration:
 
             assert dag is not None
 
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
+            conf = None
+            if task_span_detail_level is not None:
+                from airflow_shared.observability.traces import 
TASK_SPAN_DETAIL_LEVEL_KEY
+
+                conf = {TASK_SPAN_DETAIL_LEVEL_KEY: task_span_detail_level}
+
+            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id, 
conf=conf)
 
             # Skip the span_status check.
             wait_for_dag_run(dag_id=dag_id, run_id=run_id, max_wait_time=90)
@@ -492,8 +542,19 @@ class TestOtelIntegration:
         service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
         r = 
requests.get(f"http://{host}:16686/api/traces?service={service_name}";)
         data = r.json()
-
-        trace = data["data"][-1]
+        # Find the trace for *this* dag run; selecting by position in the
+        # response is flaky because earlier-test traces accumulate in Jaeger.
+        matching = [
+            t
+            for t in data["data"]
+            if any(
+                tag.get("key") == "airflow.dag_run.run_id" and 
tag.get("value") == run_id
+                for span in t["spans"]
+                for tag in span.get("tags", [])
+            )
+        ]
+        assert len(matching) == 1, f"expected exactly one trace for 
run_id={run_id}, got {len(matching)}"
+        trace = matching[0]
         spans = trace["spans"]
 
         def get_span_hierarchy():
@@ -509,12 +570,7 @@ class TestOtelIntegration:
             return nested
 
         nested = get_span_hierarchy()
-        assert nested == {
-            "dag_run.otel_test_dag": None,
-            "sub_span1": "worker.task1",
-            "task_run.task1": "dag_run.otel_test_dag",
-            "worker.task1": "task_run.task1",
-        }
+        assert nested == expected_hierarchy
 
     def start_scheduler(self, capture_output: bool = False):
         stdout = None if capture_output else subprocess.DEVNULL
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index a9f9dfb136c..5f555ba69d0 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -3751,3 +3751,23 @@ class TestDagRunTracing:
             assert spans[0].name == f"dag_run.{dr.dag_id}"
         else:
             assert len(spans) == 0
+
+    @pytest.mark.db_test
+    def test_context_carrier_includes_detail_level_from_conf(self, dag_maker):
+        """DagRun created with TASK_SPAN_DETAIL_LEVEL_KEY in conf should 
encode the level in trace state."""
+        from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
+
+        from airflow._shared.observability.traces import (
+            TASK_SPAN_DETAIL_LEVEL_KEY,
+            get_task_span_detail_level,
+        )
+
+        with dag_maker("test_tracing_detail_level"):
+            EmptyOperator(task_id="t1")
+        dr = dag_maker.create_dagrun(conf={TASK_SPAN_DETAIL_LEVEL_KEY: 2})
+
+        ctx = TraceContextTextMapPropagator().extract(dr.context_carrier)
+        from opentelemetry import trace
+
+        span = trace.get_current_span(ctx)
+        assert get_task_span_detail_level(span) == 2
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index bea68206183..3b4dceb628d 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -30,7 +30,7 @@ import pendulum
 import pytest
 import time_machine
 import uuid6
-from opentelemetry import trace as otel_trace
+from opentelemetry import trace
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
 from sqlalchemy import delete, func, select
@@ -3802,12 +3802,10 @@ class TestMakeTaskCarrier:
 
         propagator = TraceContextTextMapPropagator()
         parent_trace_id = (
-            
otel_trace.get_current_span(context=propagator.extract(parent_carrier))
-            .get_span_context()
-            .trace_id
+            
trace.get_current_span(context=propagator.extract(parent_carrier)).get_span_context().trace_id
         )
         child_trace_id = (
-            
otel_trace.get_current_span(context=propagator.extract(child_carrier)).get_span_context().trace_id
+            
trace.get_current_span(context=propagator.extract(child_carrier)).get_span_context().trace_id
         )
         assert child_trace_id == parent_trace_id
         assert child_trace_id != 0
@@ -3876,3 +3874,25 @@ def 
test_clear_task_instances_resets_context_carrier(dag_maker, session):
 
     assert ti.context_carrier["traceparent"] != original_ti_traceparent
     assert dag_run.context_carrier["traceparent"] != original_dr_traceparent
+
+
[email protected]_test
+def test_clear_task_instances_preserves_detail_level(dag_maker, session):
+    """clear_task_instances should produce a new context_carrier that keeps 
the detail level from dag run conf."""
+    from airflow._shared.observability.traces import (
+        TASK_SPAN_DETAIL_LEVEL_KEY,
+        get_task_span_detail_level,
+    )
+
+    with dag_maker("test_clear_preserves_level"):
+        EmptyOperator(task_id="t1")
+    dag_run = dag_maker.create_dagrun(conf={TASK_SPAN_DETAIL_LEVEL_KEY: 2})
+    ti = dag_run.get_task_instance("t1", session=session)
+    ti.state = TaskInstanceState.SUCCESS
+    session.flush()
+
+    clear_task_instances([ti], session)
+
+    new_ctx = TraceContextTextMapPropagator().extract(dag_run.context_carrier)
+    span = trace.get_current_span(new_ctx)
+    assert get_task_span_detail_level(span) == 2
diff --git a/scripts/ci/docker-compose/integration-otel.yml 
b/scripts/ci/docker-compose/integration-otel.yml
index 3c1b83f4480..8b98699fccb 100644
--- a/scripts/ci/docker-compose/integration-otel.yml
+++ b/scripts/ci/docker-compose/integration-otel.yml
@@ -81,7 +81,7 @@ services:
       - OTEL_METRIC_EXPORT_INTERVAL=30000
       - AIRFLOW__METRICS__OTEL_ON=True
       - AIRFLOW__TRACES__OTEL_ON=True
-      - AIRFLOW__TRACES__OTEL_TASK_LOG_EVENT=True
+      - AIRFLOW__TRACES__OTEL_TASK_LOG_EVENT=True  # todo: deprecate
 
     depends_on:
       - otel-collector
diff --git 
a/shared/observability/src/airflow_shared/observability/traces/__init__.py 
b/shared/observability/src/airflow_shared/observability/traces/__init__.py
index dc3532262d1..8fd4e127b77 100644
--- a/shared/observability/src/airflow_shared/observability/traces/__init__.py
+++ b/shared/observability/src/airflow_shared/observability/traces/__init__.py
@@ -28,7 +28,7 @@ from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
 from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
-from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
+from opentelemetry.trace import NonRecordingSpan, Span, SpanContext, 
TraceFlags, TraceState
 from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
 
 if TYPE_CHECKING:
@@ -56,14 +56,20 @@ class OverrideableRandomIdGenerator(RandomIdGenerator):
         return super().generate_trace_id()
 
 
-def new_dagrun_trace_carrier() -> dict[str, str]:
+TASK_SPAN_DETAIL_LEVEL_KEY = "airflow/task_span_detail_level"
+DEFAULT_TASK_SPAN_DETAIL_LEVEL = 1
+
+
+def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
     """Generate a fresh W3C traceparent carrier without creating a recordable 
span."""
     gen = RandomIdGenerator()
+    trace_state_entries = build_trace_state_entries(task_span_detail_level)
     span_ctx = SpanContext(
         trace_id=gen.generate_trace_id(),
         span_id=gen.generate_span_id(),
         is_remote=False,
         trace_flags=TraceFlags(TraceFlags.SAMPLED),
+        trace_state=TraceState(entries=trace_state_entries),
     )
     ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
     carrier: dict[str, str] = {}
@@ -82,6 +88,31 @@ def new_task_run_carrier(dag_run_context_carrier):
     return carrier
 
 
+def build_trace_state_entries(task_span_detail_level) -> list[tuple[str, str]]:
+    trace_state_entries = []
+    if task_span_detail_level is not None:
+        try:
+            level = int(task_span_detail_level)
+        except (TypeError, ValueError):
+            level = None
+        if level:
+            trace_state_entries.append((TASK_SPAN_DETAIL_LEVEL_KEY, 
str(level)))
+    return trace_state_entries
+
+
+def get_task_span_detail_level(span: Span):
+    span_ctx = span.get_span_context()
+    trace_state = span_ctx.trace_state
+    raw = trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY)
+    if raw is None:
+        return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+    try:
+        return int(raw)
+    except (TypeError, ValueError):
+        log.warning("%s config in dag run conf must be integer.", 
TASK_SPAN_DETAIL_LEVEL_KEY)
+        return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+
 @contextmanager
 def override_ids(trace_id, span_id, ctx=None):
     ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
diff --git a/shared/observability/tests/observability/test_traces.py 
b/shared/observability/tests/observability/test_traces.py
new file mode 100644
index 00000000000..b21cc3c8761
--- /dev/null
+++ b/shared/observability/tests/observability/test_traces.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags, 
TraceState
+from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
+
+from airflow_shared.observability.traces import (
+    DEFAULT_TASK_SPAN_DETAIL_LEVEL,
+    TASK_SPAN_DETAIL_LEVEL_KEY,
+    build_trace_state_entries,
+    get_task_span_detail_level,
+    new_dagrun_trace_carrier,
+)
+
+
+class TestBuildTraceStateEntries:
+    def test_with_integer_level(self):
+        entries = build_trace_state_entries(2)
+        assert entries == [(TASK_SPAN_DETAIL_LEVEL_KEY, "2")]
+
+    def test_with_string_level(self):
+        entries = build_trace_state_entries("3")
+        assert entries == [(TASK_SPAN_DETAIL_LEVEL_KEY, "3")]
+
+    def test_with_none(self):
+        assert build_trace_state_entries(None) == []
+
+    def test_with_zero(self):
+        # 0 is falsy — treated as no detail level
+        assert build_trace_state_entries(0) == []
+
+    def test_with_invalid_string(self):
+        # Non-integer string should not raise; returns empty
+        assert build_trace_state_entries("not-a-number") == []
+
+
+class TestNewDagrunTraceCarrier:
+    def test_with_detail_level_embeds_level_in_trace_state(self):
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+        ctx = TraceContextTextMapPropagator().extract(carrier)
+        from opentelemetry import trace
+
+        span_ctx = trace.get_current_span(ctx).get_span_context()
+        assert span_ctx.trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY) == "2"
+
+    def test_without_detail_level_has_empty_trace_state(self):
+        carrier = new_dagrun_trace_carrier()
+        ctx = TraceContextTextMapPropagator().extract(carrier)
+        from opentelemetry import trace
+
+        span_ctx = trace.get_current_span(ctx).get_span_context()
+        assert span_ctx.trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY) is None
+
+
+class TestGetTaskSpanDetailLevel:
+    def _make_span_with_trace_state(self, entries: list[tuple[str, str]]) -> 
NonRecordingSpan:
+        from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+
+        gen = RandomIdGenerator()
+        span_ctx = SpanContext(
+            trace_id=gen.generate_trace_id(),
+            span_id=gen.generate_span_id(),
+            is_remote=False,
+            trace_flags=TraceFlags(TraceFlags.SAMPLED),
+            trace_state=TraceState(entries=entries),
+        )
+        return NonRecordingSpan(span_ctx)
+
+    def test_returns_default_when_no_trace_state(self):
+        span = self._make_span_with_trace_state([])
+        assert get_task_span_detail_level(span) == 
DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+    def test_reads_level_from_trace_state(self):
+        span = self._make_span_with_trace_state([(TASK_SPAN_DETAIL_LEVEL_KEY, 
"2")])
+        assert get_task_span_detail_level(span) == 2
+
+    def test_fallback_on_invalid_value(self):
+        span = self._make_span_with_trace_state([(TASK_SPAN_DETAIL_LEVEL_KEY, 
"bad")])
+        assert get_task_span_detail_level(span) == 
DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+    def test_roundtrip_via_carrier(self):
+        """Level set in new_dagrun_trace_carrier is readable by 
get_task_span_detail_level."""
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=3)
+        ctx = TraceContextTextMapPropagator().extract(carrier)
+        from opentelemetry import trace
+
+        span = trace.get_current_span(ctx)
+        assert get_task_span_detail_level(span) == 3
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 10977fb011b..313309fa401 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 
 import contextvars
 import functools
+import inspect
 import os
 import sys
 import time
@@ -36,6 +37,7 @@ import attrs
 import lazy_object_proxy
 import structlog
 from opentelemetry import trace
+from opentelemetry.trace import INVALID_SPAN, Status, StatusCode
 from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
 from pydantic import AwareDatetime, ConfigDict, Field, JsonValue, TypeAdapter
 from structlog.contextvars import bind_contextvars
@@ -43,6 +45,7 @@ from structlog.contextvars import bind_contextvars
 from airflow.dag_processing.bundles.base import BaseDagBundle, 
BundleVersionLock
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.sdk._shared.observability.metrics import stats
+from airflow.sdk._shared.observability.traces import get_task_span_detail_level
 from airflow.sdk._shared.template_rendering import truncate_rendered_value
 from airflow.sdk.api.client import get_hostname, getuser
 from airflow.sdk.api.datamodels._generated import (
@@ -153,6 +156,38 @@ log = structlog.get_logger("task")
 tracer = trace.get_tracer(__name__)
 
 
+class detail_span:
+    """Context manager and decorator that creates a child span when detail 
level > 1."""
+
+    def __init__(self, *args, **kwargs):
+        self._args = args
+        self._kwargs = kwargs
+        self._ctx = None
+
+    def _make_ctx(self):
+        parent_span = trace.get_current_span()
+        config_level = get_task_span_detail_level(span=parent_span)
+        if config_level > 1:
+            return tracer.start_as_current_span(*self._args, **self._kwargs)
+        return trace.INVALID_SPAN
+
+    def __enter__(self):
+        self._ctx = self._make_ctx()
+        return self._ctx.__enter__()
+
+    def __exit__(self, *exc_info):
+        return self._ctx.__exit__(*exc_info)
+
+    def __call__(self, f):
+        @functools.wraps(f)
+        def wrapper(*inner_args, **inner_kwargs):
+            with self._make_ctx():
+                return f(*inner_args, **inner_kwargs)
+
+        wrapper.__signature__ = inspect.signature(f)
+        return wrapper
+
+
 @contextmanager
 def _make_task_span(msg: StartupDetails):
     parent_context = (
@@ -224,6 +259,7 @@ class RuntimeTaskInstance(TaskInstance):
 
     __rich_repr__.angular = True  # type: ignore[attr-defined]
 
+    @detail_span("get_template_context")
     def get_template_context(self) -> Context:
         # TODO: Move this to `airflow.sdk.execution_time.context`
         #   once we port the entire context logic from 
airflow/utils/context.py ?
@@ -336,6 +372,7 @@ class RuntimeTaskInstance(TaskInstance):
 
         return self._cached_template_context
 
+    @detail_span("render_templates")
     def render_templates(
         self, context: Context | None = None, jinja_env: jinja2.Environment | 
None = None
     ) -> BaseOperator:
@@ -796,6 +833,7 @@ def _maybe_reschedule_startup_failure(
     )
 
 
+@detail_span("parse")
 def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     # TODO: Task-SDK:
     # Using BundleDagBag here is about 98% wrong, but it'll do for now
@@ -898,6 +936,7 @@ SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor]
 # 3. Shutdown and report status
 
 
+@detail_span("_verify_bundle_access")
 def _verify_bundle_access(bundle_instance: BaseDagBundle, log: Logger) -> None:
     """
     Verify bundle is accessible by the current user.
@@ -960,6 +999,7 @@ def get_startup_details() -> StartupDetails:
     return msg
 
 
+@detail_span("startup")
 def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, 
Logger]:
     # setproctitle causes issue on Mac OS: 
https://github.com/benoitc/gunicorn/issues/3021
     os_type = sys.platform
@@ -982,7 +1022,8 @@ def startup(msg: StartupDetails) -> 
tuple[RuntimeTaskInstance, Context, Logger]:
     )
 
     try:
-        get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
+        with detail_span("hook.on_starting"):
+            
get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
     except Exception:
         log.exception("error calling listener")
 
@@ -1111,6 +1152,7 @@ def _serialize_template_field(
     return serialized
 
 
+@detail_span("_serialize_rendered_fields")
 def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]:
     from airflow.sdk._shared.secrets_masker import redact
 
@@ -1187,6 +1229,7 @@ def _serialize_outlet_events(events: 
OutletEventAccessorsProtocol) -> Iterator[d
             yield attrs.asdict(alias_event)
 
 
+@detail_span("_prepare")
 def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> 
ToSupervisor | None:
     ti.hostname = get_hostname()
     ti.task = ti.task.prepare_for_execution()
@@ -1228,6 +1271,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, 
context: Context) -> ToSuperv
     return None
 
 
+@detail_span("_validate_task_inlets_and_outlets")
 def _validate_task_inlets_and_outlets(*, ti: RuntimeTaskInstance, log: Logger) 
-> None:
     if not ti.task.inlets and not ti.task.outlets:
         return
@@ -1279,6 +1323,7 @@ def _defer_task(
 
 
 @Sentry.enrich_errors
+@detail_span("run")
 def run(
     ti: RuntimeTaskInstance,
     context: Context,
@@ -1814,6 +1859,7 @@ def _send_error_email_notification(
         log.exception("Failed to send email notification")
 
 
+@detail_span("_execute_task")
 def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
     """Execute Task (optionally with a Timeout) and push Xcom results."""
     task = ti.task
@@ -1937,6 +1983,7 @@ def _push_xcom_if_needed(result: Any, ti: 
RuntimeTaskInstance, log: Logger):
     _xcom_push(ti, BaseXCom.XCOM_RETURN_KEY, result, 
mapped_length=mapped_length)
 
 
+@detail_span("finalize")
 def finalize(
     ti: RuntimeTaskInstance,
     state: TaskInstanceState,
@@ -2046,6 +2093,7 @@ def main():
     )
 
     stack = ExitStack()
+    span = INVALID_SPAN
     with stack:
         try:
             try:
@@ -2059,8 +2107,8 @@ def main():
                 # startup message as a ResendLoggingFD response.
                 if os.environ.pop("_AIRFLOW_FORK_EXEC", None) == "1":
                     reinit_supervisor_comms()
-                span = _make_task_span(msg=startup_details)
-                stack.enter_context(span)
+                span_ctx_mgr = _make_task_span(msg=startup_details)
+                span = stack.enter_context(span_ctx_mgr)
                 ti, context, log = startup(msg=startup_details)
             except AirflowRescheduleException as reschedule:
                 log.warning("Rescheduling task during startup, marking task as UP_FOR_RESCHEDULE")
@@ -2070,6 +2118,10 @@ def main():
                         end_date=datetime.now(tz=timezone.utc),
                     )
                 )
+                span.record_exception(reschedule)
+                span.set_status(
+                    Status(StatusCode.ERROR, description=f"Exception: {type(reschedule).__name__}")
+                )
                 sys.exit(0)
             with BundleVersionLock(
                 bundle_name=ti.bundle_instance.name,
@@ -2083,11 +2135,15 @@ def main():
                 # already run, so callbacks and listeners observed the state.
                 if getattr(ti, "_terminal_state_send_failed", False):
                     sys.exit(1)
-        except KeyboardInterrupt:
+        except KeyboardInterrupt as e:
             log.exception("Ctrl-c hit")
+            span.record_exception(e)
+            span.set_status(Status(StatusCode.ERROR, description=f"Exception: {type(e).__name__}"))
             sys.exit(2)
-        except Exception:
+        except Exception as e:
             log.exception("Top level error")
+            span.record_exception(e)
+            span.set_status(Status(StatusCode.ERROR, description=f"Exception: {type(e).__name__}"))
             sys.exit(1)
         finally:
             # Ensure the request socket is closed on the child side in all circumstances
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index b37d2569ea4..56900fbadab 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -32,7 +32,7 @@ from unittest.mock import call, patch
 
 import pandas as pd
 import pytest
-from opentelemetry import trace as otel_trace
+from opentelemetry import trace
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import SimpleSpanProcessor
 from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
@@ -40,7 +40,11 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
 from task_sdk import FAKE_BUNDLE
 from uuid6 import uuid7
 
-from airflow._shared.observability.traces import OverrideableRandomIdGenerator, new_task_run_carrier
+from airflow._shared.observability.traces import (
+    OverrideableRandomIdGenerator,
+    new_dagrun_trace_carrier,
+    new_task_run_carrier,
+)
 from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span
 from airflow.listeners import hookimpl
 from airflow.providers.standard.operators.python import PythonOperator
@@ -159,6 +163,7 @@ from airflow.sdk.execution_time.task_runner import (
     _push_xcom_if_needed,
     _serialize_outlet_events,
     _xcom_push,
+    detail_span,
     finalize,
     get_startup_details,
     parse,
@@ -577,7 +582,7 @@ def test_task_span_is_child_of_dag_run_span(make_ti_context):
         ti_carrier = new_task_run_carrier(dag_run_carrier)
 
     # Extract the parent task span context (the stable span ID stored in ti_carrier).
-    parent_task_span_ctx = otel_trace.get_current_span(
+    parent_task_span_ctx = trace.get_current_span(
         context=TraceContextTextMapPropagator().extract(ti_carrier)
     ).get_span_context()
 
@@ -5113,6 +5118,102 @@ class TestTaskInstanceMetrics:
             backend.incr.assert_any_call("ti_failures", tags=stats_tags)
 
 
+class TestDetailSpan:
+    """Tests for the detail_span decorator / context manager."""
+
+    def test_level_1_no_child_span_as_context_manager(self):
+        """At detail level 1, entering detail_span should not create a real recorded span."""
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+        t = provider.get_tracer("test")
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=1)
+        parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+            with t.start_as_current_span("parent", context=parent_ctx):
+                with detail_span("child") as span:
+                    assert span is trace.INVALID_SPAN
+
+        # Only the "parent" span should be recorded; no "child".
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "child" not in names
+
+    def test_level_2_creates_child_span_as_context_manager(self):
+        """At detail level 2, detail_span should create a real recorded child span."""
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+        t = provider.get_tracer("test")
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+        parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+            with t.start_as_current_span("parent", context=parent_ctx):
+                with detail_span("child"):
+                    pass
+
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "child" in names
+
+    def test_decorator_at_level_1_does_not_create_span(self):
+        """@detail_span at level 1 should not produce a recorded span."""
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+        t = provider.get_tracer("test")
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=1)
+        parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+        @detail_span("decorated")
+        def my_func():
+            return 42
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+            with t.start_as_current_span("parent", context=parent_ctx):
+                result = my_func()
+
+        assert result == 42
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "decorated" not in names
+
+    def test_decorator_at_level_2_creates_span_and_preserves_return_value(self):
+        """@detail_span at level 2 creates a span and the wrapped function's return value is preserved."""
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+        t = provider.get_tracer("test")
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+        parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+        @detail_span("decorated")
+        def my_func(x):
+            return x * 2
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+            with t.start_as_current_span("parent", context=parent_ctx):
+                result = my_func(7)
+
+        assert result == 14
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "decorated" in names
+
+    def test_exception_in_context_manager_propagates(self):
+        """Exceptions inside `with detail_span(...)` propagate normally."""
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+        t = provider.get_tracer("test")
+        carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+        parent_ctx = TraceContextTextMapPropagator().extract(carrier)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.tracer", t):
+            with t.start_as_current_span("parent", context=parent_ctx):
+                with pytest.raises(ValueError, match="boom"):
+                    with detail_span("child"):
+                        raise ValueError("boom")
+
+
 def test_dag_add_result(create_runtime_ti, mock_supervisor_comms):
     with DAG(dag_id="test_dag_add_result") as dag:
         task = PythonOperator(task_id="t", python_callable=lambda: 123)


Reply via email to