This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4eabf7964a7 Fix DagRun._emit_dagrun_span crash on None context_carrier 
(#64087)
4eabf7964a7 is described below

commit 4eabf7964a7b539d08255091dfdd61cd3e789aaf
Author: Youil <[email protected]>
AuthorDate: Mon Mar 23 22:58:53 2026 +0900

    Fix DagRun._emit_dagrun_span crash on None context_carrier (#64087)
    
    DagRuns created before OTel tracing was enabled have context_carrier=NULL
    in the database. When these DagRuns complete, _emit_dagrun_span() passes
    None to TraceContextTextMapPropagator().extract(), which crashes with
    AttributeError: 'NoneType' object has no attribute 'get'.
    
    Use `self.context_carrier or {}` so OTel receives a valid carrier and
    emits a root span per the OTel spec, consistent with task_runner.py:148.
    
    Co-authored-by: yulit0738 <[email protected]>
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 airflow-core/src/airflow/models/dagrun.py     |  2 +-
 airflow-core/tests/unit/models/test_dagrun.py | 39 +++++++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index d32a54c15d6..73a5a3a875a 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1027,7 +1027,7 @@ class DagRun(Base, LoggingMixin):
         return leaf_tis
 
     def _emit_dagrun_span(self, state: DagRunState):
-        ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
+        ctx = TraceContextTextMapPropagator().extract(self.context_carrier or 
{})
         span = trace.get_current_span(context=ctx)
         span_context = span.get_span_context()
         with override_ids(span_context.trace_id, span_context.span_id):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 850357b68c9..7ad78292a1e 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -3518,3 +3518,42 @@ class TestDagRunTracing:
 
         expected_status = StatusCode.OK if final_state == DagRunState.SUCCESS 
else StatusCode.ERROR
         assert span.status.status_code == expected_status
+
+    @pytest.mark.parametrize("carrier_value", [None, {}])
+    def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, 
session, carrier_value):
+        """_emit_dagrun_span should emit a root span when context_carrier is 
None or empty.
+
+        This happens for DagRuns created before OTel tracing was enabled, or 
whose
+        context_carrier was cleared/backfilled to NULL. Per OTel spec, missing 
context
+        results in a new root span rather than a crash.
+        """
+        from opentelemetry.sdk.trace import TracerProvider
+        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+        from opentelemetry.sdk.trace.export.in_memory_span_exporter import 
InMemorySpanExporter
+
+        from airflow._shared.observability.traces import 
OverrideableRandomIdGenerator
+
+        in_mem_exporter = InMemorySpanExporter()
+        provider = TracerProvider(id_generator=OverrideableRandomIdGenerator())
+        provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
+        test_tracer = provider.get_tracer("test")
+
+        with dag_maker("test_tracing_none_carrier", session=session) as dag:
+            EmptyOperator(task_id="t1")
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+        ti = dr.get_task_instance("t1", session=session)
+        ti.state = TaskInstanceState.SUCCESS
+        session.flush()
+        dr.dag = dag
+
+        # Simulate a DagRun with missing context_carrier
+        dr.context_carrier = carrier_value
+
+        with mock.patch("airflow.models.dagrun.tracer", test_tracer):
+            dr.update_state(session=session)
+
+        # A root span should still be emitted
+        spans = in_mem_exporter.get_finished_spans()
+        assert len(spans) == 1
+        assert spans[0].name == f"dag_run.{dr.dag_id}"

Reply via email to