This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new be44ad1ac6c Make head sampling possible in airflow (#68591)
be44ad1ac6c is described below
commit be44ad1ac6c3b368187b73542ae06e1339ffa8fc
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jun 18 09:53:24 2026 -0700
Make head sampling possible in airflow (#68591)
* Honor OTEL_TRACES_SAMPLER for DAG-run head sampling
The dag_run.context_carrier hardcoded TraceFlags.SAMPLED, which became the
parent for every downstream span (dag_run, task_run, worker). That poisoned
parent-based sampling: the configured OTEL_TRACES_SAMPLER controlled nothing,
since the always-sampled remote parent forced ParentBased samplers to keep
emitting task/worker spans regardless of the env var.
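The poisoning effect described above can be illustrated with a toy model of parent-based sampling. This is a simplified sketch in plain Python, not the OpenTelemetry SDK: `parent_based_decision` and `always_off` are hypothetical names, and the model assumes the default parent-based behavior in which a child span simply follows its parent's SAMPLED flag.

```python
from typing import Callable, Optional


def parent_based_decision(
    parent_sampled: Optional[bool],
    root_sampler: Callable[[int], bool],
    trace_id: int,
) -> bool:
    """Toy model of a parent-based sampler (hypothetical helper, not the SDK).

    parent_sampled is None for a root span; otherwise it is the parent's
    SAMPLED flag, which takes precedence over the root sampler.
    """
    if parent_sampled is None:
        return root_sampler(trace_id)
    return parent_sampled


def always_off(trace_id: int) -> bool:
    """Stand-in for a root sampler configured to drop every trace."""
    return False


# Old behavior: the carrier was always SAMPLED, so the root sampler was never
# consulted and the child was kept even though the sampler says "drop".
old_child = parent_based_decision(True, always_off, trace_id=0x1234)

# New behavior: the carrier records the root sampler's decision first, and
# every child then follows that recorded flag.
carrier_flag = parent_based_decision(None, always_off, trace_id=0x1234)
new_child = parent_based_decision(carrier_flag, always_off, trace_id=0x1234)
```

In this model the configured sampler only has an effect once the carrier's flag comes from a root decision, which is the change this commit makes.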
Make the carrier carry an honest sampling decision:
- new_dagrun_trace_carrier() now consults the configured tracer provider's
sampler with a ROOT decision (parent_context=None) against the freshly
generated trace_id, setting SAMPLED only when the decision is
RECORD_AND_SAMPLE. When the provider has no sampler (proxy/no-op provider,
i.e. otel off) it falls back to not-sampled, which is observably identical
to today (nothing exports). dag_id/run_type are forwarded as attributes so a
custom sampler can differentiate by run kind. The detail-level tracestate is
merged onto whatever the sampler returns so it still round-trips.
- _emit_dagrun_span honors the flag (required: it forces a root span via
context.Context(), so the sampler would otherwise never see the flag). An
invalid/empty carrier (legacy DagRun) still emits a root span as before.
- _emit_task_span honors the dag_run carrier's flag for consistency/safety.
- Threaded dag_id/run_type into both DagRun-based carrier call sites.
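The carrier and guard behavior listed above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the Airflow implementation: `make_carrier`, `carrier_decision`, and `should_emit` are hypothetical helpers, and the parsing covers only version `00` of the W3C traceparent format.

```python
import re
import secrets
from typing import Optional

# W3C traceparent: version-trace_id-span_id-flags, all lowercase hex.
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_carrier(sampled: bool) -> dict:
    """Build a carrier whose flag byte records the root sampling decision."""
    flags = "01" if sampled else "00"
    traceparent = f"00-{secrets.token_hex(16)}-{secrets.token_hex(8)}-{flags}"
    return {"traceparent": traceparent}


def carrier_decision(carrier: Optional[dict]) -> Optional[bool]:
    """Return True/False for a valid carrier, or None if no decision was recorded."""
    match = _TRACEPARENT.match((carrier or {}).get("traceparent", ""))
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if int(trace_id, 16) == 0 or int(span_id, 16) == 0:
        return None  # all-zero IDs make the span context invalid
    return bool(int(flags, 16) & 0x01)


def should_emit(carrier: Optional[dict]) -> bool:
    """Skip only when the carrier is valid and unsampled; legacy carriers still emit."""
    return carrier_decision(carrier) is not False
```

A three-valued result (sampled, unsampled, no decision) keeps the legacy case distinct from the head-sampled-out case, mirroring the `is_valid and not sampled` check in the diff.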
The worker span path (_make_task_span) needs no change: with an unsampled
remote parent carrier, start_as_current_span yields a non-recording span via
the default ParentBased sampler (verified).
Backcompat: unset OTEL_TRACES_SAMPLER defaults to parentbased_always_on, whose
root decision is ALWAYS_ON, so everything is still traced, with no behavior
change for anyone who hasn't opted in.
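The backcompat argument can be sketched as a root-decision lookup over the standard sampler names. This is a simplified model rather than the SDK's code: `root_decision` is a hypothetical helper, and the ratio check is an assumed threshold comparison on the low 64 bits of the trace ID, so the SDK's exact arithmetic may differ.

```python
from typing import Mapping

DEFAULT_SAMPLER = "parentbased_always_on"  # used when OTEL_TRACES_SAMPLER is unset


def root_decision(env: Mapping[str, str], trace_id: int) -> bool:
    """Model the decision a configured sampler makes for a root span."""
    name = env.get("OTEL_TRACES_SAMPLER", DEFAULT_SAMPLER)
    # For a root span, a parentbased_* sampler delegates to its root sampler.
    base = name.removeprefix("parentbased_")
    if base == "always_on":
        return True
    if base == "always_off":
        return False
    if base == "traceidratio":
        ratio = float(env.get("OTEL_TRACES_SAMPLER_ARG", "1.0"))
        bound = round(ratio * (1 << 64))
        # Deterministic per trace_id: the same ID always gets the same answer.
        return (trace_id & ((1 << 64) - 1)) < bound
    raise ValueError(f"unsupported sampler name in this sketch: {name}")
```

With an empty environment the function returns True for every trace ID, which corresponds to the "no behavior change" claim above.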
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* Clarify why the _emit_task_span head-sampling guard is kept
The previous comment framed the guard as redundant (ParentBased would
drop the child anyway) without saying when it isn't. Spell out that it
keeps emission consistent under non-parent-based samplers and
short-circuits before building the span.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* Parametrize head-sampling span tests and clarify docstrings
Combine the sampled/unsampled carrier span tests into single parametrized
tests in both the DagRun and execution-API suites, and clarify the
head-sampling docstrings.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* Drop dag_id/run_type sampler attributes from carrier creation
Keep this PR focused on honoring the configured OTel sampler. Forwarding
run-identity attributes to the sampler (for custom per-run-kind sampling),
via a shared helper reused at span-emit time, is deferred to a followup PR.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* Simplify the head-sampling guard comments
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* Update shared/observability/src/airflow_shared/observability/traces/__init__.py
* Fix shared-listeners span tests for honest carrier sampling
TestListenerSpan built its parent context from new_dagrun_trace_carrier,
which now consults the global tracer provider's sampler. In tests that
provider is a no-op ProxyTracerProvider (no sampler), so the carrier came
out unsampled and ParentBased dropped the listener spans. Point the
carrier's provider lookup at the real sampling test provider.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* Move dag-run span TODO comment indentation to its own PR
The comment-only indentation change is unrelated to head sampling; it now
lives in a standalone PR.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../execution_api/routes/task_instances.py | 10 ++
airflow-core/src/airflow/models/dagrun.py | 10 ++
.../versions/head/test_task_instances.py | 16 ++++
airflow-core/tests/unit/models/test_dagrun.py | 46 ++++++++-
.../tests/listeners/test_listener_manager.py | 12 ++-
.../observability/traces/__init__.py | 43 ++++++++-
.../tests/observability/test_traces.py | 103 +++++++++++++++++++++
.../task_sdk/execution_time/test_task_runner.py | 18 ++++
8 files changed, 249 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index ae71e41cf8e..0f862e16258 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -544,6 +544,16 @@ def _emit_task_span(ti, state):
return
dr_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier)
+ # Skip if the run was head-sampled out, so every span in the run agrees with the
+ # carrier's decision. A parent-based sampler would already drop this child span,
+ # but the explicit check also covers non-parent-based samplers (which ignore the
+ # parent and would re-sample it in) and short-circuits before building the span.
+ # An invalid/empty carrier (legacy/NULL) recorded no decision, so it falls through
+ # and still emits — preserving prior behavior.
+ dr_span_context = trace.get_current_span(context=dr_ctx).get_span_context()
+ if dr_span_context.is_valid and not dr_span_context.trace_flags.sampled:
+ return
+
ti_ctx = TraceContextTextMapPropagator().extract(ti.context_carrier)
ti_span = trace.get_current_span(context=ti_ctx)
span_context = ti_span.get_span_context()
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 5218f4ed8b0..348d6ed9813 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1072,6 +1072,16 @@ class DagRun(Base, LoggingMixin):
ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
span = trace.get_current_span(context=ctx)
span_context = span.get_span_context()
+
+ # Skip if the run was head-sampled out. Unlike the task spans, this guard is
+ # required (not just an optimization): the span below is forced to be a root span
+ # (context=context.Context()), so the configured sampler never sees the carrier's
+ # flag. A valid-but-unsampled carrier means head-sampled out; an invalid/empty
+ # carrier (legacy DagRun) recorded no decision, so it falls through and still
+ # emits — prior behavior we may want to reconsider.
+ if span_context.is_valid and not span_context.trace_flags.sampled:
+ return
+
with override_ids(span_context.trace_id, span_context.span_id):
attributes: dict[str, str] = {
"airflow.dag_id": str(self.dag_id),
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 395837c0e61..af445997761 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -4244,3 +4244,19 @@ class TestEmitTaskSpan:
_emit_task_span(ti, TaskInstanceState.SUCCESS)
assert len(self.exporter.get_finished_spans()) == 0
+
+ @pytest.mark.parametrize(
+ ("trace_flag", "expected_spans"),
+ [
+ pytest.param("01", 1, id="sampled-carrier-emits"),
+ pytest.param("00", 0, id="unsampled-carrier-skips"),
+ ],
+ )
+ def test_emit_task_span_honors_dagrun_carrier_sampling(self, trace_flag, expected_spans):
+ """A SAMPLED dag_run carrier (flag 01) emits the task span; an unsampled one (flag 00) is head-sampled out."""
+ ti = self._make_ti()
+ ti.dag_run.context_carrier = {
+ "traceparent": f"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-{trace_flag}"
+ }
+ _emit_task_span(ti, TaskInstanceState.SUCCESS)
+ assert len(self.exporter.get_finished_spans()) == expected_spans
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index a55b93685d9..1225cd07ece 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -1535,7 +1535,6 @@ class TestDagRun:
@mock.patch.object(Deadline, "prune_deadlines")
def test_dagrun_deadline_variable_interval_missing_variable_fails(self, _, session, deadline_test_dag):
-
mock_err = mock.Mock()
mock_err.error.value = "MISSING_DEADLINE"
mock_err.detail = "missing deadline"
@@ -4155,10 +4154,23 @@ class TestDagRunTracing:
@pytest.fixture(autouse=True)
def sdk_tracer_provider(self):
- """Patch the module-level tracer with one backed by a real SDK provider so spans have valid IDs."""
+ """Patch the module-level tracer with one backed by a real SDK provider so spans have valid IDs.
+
+ Also patch the provider that ``new_dagrun_trace_carrier`` consults so the
+ head-sampling decision is made by a real SDK sampler (default
+ parentbased_always_on -> SAMPLED) rather than the no-op ProxyTracerProvider
+ that is otherwise active in the test process (which would honestly produce
+ an unsampled carrier and suppress emission).
+ """
provider = TracerProvider()
real_tracer = provider.get_tracer("airflow.models.dagrun")
- with mock.patch("airflow.models.dagrun.tracer", real_tracer):
+ with (
+ mock.patch("airflow.models.dagrun.tracer", real_tracer),
+ mock.patch(
+ "airflow._shared.observability.traces.trace.get_tracer_provider",
+ return_value=provider,
+ ),
+ ):
yield
def test_context_carrier_set_on_init(self, dag_maker):
@@ -4325,6 +4337,34 @@ class TestDagRunTracing:
else:
assert len(spans) == 0
+ @pytest.mark.parametrize(
+ ("dag_id", "trace_flag", "expected_spans"),
+ [
+ pytest.param("test_tracing_sampled", "01", 1, id="sampled-carrier-emits"),
+ pytest.param("test_tracing_unsampled", "00", 0, id="unsampled-carrier-skips"),
+ ],
+ )
+ def test_emit_dagrun_span_honors_carrier_sampling(
+ self, dag_id, trace_flag, expected_spans, dag_maker, session
+ ):
+ """A SAMPLED carrier (flag 01) emits the dag_run span; an unsampled carrier (flag 00) is head-sampled out."""
+ in_mem_exporter = InMemorySpanExporter()
+ provider = TracerProvider(id_generator=OverrideableRandomIdGenerator())
+ provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
+ test_tracer = provider.get_tracer("test")
+
+ with dag_maker(dag_id, session=session) as dag:
+ EmptyOperator(task_id="t1")
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ dr.dag = dag
+ traceparent = f"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-{trace_flag}"
+ dr.context_carrier = {"traceparent": traceparent}
+
+ with mock.patch("airflow.models.dagrun.tracer", test_tracer):
+ dr._emit_dagrun_span(state=DagRunState.SUCCESS)
+
+ assert len(in_mem_exporter.get_finished_spans()) == expected_spans
+
@pytest.mark.db_test
def test_context_carrier_includes_detail_level_from_conf(self, dag_maker):
"""DagRun created with TASK_SPAN_DETAIL_LEVEL_KEY in conf should encode the level in trace state."""
diff --git a/shared/listeners/tests/listeners/test_listener_manager.py b/shared/listeners/tests/listeners/test_listener_manager.py
index 3843c89f88f..d5203d3fd13 100644
--- a/shared/listeners/tests/listeners/test_listener_manager.py
+++ b/shared/listeners/tests/listeners/test_listener_manager.py
@@ -181,7 +181,17 @@ def test_tracer():
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
tracer = provider.get_tracer("test")
- with mock.patch.object(listener_module, "tracer", tracer):
+ with (
+ mock.patch.object(listener_module, "tracer", tracer),
+ # new_dagrun_trace_carrier consults the global provider's sampler to set the
+ # carrier's SAMPLED flag. In tests the global provider is a no-op
+ # ProxyTracerProvider (no sampler) -> unsampled carrier -> ParentBased drops
+ # the spans. Point the lookup at a real sampling provider so spans record.
+ mock.patch(
+ "airflow_shared.observability.traces.trace.get_tracer_provider",
+ return_value=provider,
+ ),
+ ):
yield tracer, exporter
diff --git a/shared/observability/src/airflow_shared/observability/traces/__init__.py b/shared/observability/src/airflow_shared/observability/traces/__init__.py
index 8fd4e127b77..6037162232e 100644
--- a/shared/observability/src/airflow_shared/observability/traces/__init__.py
+++ b/shared/observability/src/airflow_shared/observability/traces/__init__.py
@@ -28,6 +28,7 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+from opentelemetry.sdk.trace.sampling import Decision
from opentelemetry.trace import NonRecordingSpan, Span, SpanContext, TraceFlags, TraceState
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
@@ -61,15 +62,47 @@ DEFAULT_TASK_SPAN_DETAIL_LEVEL = 1
def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
- """Generate a fresh W3C traceparent carrier without creating a recordable span."""
+ """
+ Generate a fresh W3C traceparent carrier without creating a recordable span.
+
+ The SAMPLED flag is set from an honest *root* sampling decision made by the
+ configured tracer provider's sampler (driven by ``OTEL_TRACES_SAMPLER`` /
+ ``OTEL_TRACES_SAMPLER_ARG``), rather than being hardcoded. This makes the
+ carrier the single head-sampling decision point for a DAG run: every
+ downstream span (dag_run, task_run, worker) rides on this flag.
+ """
gen = RandomIdGenerator()
- trace_state_entries = build_trace_state_entries(task_span_detail_level)
+ trace_id = gen.generate_trace_id()
+
+ provider = trace.get_tracer_provider()
+ sampler = getattr(provider, "sampler", None)
+ if sampler is not None:
+ result = sampler.should_sample(
+ parent_context=None, # root decision
+ trace_id=trace_id,
+ name="dag_run",
+ )
+ sampled = result.decision == Decision.RECORD_AND_SAMPLE
+ sampler_trace_state = result.trace_state
+ else:
+ # No sampler attribute means a proxy/no-op provider (otel disabled).
+ # Nothing exports in that case, so the flag is irrelevant; mirror the
+ # observable behavior of today when otel is off.
+ sampled = False
+ sampler_trace_state = None
+
+ # Preserve the detail-level tracestate by merging it onto whatever the
+ # sampler returned. TraceState is immutable, so update() returns a new one.
+ trace_state = sampler_trace_state or TraceState()
+ for key, value in build_trace_state_entries(task_span_detail_level):
+ trace_state = trace_state.update(key, value)
+
span_ctx = SpanContext(
- trace_id=gen.generate_trace_id(),
+ trace_id=trace_id,
span_id=gen.generate_span_id(),
is_remote=False,
- trace_flags=TraceFlags(TraceFlags.SAMPLED),
- trace_state=TraceState(entries=trace_state_entries),
+ trace_flags=TraceFlags(TraceFlags.SAMPLED if sampled else 0),
+ trace_state=trace_state,
)
ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
carrier: dict[str, str] = {}
diff --git a/shared/observability/tests/observability/test_traces.py b/shared/observability/tests/observability/test_traces.py
index b21cc3c8761..088a54e6248 100644
--- a/shared/observability/tests/observability/test_traces.py
+++ b/shared/observability/tests/observability/test_traces.py
@@ -17,6 +17,15 @@
# under the License.
from __future__ import annotations
+import pytest
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.sampling import (
+ ALWAYS_OFF,
+ ALWAYS_ON,
+ ParentBased,
+ TraceIdRatioBased,
+)
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags, TraceState
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
@@ -29,6 +38,11 @@ from airflow_shared.observability.traces import (
)
+def _carrier_is_sampled(carrier: dict[str, str]) -> bool:
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ return trace.get_current_span(ctx).get_span_context().trace_flags.sampled
+
+
class TestBuildTraceStateEntries:
def test_with_integer_level(self):
entries = build_trace_state_entries(2)
@@ -68,6 +82,95 @@ class TestNewDagrunTraceCarrier:
assert span_ctx.trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY) is None
+class TestNewDagrunTraceCarrierSampling:
+ """The carrier's SAMPLED flag should reflect the configured sampler's root decision."""
+
+ @pytest.fixture
+ def with_sampler(self, monkeypatch):
+ """Install a TracerProvider with the given sampler for new_dagrun_trace_carrier."""
+
+ def _install(sampler):
+ provider = TracerProvider(sampler=sampler)
+ monkeypatch.setattr(
+ "airflow_shared.observability.traces.trace.get_tracer_provider",
+ lambda: provider,
+ )
+
+ return _install
+
+ def test_no_sampler_provider_not_sampled(self, monkeypatch):
+ """A proxy/no-op provider (otel off) has no ``sampler`` attribute -> not sampled."""
+
+ class _NoSamplerProvider:
+ pass
+
+ monkeypatch.setattr(
+ "airflow_shared.observability.traces.trace.get_tracer_provider",
+ lambda: _NoSamplerProvider(),
+ )
+ assert _carrier_is_sampled(new_dagrun_trace_carrier()) is False
+
+ def test_default_provider_is_sampled(self):
+ """The SDK default provider (parentbased_always_on) samples the root -> backcompat."""
+ # No monkeypatching: rely on whatever default provider is configured.
+ # A bare TracerProvider() defaults to parentbased_always_on.
+ provider = TracerProvider()
+ assert provider.sampler is not None
+ result = provider.sampler.should_sample(parent_context=None, trace_id=1234, name="dag_run")
+ from opentelemetry.sdk.trace.sampling import Decision
+
+ assert result.decision == Decision.RECORD_AND_SAMPLE
+
+ def test_always_on_is_sampled(self, with_sampler):
+ with_sampler(ParentBased(ALWAYS_ON))
+ assert _carrier_is_sampled(new_dagrun_trace_carrier()) is True
+
+ def test_always_off_is_not_sampled(self, with_sampler):
+ with_sampler(ALWAYS_OFF)
+ assert _carrier_is_sampled(new_dagrun_trace_carrier()) is False
+
+ def test_traceidratio_is_deterministic_per_trace_id(self, with_sampler):
+ """A ratio sampler makes a deterministic decision keyed on trace_id."""
+ with_sampler(TraceIdRatioBased(0.5))
+ # Generate a batch; with ratio 0.5 we expect a mix, and each individual
+ # decision must be stable for its own trace_id.
+ carriers = [new_dagrun_trace_carrier() for _ in range(50)]
+ decisions = [_carrier_is_sampled(c) for c in carriers]
+ # Re-evaluating the same trace_id yields the same decision (determinism):
+ sampler = TraceIdRatioBased(0.5)
+ from opentelemetry.sdk.trace.sampling import Decision
+
+ for carrier, decided in zip(carriers, decisions):
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ trace_id = trace.get_current_span(ctx).get_span_context().trace_id
+ redo = sampler.should_sample(parent_context=None, trace_id=trace_id, name="dag_run")
+ assert (redo.decision == Decision.RECORD_AND_SAMPLE) == decided
+ # Check: ratio 0.5 over 50 should produce a mix of both outcomes.
+ assert any(decisions)
+ assert not all(decisions)
+
+ def test_ratio_zero_never_sampled(self, with_sampler):
+ with_sampler(TraceIdRatioBased(0.0))
+ assert all(_carrier_is_sampled(new_dagrun_trace_carrier()) is False for _ in range(20))
+
+ def test_detail_level_roundtrips_when_sampled(self, with_sampler):
+ with_sampler(ParentBased(ALWAYS_ON))
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=3)
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ span = trace.get_current_span(ctx)
+ assert get_task_span_detail_level(span) == 3
+ assert _carrier_is_sampled(carrier) is True
+
+ def test_detail_level_roundtrips_when_not_sampled(self, with_sampler):
+ """Detail-level tracestate must survive even for an unsampled carrier."""
+ with_sampler(ALWAYS_OFF)
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=2)
+ ctx = TraceContextTextMapPropagator().extract(carrier)
+ span = trace.get_current_span(ctx)
+ assert get_task_span_detail_level(span) == 2
+ assert _carrier_is_sampled(carrier) is False
+
+
class TestGetTaskSpanDetailLevel:
def _make_span_with_trace_state(self, entries: list[tuple[str, str]]) -> NonRecordingSpan:
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 8c1312833e8..be5ea4165ad 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5421,6 +5421,24 @@ class TestTaskInstanceMetrics:
class TestDetailSpan:
"""Tests for the detail_span decorator / context manager."""
+ @pytest.fixture(autouse=True)
+ def _sampled_carrier_provider(self):
+ """Make new_dagrun_trace_carrier produce a SAMPLED carrier.
+
+ new_dagrun_trace_carrier consults the global tracer provider's sampler to
+ decide the carrier's SAMPLED flag. In the test process the global provider
+ is a no-op ProxyTracerProvider (no sampler) -> unsampled carrier, which
+ would make the parent span (and its detail children) non-recording. Patch
+ the lookup to a real SDK provider whose default sampler
+ (parentbased_always_on) samples the root, mirroring "otel on" in production.
+ """
+ provider = TracerProvider()
+ with mock.patch(
+ "airflow._shared.observability.traces.trace.get_tracer_provider",
+ return_value=provider,
+ ):
+ yield
+
def test_level_1_no_child_span_as_context_manager(self):
"""At detail level 1, entering detail_span should not create a real recorded span."""
exporter = InMemorySpanExporter()