This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0decb3cbd1e Simplify approach for creating dag run and task spans
(#62554)
0decb3cbd1e is described below
commit 0decb3cbd1ecce2a6e55d6148f517f4cde6cefdd
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Mar 10 12:12:42 2026 -0700
Simplify approach for creating dag run and task spans (#62554)
Previously we had a very high-touch way of creating long-running spans for
dag runs and tasks and keeping them alive across scheduler handoffs and
similar transitions. We figured out a simpler way to do that.
Additionally, we determined that there was really no need to use the custom
airflow tracing interfaces, and resolved instead to implement OTEL support in
a way that uses the OTEL interfaces directly.
We pushed the task span creation down to the tasks, so that task spans run
in the task process.
Replace the custom base tracer class and scattered OTEL logic with a
focused helper module in `airflow/observability/traces/`.
Remove the large blocks of tracing code from `scheduler_job_runner.py` and
`dagrun.py` and consolidate span creation for dag runs and task execution into
clean, reusable functions.
Add configurable flush timeout and improve span naming.
---
.../src/airflow/config_templates/config.yml | 12 +
.../src/airflow/executors/base_executor.py | 44 ---
.../src/airflow/executors/workloads/task.py | 2 +-
.../src/airflow/jobs/scheduler_job_runner.py | 262 ------------------
.../src/airflow/jobs/triggerer_job_runner.py | 10 -
airflow-core/src/airflow/models/dagrun.py | 174 +++---------
.../src/airflow/observability/traces/__init__.py | 134 ++++++++++
airflow-core/src/airflow/settings.py | 4 +-
.../tests/integration/otel/dags/otel_test_dag.py | 57 +---
.../dags/otel_test_dag_with_pause_between_tasks.py | 158 -----------
.../otel/dags/otel_test_dag_with_pause_in_task.py | 151 -----------
airflow-core/tests/integration/otel/test_otel.py | 20 +-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 187 -------------
airflow-core/tests/unit/models/test_dagrun.py | 296 +++++++++++----------
docs/spelling_wordlist.txt | 1 +
scripts/ci/docker-compose/integration-otel.yml | 2 +-
task-sdk/src/airflow/sdk/definitions/dag.py | 4 -
.../src/airflow/sdk/execution_time/task_runner.py | 119 ++++++---
.../task_sdk/execution_time/test_task_runner.py | 117 +++++++-
19 files changed, 549 insertions(+), 1205 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 8558a71236e..2d70e625c15 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1414,9 +1414,21 @@ traces:
description: |
If True, then traces from Airflow internal methods are exported.
Defaults to False.
version_added: 3.1.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ This parameter is no longer used.
type: string
example: ~
default: "False"
+ task_runner_flush_timeout_milliseconds:
+ description: |
+ Timeout in milliseconds to wait for the OpenTelemetry span exporter to
flush pending spans
+ when a task runner process exits. If the exporter does not finish
within this time, any
+ buffered spans may be dropped.
+ version_added: 3.2.0
+ type: integer
+ example: ~
+ default: "30000"
secrets:
description: ~
options:
diff --git a/airflow-core/src/airflow/executors/base_executor.py
b/airflow-core/src/airflow/executors/base_executor.py
index 2997d55d8bb..d67c25c7baf 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -32,14 +32,11 @@ from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.executors import workloads
from airflow.executors.executor_loader import ExecutorLoader
-from airflow.executors.workloads.task import TaskInstanceDTO
from airflow.models import Log
from airflow.models.callback import CallbackKey
from airflow.observability.metrics import stats_utils
-from airflow.observability.trace import Trace
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
-from airflow.utils.thread_safe_dict import ThreadSafeDict
PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -143,8 +140,6 @@ class BaseExecutor(LoggingMixin):
:param parallelism: how many jobs should run at one time.
"""
- active_spans = ThreadSafeDict()
-
supports_ad_hoc_ti_run: bool = False
supports_callbacks: bool = False
supports_multi_team: bool = False
@@ -217,10 +212,6 @@ class BaseExecutor(LoggingMixin):
_repr += ")"
return _repr
- @classmethod
- def set_active_spans(cls, active_spans: ThreadSafeDict):
- cls.active_spans = active_spans
-
def start(self): # pragma: no cover
"""Executors may need to get things started."""
@@ -340,17 +331,6 @@ class BaseExecutor(LoggingMixin):
queued_tasks_metric_name =
self._get_metric_name("executor.queued_tasks")
running_tasks_metric_name =
self._get_metric_name("executor.running_tasks")
- span = Trace.get_current_span()
- if span.is_recording():
- span.add_event(
- name="executor",
- attributes={
- open_slots_metric_name: open_slots,
- queued_tasks_metric_name: num_queued_tasks,
- running_tasks_metric_name: num_running_tasks,
- },
- )
-
self.log.debug("%s running task instances for executor %s",
num_running_tasks, name)
self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
if open_slots == 0:
@@ -415,30 +395,6 @@ class BaseExecutor(LoggingMixin):
if key in self.attempts:
del self.attempts[key]
- if isinstance(workload, workloads.ExecuteTask) and
hasattr(workload, "ti"):
- ti = workload.ti
-
- # If it's None, then the span for the current id hasn't been
started.
- if self.active_spans is not None and
self.active_spans.get("ti:" + str(ti.id)) is None:
- if isinstance(ti, TaskInstanceDTO):
- parent_context =
Trace.extract(ti.parent_context_carrier)
- else:
- parent_context =
Trace.extract(ti.dag_run.context_carrier)
- # Start a new span using the context from the parent.
- # Attributes will be set once the task has finished so
that all
- # values will be available (end_time, duration, etc.).
-
- span = Trace.start_child_span(
- span_name=f"{ti.task_id}",
- parent_context=parent_context,
- component="task",
- start_as_current=False,
- )
- self.active_spans.set("ti:" + str(ti.id), span)
- # Inject the current context into the carrier.
- carrier = Trace.inject()
- ti.context_carrier = carrier
-
workload_list.append(workload)
if workload_list:
diff --git a/airflow-core/src/airflow/executors/workloads/task.py
b/airflow-core/src/airflow/executors/workloads/task.py
index d691dcb6f09..a5939cf4244 100644
--- a/airflow-core/src/airflow/executors/workloads/task.py
+++ b/airflow-core/src/airflow/executors/workloads/task.py
@@ -86,7 +86,7 @@ class ExecuteTask(BaseDagBundleWorkload):
from airflow.utils.helpers import log_filename_template_renderer
ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
- ser_ti.parent_context_carrier = ti.dag_run.context_carrier
+ ser_ti.context_carrier = ti.dag_run.context_carrier
if not bundle_info:
bundle_info = BundleInfo(
name=ti.dag_model.bundle_name,
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 2d58f295c6e..c667631756e 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -32,7 +32,6 @@ from datetime import date, datetime, timedelta
from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any
-from uuid import UUID
from sqlalchemy import (
and_,
@@ -98,17 +97,14 @@ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.team import Team
from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger,
TriggerFailureReason
from airflow.observability.metrics import stats_utils
-from airflow.observability.trace import Trace
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
from airflow.serialization.definitions.notset import NOTSET
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.simple import AssetTriggeredTimetable
-from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction,
run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
-from airflow.utils.span_status import SpanStatus
from airflow.utils.sqlalchemy import (
get_dialect_name,
is_lock_not_available_error,
@@ -116,7 +112,6 @@ from airflow.utils.sqlalchemy import (
with_row_locks,
)
from airflow.utils.state import CallbackState, DagRunState, State,
TaskInstanceState
-from airflow.utils.thread_safe_dict import ThreadSafeDict
from airflow.utils.types import DagRunTriggeredByType, DagRunType
if TYPE_CHECKING:
@@ -273,14 +268,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
job_type = "SchedulerJob"
- # For a dagrun span
- # - key: dag_run.run_id | value: span
- # - dagrun keys will be prefixed with 'dr:'.
- # For a ti span
- # - key: ti.id | value: span
- # - taskinstance keys will be prefixed with 'ti:'.
- active_spans = ThreadSafeDict()
-
def __init__(
self,
job: Job,
@@ -434,9 +421,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
"""Clean up processor_agent to avoid leaving orphan processes."""
- if self._is_tracing_enabled():
- self._end_active_spans()
-
if not _is_parent_process():
# Only the parent process should perform the cleanup.
return
@@ -1311,18 +1295,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti.pid,
)
- if (active_ti_span := cls.active_spans.get("ti:" + str(ti.id))) is
not None:
- cls.set_ti_span_attrs(span=active_ti_span, state=state, ti=ti)
- # End the span and remove it from the active_spans dict.
- active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
- cls.active_spans.delete("ti:" + str(ti.id))
- ti.span_status = SpanStatus.ENDED
- else:
- if ti.span_status == SpanStatus.ACTIVE:
- # Another scheduler has started the span.
- # Update the SpanStatus to let the process know that it
must end it.
- ti.span_status = SpanStatus.SHOULD_END
-
# There are two scenarios why the same TI with the same try_number
is queued
# after executor is finished with it:
# 1) the TI was killed externally and it had no time to mark
itself failed
@@ -1459,39 +1431,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return len(event_buffer)
- @classmethod
- def set_ti_span_attrs(cls, span, state, ti):
- span.set_attributes(
- {
- "airflow.category": "scheduler",
- "airflow.task.id": ti.id,
- "airflow.task.task_id": ti.task_id,
- "airflow.task.dag_id": ti.dag_id,
- "airflow.task.state": ti.state,
- "airflow.task.error": state == TaskInstanceState.FAILED,
- "airflow.task.start_date": str(ti.start_date),
- "airflow.task.end_date": str(ti.end_date),
- "airflow.task.duration": ti.duration,
- "airflow.task.executor_config": str(ti.executor_config),
- "airflow.task.logical_date": str(ti.logical_date),
- "airflow.task.hostname": ti.hostname,
- "airflow.task.log_url": ti.log_url,
- "airflow.task.operator": str(ti.operator),
- "airflow.task.try_number": ti.try_number,
- "airflow.task.executor_state": state,
- "airflow.task.pool": ti.pool,
- "airflow.task.queue": ti.queue,
- "airflow.task.priority_weight": ti.priority_weight,
- "airflow.task.queued_dttm": str(ti.queued_dttm),
- "airflow.task.queued_by_job_id": ti.queued_by_job_id,
- "airflow.task.pid": ti.pid,
- }
- )
- if span.is_recording():
- span.add_event(name="airflow.task.queued",
timestamp=datetime_to_nano(ti.queued_dttm))
- span.add_event(name="airflow.task.started",
timestamp=datetime_to_nano(ti.start_date))
- span.add_event(name="airflow.task.ended",
timestamp=datetime_to_nano(ti.end_date))
-
def _execute(self) -> int | None:
import os
@@ -1515,12 +1454,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
executor.start()
# local import due to type_checking.
- from airflow.executors.base_executor import BaseExecutor
-
- # Pass a reference to the dictionary.
- # Any changes made by a dag_run instance, will be reflected to the
dictionary of this class.
- DagRun.set_active_spans(active_spans=self.active_spans)
- BaseExecutor.set_active_spans(active_spans=self.active_spans)
stats_factory = stats_utils.get_stats_factory(Stats)
Stats.initialize(factory=stats_factory)
@@ -1571,162 +1504,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
except Exception as e: # should not fail the scheduler
self.log.exception("Failed to update dag run state for paused dags
due to %s", e)
- @provide_session
- def _end_active_spans(self, session: Session = NEW_SESSION):
- # No need to do a commit for every update. The annotation will commit
all of them once at the end.
- for prefixed_key, span in self.active_spans.get_all().items():
- # Use partition to split on the first occurrence of ':'.
- prefix, sep, key = prefixed_key.partition(":")
-
- if prefix == "ti":
- ti_result = session.get(TaskInstance, UUID(key))
- if ti_result is None:
- continue
- ti: TaskInstance = ti_result
-
- if ti.state in State.finished:
- self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
- span.end(end_time=datetime_to_nano(ti.end_date))
- ti.span_status = SpanStatus.ENDED
- else:
- span.end()
- ti.span_status = SpanStatus.NEEDS_CONTINUANCE
- elif prefix == "dr":
- dag_run: DagRun | None = session.scalars(
- select(DagRun).where(DagRun.id == int(key))
- ).one_or_none()
- if dag_run is None:
- continue
- if dag_run.state in State.finished_dr_states:
- dag_run.set_dagrun_span_attrs(span=span)
-
- span.end(end_time=datetime_to_nano(dag_run.end_date))
- dag_run.span_status = SpanStatus.ENDED
- else:
- span.end()
- dag_run.span_status = SpanStatus.NEEDS_CONTINUANCE
- initial_dag_run_context =
Trace.extract(dag_run.context_carrier)
- with Trace.start_child_span(
- span_name="current_scheduler_exited",
parent_context=initial_dag_run_context
- ) as s:
- s.set_attribute("trace_status", "needs continuance")
- else:
- self.log.error("Found key with unknown prefix: '%s'",
prefixed_key)
-
- # Even if there is a key with an unknown prefix, clear the dict.
- # If this method has been called, the scheduler is exiting.
- self.active_spans.clear()
-
- def _end_spans_of_externally_ended_ops(self, session: Session):
- # The scheduler that starts a dag_run or a task is also the one that
starts the spans.
- # Each scheduler should end the spans that it has started.
- #
- # Otel spans are implemented in a certain way so that the objects
- # can't be shared between processes or get recreated.
- # It is done so that the process that starts a span, is also the one
that ends it.
- #
- # If another scheduler has finished processing a dag_run or a task and
there is a reference
- # on the active_spans dictionary, then the current scheduler started
the span,
- # and therefore must end it.
- dag_runs_should_end: list[DagRun] = list(
- session.scalars(select(DagRun).where(DagRun.span_status ==
SpanStatus.SHOULD_END))
- )
- tis_should_end: list[TaskInstance] = list(
-
session.scalars(select(TaskInstance).where(TaskInstance.span_status ==
SpanStatus.SHOULD_END))
- )
-
- for dag_run in dag_runs_should_end:
- active_dagrun_span = self.active_spans.get("dr:" + str(dag_run.id))
- if active_dagrun_span is not None:
- if dag_run.state in State.finished_dr_states:
- dag_run.set_dagrun_span_attrs(span=active_dagrun_span)
-
-
active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
- else:
- active_dagrun_span.end()
- self.active_spans.delete("dr:" + str(dag_run.id))
- dag_run.span_status = SpanStatus.ENDED
-
- for ti in tis_should_end:
- active_ti_span = self.active_spans.get(f"ti:{ti.id}")
- if active_ti_span is not None:
- if ti.state in State.finished:
- self.set_ti_span_attrs(span=active_ti_span,
state=ti.state, ti=ti)
- active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
- else:
- active_ti_span.end()
- self.active_spans.delete(f"ti:{ti.id}")
- ti.span_status = SpanStatus.ENDED
-
- def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun,
session: Session):
- # There are two scenarios:
- # 1. scheduler is unhealthy but managed to update span_status
- # 2. scheduler is unhealthy and didn't manage to make any updates
- # Check the span_status first, in case the 2nd db query can be avoided
(scenario 1).
-
- # If the dag_run is scheduled by a different scheduler, and it's still
running and the span is active,
- # then check the Job table to determine if the initial scheduler is
still healthy.
- if (
- dag_run.scheduled_by_job_id != self.job.id
- and dag_run.state in State.unfinished_dr_states
- and dag_run.span_status == SpanStatus.ACTIVE
- ):
- initial_scheduler_id = dag_run.scheduled_by_job_id
- job: Job | None = session.scalars(
- select(Job).where(
- Job.id == initial_scheduler_id,
- Job.job_type == "SchedulerJob",
- )
- ).one_or_none()
- if job is None:
- return
-
- if not job.is_alive():
- # Start a new span for the dag_run.
- dr_span = Trace.start_root_span(
- span_name=f"{dag_run.dag_id}_recreated",
- component="dag",
- start_time=dag_run.queued_at,
- start_as_current=False,
- )
- carrier = Trace.inject()
- # Update the context_carrier and leave the SpanStatus as
ACTIVE.
- dag_run.context_carrier = carrier
- self.active_spans.set("dr:" + str(dag_run.id), dr_span)
-
- tis = dag_run.get_task_instances(session=session)
-
- # At this point, any tis will have been adopted by the current
scheduler,
- # and ti.queued_by_job_id will point to the current id.
- # Any tis that have been executed by the unhealthy scheduler,
will need a new span
- # so that it can be associated with the new dag_run span.
- tis_needing_spans = [
- ti
- for ti in tis
- # If it has started and there is a reference on the
active_spans dict,
- # then it was started by the current scheduler.
- if ti.start_date is not None and
self.active_spans.get(f"ti:{ti.id}") is None
- ]
-
- dr_context = Trace.extract(dag_run.context_carrier)
- for ti in tis_needing_spans:
- ti_span = Trace.start_child_span(
- span_name=f"{ti.task_id}_recreated",
- parent_context=dr_context,
- start_time=ti.queued_dttm,
- start_as_current=False,
- )
- ti_carrier = Trace.inject()
- ti.context_carrier = ti_carrier
-
- if ti.state in State.finished:
- self.set_ti_span_attrs(span=ti_span, state=ti.state,
ti=ti)
- ti_span.end(end_time=datetime_to_nano(ti.end_date))
- ti.span_status = SpanStatus.ENDED
- else:
- ti.span_status = SpanStatus.ACTIVE
- self.active_spans.set(f"ti:{ti.id}", ti_span)
-
def _run_scheduler_loop(self) -> None:
"""
Harvest DAG parsing results, queue tasks, and perform executor
heartbeat; the actual scheduler loop.
@@ -1819,9 +1596,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
for loop_count in itertools.count(start=1):
with Stats.timer("scheduler.scheduler_loop_duration") as timer:
with create_session() as session:
- if self._is_tracing_enabled():
- self._end_spans_of_externally_ended_ops(session)
-
# This will schedule for as many executors as possible.
num_queued_tis = self._do_scheduling(session)
# Don't keep any objects alive -- we've possibly just
looked at 500+ ORM objects!
@@ -2357,16 +2131,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
active_runs_of_dags = Counter({(dag_id, br_id): num for dag_id, br_id,
num in session.execute(query)})
def _update_state(dag: SerializedDAG, dag_run: DagRun):
- span = Trace.get_current_span()
- span.set_attributes(
- {
- "state": str(DagRunState.RUNNING),
- "run_id": dag_run.run_id,
- "type": dag_run.run_type,
- "dag_id": dag_run.dag_id,
- }
- )
-
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if (
@@ -2383,18 +2147,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
tags={},
extra_tags={"dag_id": dag.dag_id},
)
- if span.is_recording():
- span.add_event(
- name="schedule_delay",
- attributes={"dag_id": dag.dag_id, "schedule_delay":
str(schedule_delay)},
- )
# cache saves time during scheduling of many dag_runs for same dag
cached_get_dag: Callable[[DagRun], SerializedDAG | None] = lru_cache()(
partial(self.scheduler_dag_bag.get_dag_for_run, session=session)
)
- span = Trace.get_current_span()
for dag_run in dag_runs:
dag_id = dag_run.dag_id
run_id = dag_run.run_id
@@ -2434,15 +2192,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_run.run_id,
)
continue
- if span.is_recording():
- span.add_event(
- name="dag_run",
- attributes={
- "run_id": dag_run.run_id,
- "dag_id": dag_run.dag_id,
- "conf": str(dag_run.conf),
- },
- )
active_runs_of_dags[(dag_run.dag_id, backfill_id)] += 1
_update_state(dag, dag_run)
dag_run.notify_dagrun_state_changed(msg="started")
@@ -2554,17 +2303,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.warning("The DAG disappeared before verifying integrity:
%s. Skipping.", dag_run.dag_id)
return callback
- if (
- self._is_tracing_enabled()
- and dag_run.scheduled_by_job_id is not None
- and dag_run.scheduled_by_job_id != self.job.id
- and self.active_spans.get("dr:" + str(dag_run.id)) is None
- ):
- # If the dag_run has been previously scheduled by another job and
there is no active span,
- # then check if the job is still healthy.
- # If it's not healthy, then recreate the spans.
- self._recreate_unhealthy_scheduler_spans_if_needed(dag_run,
session)
-
dag_run.scheduled_by_job_id = self.job.id
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index ca116478110..1406283c05c 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -50,7 +50,6 @@ from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.trigger import Trigger
from airflow.observability.metrics import stats_utils
-from airflow.observability.trace import Trace
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
from airflow.sdk.execution_time.comms import (
CommsDecoder,
@@ -627,15 +626,6 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
extra_tags={"hostname": self.job.hostname},
)
- span = Trace.get_current_span()
- span.set_attributes(
- {
- "trigger host": self.job.hostname,
- "triggers running": len(self.running_triggers),
- "capacity left": capacity_left,
- }
- )
-
def update_triggers(self, requested_trigger_ids: set[int]):
"""
Request that we update what triggers we're running.
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 4bc47dddea7..61242e45390 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -28,6 +28,9 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar,
cast, overload
from uuid import UUID
import structlog
+from opentelemetry import context, trace
+from opentelemetry.trace import StatusCode
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from sqlalchemy import (
JSON,
Enum,
@@ -72,12 +75,11 @@ from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.tasklog import LogTemplate
from airflow.models.taskmap import TaskMap
-from airflow.observability.trace import Trace
+from airflow.observability.traces import new_dagrun_trace_carrier, override_ids
from airflow.serialization.definitions.deadline import
SerializedReferenceModels
from airflow.serialization.definitions.notset import NOTSET, ArgNotSet,
is_arg_set
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
-from airflow.utils.dates import datetime_to_nano
from airflow.utils.helpers import chunks, is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import retry_db_transaction
@@ -92,19 +94,16 @@ from airflow.utils.sqlalchemy import (
)
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.strings import get_random_string
-from airflow.utils.thread_safe_dict import ThreadSafeDict
from airflow.utils.types import DagRunTriggeredByType, DagRunType
if TYPE_CHECKING:
from typing import Literal, TypeAlias
- from opentelemetry.sdk.trace import Span
from pydantic import NonNegativeInt
from sqlalchemy.engine import ScalarResult
from sqlalchemy.orm import Session
from sqlalchemy.sql.elements import Case, ColumnElement
- from airflow._shared.observability.traces.base_tracer import EmptySpan
from airflow.models.dag_version import DagVersion
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.sdk import DAG as SDKDAG
@@ -120,6 +119,8 @@ RUN_ID_REGEX =
r"^(?:manual|scheduled|asset_triggered)__(?:\d{4}-\d{2}-\d{2}T\d{
log = structlog.get_logger(__name__)
+tracer = trace.get_tracer(__name__)
+
class TISchedulingDecision(NamedTuple):
"""Type of return for DagRun.task_instance_scheduling_decisions."""
@@ -153,8 +154,6 @@ class DagRun(Base, LoggingMixin):
external trigger (i.e. manual runs).
"""
- active_spans = ThreadSafeDict()
-
__tablename__ = "dag_run"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
@@ -368,7 +367,8 @@ class DagRun(Base, LoggingMixin):
self.triggered_by = triggered_by
self.triggering_user_name = triggering_user_name
self.scheduled_by_job_id = None
- self.context_carrier = {}
+ self.context_carrier: dict[str, str] = new_dagrun_trace_carrier()
+
if not isinstance(partition_key, str | None):
raise ValueError(
f"Expected partition_key to be a `str` or `None` but got
`{partition_key.__class__.__name__}`"
@@ -461,10 +461,6 @@ class DagRun(Base, LoggingMixin):
def stats_tags(self) -> dict[str, str]:
return prune_dict({"dag_id": self.dag_id, "run_type": self.run_type})
- @classmethod
- def set_active_spans(cls, active_spans: ThreadSafeDict):
- cls.active_spans = active_spans
-
def get_state(self):
return self._state
@@ -1019,131 +1015,28 @@ class DagRun(Base, LoggingMixin):
leaf_tis = {ti for ti in tis if ti.task_id in leaf_task_ids if
ti.state != TaskInstanceState.REMOVED}
return leaf_tis
- def set_dagrun_span_attrs(self, span: Span | EmptySpan):
- if self._state == DagRunState.FAILED:
- span.set_attribute("airflow.dag_run.error", True)
-
- # Explicitly set the value type to Union[...] to avoid a mypy error.
- attributes: dict[str, AttributeValueType] = {
- "airflow.category": "DAG runs",
- "airflow.dag_run.dag_id": str(self.dag_id),
- "airflow.dag_run.logical_date": str(self.logical_date),
- "airflow.dag_run.run_id": str(self.run_id),
- "airflow.dag_run.queued_at": str(self.queued_at),
- "airflow.dag_run.run_start_date": str(self.start_date),
- "airflow.dag_run.run_end_date": str(self.end_date),
- "airflow.dag_run.run_duration": str(
- (self.end_date - self.start_date).total_seconds() if
self.start_date and self.end_date else 0
- ),
- "airflow.dag_run.state": str(self._state),
- "airflow.dag_run.run_type": str(self.run_type),
- "airflow.dag_run.data_interval_start":
str(self.data_interval_start),
- "airflow.dag_run.data_interval_end": str(self.data_interval_end),
- "airflow.dag_run.conf": str(self.conf),
- }
- if span.is_recording():
- span.add_event(name="airflow.dag_run.queued",
timestamp=datetime_to_nano(self.queued_at))
- span.add_event(name="airflow.dag_run.started",
timestamp=datetime_to_nano(self.start_date))
- span.add_event(name="airflow.dag_run.ended",
timestamp=datetime_to_nano(self.end_date))
- span.set_attributes(attributes)
-
- def start_dr_spans_if_needed(self, tis: list[TI]):
- # If there is no value in active_spans, then the span hasn't already
been started.
- if self.active_spans is not None and self.active_spans.get("dr:" +
str(self.id)) is None:
- if self.span_status == SpanStatus.NOT_STARTED or self.span_status
== SpanStatus.NEEDS_CONTINUANCE:
- dr_span = None
- continue_ti_spans = False
- if self.span_status == SpanStatus.NOT_STARTED:
- dr_span = Trace.start_root_span(
- span_name=f"{self.dag_id}",
- component="dag",
- start_time=self.queued_at, # This is later converted
to nano.
- start_as_current=False,
- )
- elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
- # Use the existing context_carrier to set the initial
dag_run span as the parent.
- parent_context = Trace.extract(self.context_carrier)
- with Trace.start_child_span(
- span_name="new_scheduler",
parent_context=parent_context
- ) as s:
- s.set_attribute("trace_status", "continued")
-
- dr_span = Trace.start_child_span(
- span_name=f"{self.dag_id}_continued",
- parent_context=parent_context,
- component="dag",
- # No start time
- start_as_current=False,
- )
- # After this span is started, the context_carrier will be
replaced by the new one.
- # New task span will use this span as the parent.
- continue_ti_spans = True
- carrier = Trace.inject()
- self.context_carrier = carrier
- self.span_status = SpanStatus.ACTIVE
- # Set the span in a synchronized dictionary, so that the
variable can be used to end the span.
- self.active_spans.set("dr:" + str(self.id), dr_span)
- self.log.debug(
- "DagRun span has been started and the injected
context_carrier is: %s",
- self.context_carrier,
- )
- # Start TI spans that also need continuance.
- if continue_ti_spans:
- new_dagrun_context = Trace.extract(self.context_carrier)
- for ti in tis:
- if ti.span_status == SpanStatus.NEEDS_CONTINUANCE:
- ti_span = Trace.start_child_span(
- span_name=f"{ti.task_id}_continued",
- parent_context=new_dagrun_context,
- start_as_current=False,
- )
- ti_carrier = Trace.inject()
- ti.context_carrier = ti_carrier
- ti.span_status = SpanStatus.ACTIVE
- self.active_spans.set(f"ti:{ti.id}", ti_span)
- else:
- self.log.debug(
- "Found span_status '%s', while updating state for dag_run
'%s'",
- self.span_status,
- self.run_id,
- )
-
- def end_dr_span_if_needed(self):
- if self.active_spans is not None:
- active_span = self.active_spans.get("dr:" + str(self.id))
- if active_span is not None:
- self.log.debug(
- "Found active span with span_id: %s, for dag_id: %s,
run_id: %s, state: %s",
- active_span.get_span_context().span_id,
- self.dag_id,
- self.run_id,
- self.state,
- )
-
- self.set_dagrun_span_attrs(span=active_span)
- active_span.end(end_time=datetime_to_nano(self.end_date))
- # Remove the span from the dict.
- self.active_spans.delete("dr:" + str(self.id))
- self.span_status = SpanStatus.ENDED
- else:
- if self.span_status == SpanStatus.ACTIVE:
- # Another scheduler has started the span.
- # Update the DB SpanStatus to notify the owner to end it.
- self.span_status = SpanStatus.SHOULD_END
- elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
- # This is a corner case where the scheduler exited
gracefully
- # while the dag_run was almost done.
- # Since it reached this point, the dag has finished but
there has been no time
- # to create a new span for the current scheduler.
- # There is no need for more spans, update the status on
the db.
- self.span_status = SpanStatus.ENDED
- else:
- self.log.debug(
- "No active span has been found for dag_id: %s, run_id:
%s, state: %s",
- self.dag_id,
- self.run_id,
- self.state,
- )
+ def _emit_dagrun_span(self, state: DagRunState):
+ ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
+ span = trace.get_current_span(context=ctx)
+ span_context = span.get_span_context()
+ with override_ids(span_context.trace_id, span_context.span_id):
+ attributes = {
+ "airflow.dag_id": str(self.dag_id),
+ "airflow.dag_run.run_id": self.run_id,
+ }
+ if self.logical_date:
+ attributes["airflow.dag_run.logical_date"] =
str(self.logical_date)
+ if self.partition_key:
+ attributes["airflow.dag_run.partition_key"] =
str(self.partition_key)
+ span = tracer.start_span(
+ name=f"dag_run.{self.dag_id}",
+ start_time=int((self.start_date or
timezone.utcnow()).timestamp() * 1e9),
+ attributes=attributes,
+ context=context.Context(),
+ )
+ status_code = StatusCode.OK if state == DagRunState.SUCCESS else
StatusCode.ERROR
+ span.set_status(status_code)
+ span.end()
@provide_session
def update_state(
@@ -1302,9 +1195,6 @@ class DagRun(Base, LoggingMixin):
# finally, if the leaves aren't done, the dag is still running
else:
- # It might need to start TI spans as well.
- self.start_dr_spans_if_needed(tis=tis)
-
self.set_state(DagRunState.RUNNING)
if self._state == DagRunState.FAILED or self._state ==
DagRunState.SUCCESS:
@@ -1331,10 +1221,8 @@ class DagRun(Base, LoggingMixin):
self.data_interval_start,
self.data_interval_end,
)
-
- self.end_dr_span_if_needed()
-
session.flush()
+ self._emit_dagrun_span(state=self.state)
self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
self._emit_duration_stats_for_finished_state()
diff --git a/airflow-core/src/airflow/observability/traces/__init__.py
b/airflow-core/src/airflow/observability/traces/__init__.py
index 217e5db9607..6bf0019f747 100644
--- a/airflow-core/src/airflow/observability/traces/__init__.py
+++ b/airflow-core/src/airflow/observability/traces/__init__.py
@@ -15,3 +15,137 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+import logging
+import os
+from contextlib import contextmanager
+from importlib.metadata import entry_points
+
+from opentelemetry import context, trace
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
+from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+from airflow.configuration import conf
+
+log = logging.getLogger(__name__)
+
+OVERRIDE_SPAN_ID_KEY = context.create_key("override_span_id")
+OVERRIDE_TRACE_ID_KEY = context.create_key("override_trace_id")
+
+
+class OverrideableRandomIdGenerator(RandomIdGenerator):
+ """Lets you override the span id."""
+
+ def generate_span_id(self):
+ override = context.get_value(OVERRIDE_SPAN_ID_KEY)
+ if override is not None:
+ return override
+ return super().generate_span_id()
+
+ def generate_trace_id(self):
+ override = context.get_value(OVERRIDE_TRACE_ID_KEY)
+ if override is not None:
+ return override
+ return super().generate_trace_id()
+
+
+def new_dagrun_trace_carrier() -> dict[str, str]:
+ """Generate a fresh W3C traceparent carrier without creating a recordable
span."""
+ gen = RandomIdGenerator()
+ span_ctx = SpanContext(
+ trace_id=gen.generate_trace_id(),
+ span_id=gen.generate_span_id(),
+ is_remote=False,
+ trace_flags=TraceFlags(TraceFlags.SAMPLED),
+ )
+ ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
+ carrier: dict[str, str] = {}
+ TraceContextTextMapPropagator().inject(carrier, context=ctx)
+ return carrier
+
+
+@contextmanager
+def override_ids(trace_id, span_id, ctx=None):
+ ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
+ ctx = context.set_value(OVERRIDE_SPAN_ID_KEY, span_id, context=ctx)
+ token = context.attach(ctx)
+ try:
+ yield
+ finally:
+ context.detach(token)
+
+
+def _get_backcompat_config() -> tuple[str | None, Resource | None]:
+ """
+ Possibly get deprecated Airflow configs for otel.
+
+ Ideally we return (None, None) here. But if the old configuration is
there,
+ then we will use it.
+ """
+ resource = None
+ if not os.environ.get("OTEL_SERVICE_NAME") and not
os.environ.get("OTEL_RESOURCE_ATTRIBUTES"):
+ service_name = conf.get("traces", "otel_service", fallback=None)
+ if service_name:
+ resource = Resource({"service.name": service_name})
+
+ endpoint = None
+ if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") and not
os.environ.get(
+ "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
+ ):
+ # this is only for backcompat!
+ host = conf.get("traces", "otel_host", fallback=None)
+ port = conf.get("traces", "otel_port", fallback=None)
+ ssl_active = conf.getboolean("traces", "otel_ssl_active",
fallback=False)
+ if host and port:
+ scheme = "https" if ssl_active else "http"
+ endpoint = f"{scheme}://{host}:{port}/v1/traces"
+ return endpoint, resource
+
+
+def _load_exporter_from_env() -> SpanExporter:
+ """
+ Load a span exporter using the OTEL_TRACES_EXPORTER env var.
+
+ Mirrors the entry-point mechanism used by the OTEL SDK auto-instrumentation
+ configurator. Supported values (from installed packages):
+ - ``otlp`` (default) — OTLP/gRPC
+ - ``otlp_proto_http`` — OTLP/HTTP
+ - ``console`` — stdout (useful for debugging)
+ """
+ exporter_name = os.environ.get("OTEL_TRACES_EXPORTER", "otlp")
+ eps = entry_points(group="opentelemetry_traces_exporter",
name=exporter_name)
+ ep = next(iter(eps), None)
+ if ep is None:
+ raise RuntimeError(
+ f"No span exporter found for
OTEL_TRACES_EXPORTER={exporter_name!r}. "
+ f"Available: {[e.name for e in
entry_points(group='opentelemetry_traces_exporter')]}"
+ )
+ return ep.load()()
+
+
+def configure_otel():
+ otel_on = conf.getboolean("traces", "otel_on", fallback=False)
+ if not otel_on:
+ return
+
+ # ideally both endpoint and resource are None here
+ # they would only be something other than None if user is using deprecated
+ # Airflow-defined otel configs
+ backcompat_endpoint, resource = _get_backcompat_config()
+
+ # backcompat: if old-style host/port config provided an endpoint, set the
+ # env var so the exporter (loaded below) picks it up automatically
+
+ otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
+ otlp_traces_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
+ if backcompat_endpoint and not (otlp_endpoint or otlp_traces_endpoint):
+ os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = backcompat_endpoint
+
+ provider = TracerProvider(id_generator=OverrideableRandomIdGenerator(),
resource=resource)
+ provider.add_span_processor(BatchSpanProcessor(_load_exporter_from_env()))
+ trace.set_tracer_provider(provider)
diff --git a/airflow-core/src/airflow/settings.py
b/airflow-core/src/airflow/settings.py
index b8bc480ef15..49d46f652c6 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -38,6 +38,8 @@ from sqlalchemy.ext.asyncio import (
)
from sqlalchemy.orm import scoped_session, sessionmaker
+from airflow.observability.traces import configure_otel
+
try:
from sqlalchemy.ext.asyncio import async_sessionmaker
except ImportError:
@@ -722,7 +724,7 @@ def initialize():
load_policy_plugins(policy_mgr)
import_local_settings()
configure_logging()
-
+ configure_otel()
configure_adapters()
# The webservers import this file from models.py with the default settings.
diff --git a/airflow-core/tests/integration/otel/dags/otel_test_dag.py
b/airflow-core/tests/integration/otel/dags/otel_test_dag.py
index 6c005a9927e..25861c8f622 100644
--- a/airflow-core/tests/integration/otel/dags/otel_test_dag.py
+++ b/airflow-core/tests/integration/otel/dags/otel_test_dag.py
@@ -22,12 +22,12 @@ from datetime import datetime
from opentelemetry import trace
from airflow import DAG
-from airflow.sdk import chain, task
-from airflow.sdk.observability.trace import Trace
-from airflow.sdk.observability.traces import otel_tracer
+from airflow.sdk import task
logger = logging.getLogger("airflow.otel_test_dag")
+tracer = trace.get_tracer(__name__)
+
args = {
"owner": "airflow",
"start_date": datetime(2024, 9, 1),
@@ -36,52 +36,13 @@ args = {
@task
-def task1(ti):
- logger.info("Starting Task_1.")
-
- context_carrier = ti.context_carrier
-
- otel_task_tracer = otel_tracer.get_otel_tracer_for_task(Trace)
- tracer_provider = otel_task_tracer.get_otel_tracer_provider()
-
- if context_carrier is not None:
- logger.info("Found ti.context_carrier: %s.", str(context_carrier))
- logger.info("Extracting the span context from the context_carrier.")
- parent_context = otel_task_tracer.extract(context_carrier)
- with otel_task_tracer.start_child_span(
- span_name="task1_sub_span1",
- parent_context=parent_context,
- component="dag",
- ) as s1:
- s1.set_attribute("attr1", "val1")
- logger.info("From task sub_span1.")
-
- with otel_task_tracer.start_child_span("task1_sub_span2") as s2:
- s2.set_attribute("attr2", "val2")
- logger.info("From task sub_span2.")
+def task1():
+ logger.info("starting task1")
- tracer = trace.get_tracer("trace_test.tracer",
tracer_provider=tracer_provider)
- with tracer.start_as_current_span(name="task1_sub_span3") as
s3:
- s3.set_attribute("attr3", "val3")
- logger.info("From task sub_span3.")
+ with tracer.start_as_current_span("sub_span1") as s1:
+ s1.set_attribute("attr1", "val1")
- with otel_task_tracer.start_child_span(
- span_name="task1_sub_span4",
- parent_context=parent_context,
- component="dag",
- ) as s4:
- s4.set_attribute("attr4", "val4")
- logger.info("From task sub_span4.")
-
- logger.info("Task_1 finished.")
-
-
-@task
-def task2():
- logger.info("Starting Task_2.")
- for i in range(3):
- logger.info("Task_2, iteration '%d'.", i)
- logger.info("Task_2 finished.")
+ logger.info("task1 finished.")
with DAG(
@@ -90,4 +51,4 @@ with DAG(
schedule=None,
catchup=False,
) as dag:
- chain(task1(), task2()) # type: ignore
+ task1()
diff --git
a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py
b/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py
deleted file mode 100644
index 72fb9148a40..00000000000
---
a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import logging
-import os
-import time
-from datetime import datetime
-
-from opentelemetry import trace
-from sqlalchemy import select
-
-from airflow import DAG
-from airflow.models import TaskInstance
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
-from airflow.sdk import chain, task
-from airflow.sdk.observability.trace import Trace
-from airflow.sdk.observability.traces import otel_tracer
-from airflow.utils.session import create_session
-
-logger = logging.getLogger("airflow.otel_test_dag_with_pause")
-
-args = {
- "owner": "airflow",
- "start_date": datetime(2024, 9, 2),
- "retries": 0,
-}
-
-
-@task
-def task1(ti):
- logger.info("Starting Task_1.")
-
- context_carrier = ti.context_carrier
-
- otel_task_tracer = otel_tracer.get_otel_tracer_for_task(Trace)
- tracer_provider = otel_task_tracer.get_otel_tracer_provider()
-
- if context_carrier is not None:
- logger.info("Found ti.context_carrier: %s.", context_carrier)
- logger.info("Extracting the span context from the context_carrier.")
-
- # If the task takes too long to execute, then the ti should be read
from the db
- # to make sure that the initial context_carrier is the same.
- # Since Airflow 3, direct db access has been removed entirely.
- if not AIRFLOW_V_3_0_PLUS:
- with create_session() as session:
- session_ti: TaskInstance = session.scalars(
- select(TaskInstance).where(
- TaskInstance.task_id == ti.task_id,
- TaskInstance.run_id == ti.run_id,
- )
- ).one()
- context_carrier = session_ti.context_carrier
-
- parent_context = Trace.extract(context_carrier)
- with otel_task_tracer.start_child_span(
- span_name="task1_sub_span1",
- parent_context=parent_context,
- component="dag",
- ) as s1:
- s1.set_attribute("attr1", "val1")
- logger.info("From task sub_span1.")
-
- with otel_task_tracer.start_child_span("task1_sub_span2") as s2:
- s2.set_attribute("attr2", "val2")
- logger.info("From task sub_span2.")
-
- tracer = trace.get_tracer("trace_test.tracer",
tracer_provider=tracer_provider)
- with tracer.start_as_current_span(name="task1_sub_span3") as
s3:
- s3.set_attribute("attr3", "val3")
- logger.info("From task sub_span3.")
-
- if not AIRFLOW_V_3_0_PLUS:
- with create_session() as session:
- session_ti: TaskInstance = session.scalars(
- select(TaskInstance).where(
- TaskInstance.task_id == ti.task_id,
- TaskInstance.run_id == ti.run_id,
- )
- ).one()
- context_carrier = session_ti.context_carrier
- parent_context = Trace.extract(context_carrier)
-
- with otel_task_tracer.start_child_span(
- span_name="task1_sub_span4",
- parent_context=parent_context,
- component="dag",
- ) as s4:
- s4.set_attribute("attr4", "val4")
- logger.info("From task sub_span4.")
-
- logger.info("Task_1 finished.")
-
-
-@task
-def paused_task():
- logger.info("Starting Paused_task.")
-
- dag_folder = os.path.dirname(os.path.abspath(__file__))
- control_file = os.path.join(dag_folder, "dag_control.txt")
-
- # Create the file and write 'pause' to it.
- with open(control_file, "w") as file:
- file.write("pause")
-
- # Pause execution until the word 'pause' is replaced on the file.
- while True:
- # If there is an exception, then writing to the file failed. Let it
exit.
- file_contents = None
- with open(control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- logger.info("Task has been paused.")
- time.sleep(1)
- continue
- logger.info("Resuming task execution.")
- # Break the loop and finish with the task execution.
- break
-
- # Cleanup the control file.
- if os.path.exists(control_file):
- os.remove(control_file)
- print("Control file has been cleaned up.")
-
- logger.info("Paused_task finished.")
-
-
-@task
-def task2():
- logger.info("Starting Task_2.")
- for i in range(3):
- logger.info("Task_2, iteration '%d'.", i)
- logger.info("Task_2 finished.")
-
-
-with DAG(
- "otel_test_dag_with_pause_between_tasks",
- default_args=args,
- schedule=None,
- catchup=False,
-) as dag:
- chain(task1(), paused_task(), task2()) # type: ignore
diff --git
a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py
b/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py
deleted file mode 100644
index dfc5c30243f..00000000000
---
a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import logging
-import os
-import time
-from datetime import datetime
-
-from opentelemetry import trace
-from sqlalchemy import select
-
-from airflow import DAG
-from airflow.models import TaskInstance
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
-from airflow.sdk import chain, task
-from airflow.sdk.observability.trace import Trace
-from airflow.sdk.observability.traces import otel_tracer
-from airflow.utils.session import create_session
-
-logger = logging.getLogger("airflow.otel_test_dag_with_pause_in_task")
-
-args = {
- "owner": "airflow",
- "start_date": datetime(2024, 9, 2),
- "retries": 0,
-}
-
-
-@task
-def task1(ti):
- logger.info("Starting Task_1.")
-
- context_carrier = ti.context_carrier
-
- dag_folder = os.path.dirname(os.path.abspath(__file__))
- control_file = os.path.join(dag_folder, "dag_control.txt")
-
- # Create the file and write 'pause' to it.
- with open(control_file, "w") as file:
- file.write("pause")
-
- # Pause execution until the word 'pause' is replaced on the file.
- while True:
- # If there is an exception, then writing to the file failed. Let it
exit.
- file_contents = None
- with open(control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- logger.info("Task has been paused.")
- time.sleep(1)
- continue
- logger.info("Resuming task execution.")
- # Break the loop and finish with the task execution.
- break
-
- otel_task_tracer = otel_tracer.get_otel_tracer_for_task(Trace)
- tracer_provider = otel_task_tracer.get_otel_tracer_provider()
-
- if context_carrier is not None:
- logger.info("Found ti.context_carrier: %s.", context_carrier)
- logger.info("Extracting the span context from the context_carrier.")
-
- # If the task takes too long to execute, then the ti should be read
from the db
- # to make sure that the initial context_carrier is the same.
- # Since Airflow 3, direct db access has been removed entirely.
- if not AIRFLOW_V_3_0_PLUS:
- with create_session() as session:
- session_ti: TaskInstance = session.scalars(
- select(TaskInstance).where(
- TaskInstance.task_id == ti.task_id,
- TaskInstance.run_id == ti.run_id,
- )
- ).one()
- context_carrier = session_ti.context_carrier
-
- parent_context = Trace.extract(context_carrier)
- with otel_task_tracer.start_child_span(
- span_name="task1_sub_span1",
- parent_context=parent_context,
- component="dag",
- ) as s1:
- s1.set_attribute("attr1", "val1")
- logger.info("From task sub_span1.")
-
- with otel_task_tracer.start_child_span("task1_sub_span2") as s2:
- s2.set_attribute("attr2", "val2")
- logger.info("From task sub_span2.")
-
- tracer = trace.get_tracer("trace_test.tracer",
tracer_provider=tracer_provider)
- with tracer.start_as_current_span(name="task1_sub_span3") as
s3:
- s3.set_attribute("attr3", "val3")
- logger.info("From task sub_span3.")
-
- if not AIRFLOW_V_3_0_PLUS:
- with create_session() as session:
- session_ti: TaskInstance = session.scalars(
- select(TaskInstance).where(
- TaskInstance.task_id == ti.task_id,
- TaskInstance.run_id == ti.run_id,
- )
- ).one()
- context_carrier = session_ti.context_carrier
- parent_context = Trace.extract(context_carrier)
-
- with otel_task_tracer.start_child_span(
- span_name="task1_sub_span4",
- parent_context=parent_context,
- component="dag",
- ) as s4:
- s4.set_attribute("attr4", "val4")
- logger.info("From task sub_span4.")
-
- # Cleanup the control file.
- if os.path.exists(control_file):
- os.remove(control_file)
- print("Control file has been cleaned up.")
-
- logger.info("Task_1 finished.")
-
-
-@task
-def task2():
- logger.info("Starting Task_2.")
- for i in range(3):
- logger.info("Task_2, iteration '%d'.", i)
- logger.info("Task_2 finished.")
-
-
-with DAG(
- "otel_test_dag_with_pause_in_task",
- default_args=args,
- schedule=None,
- catchup=False,
-) as dag:
- chain(task1(), task2()) # type: ignore
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index 0e4546e301d..60af1060ce1 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -250,7 +250,7 @@ class TestOtelIntegration:
dag_bag = DagBag(dag_folder=cls.dag_folder, include_examples=False)
dag_ids = dag_bag.dag_ids
- assert len(dag_ids) == 3
+ assert len(dag_ids) == 1
dag_dict: dict[str, SerializedDAG] = {}
with create_session() as session:
@@ -317,7 +317,7 @@ class TestOtelIntegration:
try:
# Start the processes here and not as fixtures or in a common
setup,
# so that the test can capture their output.
- scheduler_process, apiserver_process =
self.start_worker_and_scheduler()
+ scheduler_process, apiserver_process = self.start_scheduler()
dag_id = "otel_test_dag"
@@ -441,7 +441,7 @@ class TestOtelIntegration:
try:
# Start the processes here and not as fixtures or in a common
setup,
# so that the test can capture their output.
- scheduler_process, apiserver_process =
self.start_worker_and_scheduler()
+ scheduler_process, apiserver_process = self.start_scheduler()
dag_id = "otel_test_dag"
@@ -486,10 +486,8 @@ class TestOtelIntegration:
log.info("out-start --\n%s\n-- out-end", out)
log.info("err-start --\n%s\n-- err-end", err)
- # host = "host.docker.internal"
host = "jaeger"
service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
- # service_name ``= "my-service-name"
r =
requests.get(f"http://{host}:16686/api/traces?service={service_name}")
data = r.json()
@@ -510,16 +508,12 @@ class TestOtelIntegration:
nested = get_span_hierarchy()
assert nested == {
- "otel_test_dag": None,
- "task1": None,
- "task1_sub_span1": None,
- "task1_sub_span2": None,
- "task1_sub_span3": "task1_sub_span2",
- "task1_sub_span4": None,
- "task2": None,
+ "sub_span1": "task_run.task1",
+ "task_run.task1": "dag_run.otel_test_dag",
+ "dag_run.otel_test_dag": None,
}
- def start_worker_and_scheduler(self):
+ def start_scheduler(self):
scheduler_process = subprocess.Popen(
self.scheduler_command_args,
env=os.environ.copy(),
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 3aeebcedbdc..c23f180f3a4 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -81,7 +81,6 @@ from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.models.team import Team
from airflow.models.trigger import Trigger
-from airflow.observability.trace import Trace
from airflow.partition_mappers.base import PartitionMapper as
CorePartitionMapper
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -93,9 +92,7 @@ from airflow.serialization.definitions.dag import
SerializedDAG
from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.timetables.base import DagRunInfo, DataInterval
from airflow.utils.session import create_session, provide_session
-from airflow.utils.span_status import SpanStatus
from airflow.utils.state import CallbackState, DagRunState, State,
TaskInstanceState
-from airflow.utils.thread_safe_dict import ThreadSafeDict
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.pytest_plugin import AIRFLOW_ROOT_PATH
@@ -3283,190 +3280,6 @@ class TestSchedulerJob:
dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(dag_runs) == 2
- @pytest.mark.parametrize(
- ("ti_state", "final_ti_span_status"),
- [
- pytest.param(State.SUCCESS, SpanStatus.ENDED,
id="dr_ended_successfully"),
- pytest.param(State.RUNNING, SpanStatus.ACTIVE,
id="dr_still_running"),
- ],
- )
- def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state,
final_ti_span_status, dag_maker):
- with dag_maker(
- dag_id="test_recreate_unhealthy_scheduler_spans_if_needed",
- start_date=DEFAULT_DATE,
- max_active_runs=1,
- dagrun_timeout=datetime.timedelta(seconds=60),
- ):
- EmptyOperator(task_id="dummy")
-
- session = settings.Session()
-
- old_job = Job()
- old_job.job_type = SchedulerJobRunner.job_type
-
- session.add(old_job)
- session.commit()
-
- assert old_job.is_alive() is False
-
- new_job = Job()
- new_job.job_type = SchedulerJobRunner.job_type
- session.add(new_job)
- session.flush()
-
- self.job_runner = SchedulerJobRunner(job=new_job)
- self.job_runner.active_spans = ThreadSafeDict()
- assert len(self.job_runner.active_spans.get_all()) == 0
-
- dr = dag_maker.create_dagrun()
- dr.state = State.RUNNING
- dr.span_status = SpanStatus.ACTIVE
- dr.scheduled_by_job_id = old_job.id
-
- ti = dr.get_task_instances(session=session)[0]
- ti.state = ti_state
- ti.start_date = timezone.utcnow()
- ti.span_status = SpanStatus.ACTIVE
- ti.queued_by_job_id = old_job.id
- session.merge(ti)
- session.merge(dr)
- session.commit()
-
- assert dr.scheduled_by_job_id != self.job_runner.job.id
- assert dr.scheduled_by_job_id == old_job.id
- assert dr.run_id is not None
- assert dr.state == State.RUNNING
- assert dr.span_status == SpanStatus.ACTIVE
- assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None
-
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
- assert ti.state == ti_state
- assert ti.span_status == SpanStatus.ACTIVE
-
- self.job_runner._recreate_unhealthy_scheduler_spans_if_needed(dr,
session)
-
- assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None
-
- if final_ti_span_status == SpanStatus.ACTIVE:
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None
- assert len(self.job_runner.active_spans.get_all()) == 2
- else:
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
- assert len(self.job_runner.active_spans.get_all()) == 1
-
- assert dr.span_status == SpanStatus.ACTIVE
- assert ti.span_status == final_ti_span_status
-
- def test_end_spans_of_externally_ended_ops(self, dag_maker):
- with dag_maker(
- dag_id="test_end_spans_of_externally_ended_ops",
- start_date=DEFAULT_DATE,
- max_active_runs=1,
- dagrun_timeout=datetime.timedelta(seconds=60),
- ):
- EmptyOperator(task_id="dummy")
-
- session = settings.Session()
-
- job = Job()
- job.job_type = SchedulerJobRunner.job_type
- session.add(job)
-
- self.job_runner = SchedulerJobRunner(job=job)
- self.job_runner.active_spans = ThreadSafeDict()
- assert len(self.job_runner.active_spans.get_all()) == 0
-
- dr = dag_maker.create_dagrun()
- dr.state = State.SUCCESS
- dr.span_status = SpanStatus.SHOULD_END
-
- ti = dr.get_task_instances(session=session)[0]
- ti.state = State.SUCCESS
- ti.span_status = SpanStatus.SHOULD_END
- ti.context_carrier = {}
- session.merge(ti)
- session.merge(dr)
- session.commit()
-
- dr_span = Trace.start_root_span(span_name="dag_run_span",
start_as_current=False)
- ti_span = Trace.start_child_span(span_name="ti_span",
start_as_current=False)
-
- self.job_runner.active_spans.set("dr:" + str(dr.id), dr_span)
- self.job_runner.active_spans.set(f"ti:{ti.id}", ti_span)
-
- assert dr.span_status == SpanStatus.SHOULD_END
- assert ti.span_status == SpanStatus.SHOULD_END
-
- assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None
-
- self.job_runner._end_spans_of_externally_ended_ops(session)
-
- assert dr.span_status == SpanStatus.ENDED
- assert ti.span_status == SpanStatus.ENDED
-
- assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
-
- @pytest.mark.parametrize(
- ("state", "final_span_status"),
- [
- pytest.param(State.SUCCESS, SpanStatus.ENDED,
id="dr_ended_successfully"),
- pytest.param(State.RUNNING, SpanStatus.NEEDS_CONTINUANCE,
id="dr_still_running"),
- ],
- )
- def test_end_active_spans(self, state, final_span_status, dag_maker):
- with dag_maker(
- dag_id="test_end_active_spans",
- start_date=DEFAULT_DATE,
- max_active_runs=1,
- dagrun_timeout=datetime.timedelta(seconds=60),
- ):
- EmptyOperator(task_id="dummy")
-
- session = settings.Session()
-
- job = Job()
- job.job_type = SchedulerJobRunner.job_type
-
- self.job_runner = SchedulerJobRunner(job=job)
- self.job_runner.active_spans = ThreadSafeDict()
- assert len(self.job_runner.active_spans.get_all()) == 0
-
- dr = dag_maker.create_dagrun()
- dr.state = state
- dr.span_status = SpanStatus.ACTIVE
-
- ti = dr.get_task_instances(session=session)[0]
- ti.state = state
- ti.span_status = SpanStatus.ACTIVE
- ti.context_carrier = {}
- session.merge(ti)
- session.merge(dr)
- session.commit()
-
- dr_span = Trace.start_root_span(span_name="dag_run_span",
start_as_current=False)
- ti_span = Trace.start_child_span(span_name="ti_span",
start_as_current=False)
-
- self.job_runner.active_spans.set("dr:" + str(dr.id), dr_span)
- self.job_runner.active_spans.set(f"ti:{ti.id}", ti_span)
-
- assert dr.span_status == SpanStatus.ACTIVE
- assert ti.span_status == SpanStatus.ACTIVE
-
- assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None
- assert len(self.job_runner.active_spans.get_all()) == 2
-
- self.job_runner._end_active_spans(session)
-
- assert dr.span_status == final_span_status
- assert ti.span_status == final_span_status
-
- assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None
- assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
- assert len(self.job_runner.active_spans.get_all()) == 0
-
def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session):
"""
Test if a dagrun will not be scheduled if max_dag_runs
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index f3de13422fa..14722f83b0c 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -27,6 +27,7 @@ from unittest.mock import ANY, call
import pendulum
import pytest
+from opentelemetry.sdk.trace import TracerProvider
from sqlalchemy import func, select
from sqlalchemy.orm import joinedload
@@ -54,9 +55,7 @@ from airflow.serialization.serialized_objects import
LazyDeserializedDAG
from airflow.settings import get_policy_plugin_manager
from airflow.task.trigger_rule import TriggerRule
from airflow.triggers.base import StartTriggerArgs
-from airflow.utils.span_status import SpanStatus
from airflow.utils.state import DagRunState, State, TaskInstanceState
-from airflow.utils.thread_safe_dict import ThreadSafeDict
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils import db
@@ -560,142 +559,6 @@ class TestDagRun:
assert dag_run.state == DagRunState.SUCCESS
mock_on_success.assert_called_once()
- def test_start_dr_spans_if_needed_new_span(self, dag_maker, session):
- with dag_maker(
- dag_id="test_start_dr_spans_if_needed_new_span",
- schedule=datetime.timedelta(days=1),
- start_date=datetime.datetime(2017, 1, 1),
- ) as dag:
- dag_task1 = EmptyOperator(task_id="test_task1")
- dag_task2 = EmptyOperator(task_id="test_task2")
- dag_task1.set_downstream(dag_task2)
-
- initial_task_states = {
- "test_task1": TaskInstanceState.QUEUED,
- "test_task2": TaskInstanceState.QUEUED,
- }
-
- dag_run = self.create_dag_run(dag=dag,
task_states=initial_task_states, session=session)
-
- active_spans = ThreadSafeDict()
- dag_run.set_active_spans(active_spans)
-
- tis = dag_run.get_task_instances()
-
- assert dag_run.active_spans is not None
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is None
- assert dag_run.span_status == SpanStatus.NOT_STARTED
-
- dag_run.start_dr_spans_if_needed(tis=tis)
-
- assert dag_run.span_status == SpanStatus.ACTIVE
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is not None
-
- def test_start_dr_spans_if_needed_span_with_continuance(self, dag_maker,
session):
- with dag_maker(
- dag_id="test_start_dr_spans_if_needed_span_with_continuance",
- schedule=datetime.timedelta(days=1),
- start_date=datetime.datetime(2017, 1, 1),
- ) as dag:
- dag_task1 = EmptyOperator(task_id="test_task1")
- dag_task2 = EmptyOperator(task_id="test_task2")
- dag_task1.set_downstream(dag_task2)
-
- initial_task_states = {
- "test_task1": TaskInstanceState.RUNNING,
- "test_task2": TaskInstanceState.QUEUED,
- }
-
- dag_run = self.create_dag_run(dag=dag,
task_states=initial_task_states, session=session)
-
- active_spans = ThreadSafeDict()
- dag_run.set_active_spans(active_spans)
-
- dag_run.span_status = SpanStatus.NEEDS_CONTINUANCE
-
- tis = dag_run.get_task_instances()
-
- first_ti = tis[0]
- first_ti.span_status = SpanStatus.NEEDS_CONTINUANCE
-
- assert dag_run.active_spans is not None
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is None
- assert dag_run.active_spans.get(f"ti:{first_ti.id}") is None
- assert dag_run.span_status == SpanStatus.NEEDS_CONTINUANCE
- assert first_ti.span_status == SpanStatus.NEEDS_CONTINUANCE
-
- dag_run.start_dr_spans_if_needed(tis=tis)
-
- assert dag_run.span_status == SpanStatus.ACTIVE
- assert first_ti.span_status == SpanStatus.ACTIVE
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is not None
- assert dag_run.active_spans.get(f"ti:{first_ti.id}") is not None
-
- def test_end_dr_span_if_needed(self, testing_dag_bundle, dag_maker,
session):
- with dag_maker(
- dag_id="test_end_dr_span_if_needed",
- schedule=datetime.timedelta(days=1),
- start_date=datetime.datetime(2017, 1, 1),
- ) as dag:
- dag_task1 = EmptyOperator(task_id="test_task1")
- dag_task2 = EmptyOperator(task_id="test_task2")
- dag_task1.set_downstream(dag_task2)
-
- initial_task_states = {
- "test_task1": TaskInstanceState.SUCCESS,
- "test_task2": TaskInstanceState.SUCCESS,
- }
-
- dag_run = self.create_dag_run(dag=dag,
task_states=initial_task_states, session=session)
-
- active_spans = ThreadSafeDict()
- dag_run.set_active_spans(active_spans)
-
- from airflow.observability.trace import Trace
-
- dr_span = Trace.start_root_span(span_name="test_span",
start_as_current=False)
-
- active_spans.set("dr:" + str(dag_run.id), dr_span)
-
- assert dag_run.active_spans is not None
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is not None
-
- dag_run.end_dr_span_if_needed()
-
- assert dag_run.span_status == SpanStatus.ENDED
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is None
-
- def test_end_dr_span_if_needed_with_span_from_another_scheduler(
- self, testing_dag_bundle, dag_maker, session
- ):
- with dag_maker(
-
dag_id="test_end_dr_span_if_needed_with_span_from_another_scheduler",
- schedule=datetime.timedelta(days=1),
- start_date=datetime.datetime(2017, 1, 1),
- ) as dag:
- dag_task1 = EmptyOperator(task_id="test_task1")
- dag_task2 = EmptyOperator(task_id="test_task2")
- dag_task1.set_downstream(dag_task2)
-
- initial_task_states = {
- "test_task1": TaskInstanceState.SUCCESS,
- "test_task2": TaskInstanceState.SUCCESS,
- }
-
- dag_run = self.create_dag_run(dag=dag,
task_states=initial_task_states, session=session)
-
- active_spans = ThreadSafeDict()
- dag_run.set_active_spans(active_spans)
-
- dag_run.span_status = SpanStatus.ACTIVE
-
- assert dag_run.active_spans is not None
- assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is None
-
- dag_run.end_dr_span_if_needed()
-
- assert dag_run.span_status == SpanStatus.SHOULD_END
-
def test_dagrun_update_state_with_handle_callback_success(self,
testing_dag_bundle, dag_maker, session):
def on_success_callable(context):
assert context["dag_run"].dag_id ==
"test_dagrun_update_state_with_handle_callback_success"
@@ -744,7 +607,6 @@ class TestDagRun:
)
def test_dagrun_update_state_with_handle_callback_failure(self,
testing_dag_bundle, dag_maker, session):
-
def on_failure_callable(context):
assert context["dag_run"].dag_id ==
"test_dagrun_update_state_with_handle_callback_failure"
@@ -3292,3 +3154,159 @@ class TestDagRunHandleDagCallback:
assert context_received["ti"].task_id == "test_task"
assert context_received["ti"].dag_id == "test_dag"
assert context_received["ti"].run_id == dr.run_id
+
+
+class TestDagRunTracing:
+ """Tests for DagRun OpenTelemetry span behavior."""
+
+ @pytest.fixture(autouse=True)
+ def sdk_tracer_provider(self):
+ """Patch the module-level tracer with one backed by a real SDK
provider so spans have valid IDs."""
+ provider = TracerProvider()
+ real_tracer = provider.get_tracer("airflow.models.dagrun")
+ with mock.patch("airflow.models.dagrun.tracer", real_tracer):
+ yield
+
+ def test_context_carrier_set_on_init(self, dag_maker):
+ """DagRun.__init__ should populate context_carrier with a W3C
traceparent."""
+ with dag_maker("test_tracing_init"):
+ EmptyOperator(task_id="t1")
+ dr = dag_maker.create_dagrun()
+
+ assert dr.context_carrier is not None
+ assert isinstance(dr.context_carrier, dict)
+ assert "traceparent" in dr.context_carrier
+
+ def test_context_carrier_unique_per_dagrun(self, dag_maker):
+ """Each DagRun should get a distinct trace context."""
+ with dag_maker("test_tracing_unique1"):
+ EmptyOperator(task_id="t1")
+ dr1 = dag_maker.create_dagrun()
+
+ with dag_maker("test_tracing_unique2"):
+ EmptyOperator(task_id="t1")
+ dr2 = dag_maker.create_dagrun()
+
+ assert dr1.context_carrier["traceparent"] !=
dr2.context_carrier["traceparent"]
+
+ @pytest.mark.parametrize("final_state", [DagRunState.SUCCESS,
DagRunState.FAILED])
+ def test_emit_dagrun_span_called_on_completion(self, dag_maker, session,
final_state):
+ """_emit_dagrun_span should be called exactly once when a dag run
finishes."""
+ with dag_maker("test_tracing_emit", session=session) as dag:
+ EmptyOperator(task_id="t1")
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ ti = dr.get_task_instance("t1", session=session)
+ ti.state = (
+ TaskInstanceState.SUCCESS if final_state == DagRunState.SUCCESS
else TaskInstanceState.FAILED
+ )
+ session.flush()
+
+ dr.dag = dag
+
+ with mock.patch.object(dr, "_emit_dagrun_span") as mock_emit:
+ dr.update_state(session=session)
+
+ mock_emit.assert_called_once_with(state=final_state)
+
+ def test_emit_dagrun_span_not_called_while_running(self, dag_maker,
session):
+ """_emit_dagrun_span should not be called while the dag run is still
running."""
+ with dag_maker("test_tracing_no_emit_running", session=session) as dag:
+ EmptyOperator(task_id="t1")
+ EmptyOperator(task_id="t2")
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ tis = dr.get_task_instances(session=session)
+ for ti in tis:
+ if ti.task_id == "t1":
+ ti.state = TaskInstanceState.SUCCESS
+ else:
+ ti.state = TaskInstanceState.RUNNING
+ session.flush()
+
+ dr.dag = dag
+
+ with mock.patch.object(dr, "_emit_dagrun_span") as mock_emit:
+ dr.update_state(session=session)
+
+ mock_emit.assert_not_called()
+
+ def test_emit_dagrun_span_uses_context_carrier_ids(self, dag_maker,
session):
+ """The emitted span should inherit trace_id/span_id from the
context_carrier."""
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+ from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
+ from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+ from airflow.observability.traces import OverrideableRandomIdGenerator
+
+ in_mem_exporter = InMemorySpanExporter()
+ provider = TracerProvider(id_generator=OverrideableRandomIdGenerator())
+ provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
+ test_tracer = provider.get_tracer("test")
+
+ with dag_maker("test_tracing_ids", session=session) as dag:
+ EmptyOperator(task_id="t1")
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ ti = dr.get_task_instance("t1", session=session)
+ ti.state = TaskInstanceState.SUCCESS
+ session.flush()
+ dr.dag = dag
+
+ with mock.patch("airflow.models.dagrun.tracer", test_tracer):
+ dr.update_state(session=session)
+
+ spans = in_mem_exporter.get_finished_spans()
+ assert len(spans) == 1
+ span = spans[0]
+
+ # Decode the expected trace_id/span_id from the stored context_carrier
+ ctx = TraceContextTextMapPropagator().extract(dr.context_carrier)
+ from opentelemetry import trace as otel_trace
+
+ stored_span = otel_trace.get_current_span(context=ctx)
+ stored_ctx = stored_span.get_span_context()
+
+ assert span.context.trace_id == stored_ctx.trace_id
+ assert span.context.span_id == stored_ctx.span_id
+
+ @pytest.mark.parametrize("final_state", [DagRunState.SUCCESS,
DagRunState.FAILED])
+ def test_emit_dagrun_span_attributes_and_status(self, dag_maker, session,
final_state):
+ """The emitted span should have the correct name, attributes, and
status code."""
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+ from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
+ from opentelemetry.trace import StatusCode
+
+ from airflow.observability.traces import OverrideableRandomIdGenerator
+
+ in_mem_exporter = InMemorySpanExporter()
+ provider = TracerProvider(id_generator=OverrideableRandomIdGenerator())
+ provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
+ test_tracer = provider.get_tracer("test")
+
+ with dag_maker("test_tracing_attrs", session=session) as dag:
+ EmptyOperator(task_id="t1")
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ ti = dr.get_task_instance("t1", session=session)
+ ti.state = (
+ TaskInstanceState.SUCCESS if final_state == DagRunState.SUCCESS
else TaskInstanceState.FAILED
+ )
+ session.flush()
+ dr.dag = dag
+
+ with mock.patch("airflow.models.dagrun.tracer", test_tracer):
+ dr.update_state(session=session)
+
+ spans = in_mem_exporter.get_finished_spans()
+ assert len(spans) == 1
+ span = spans[0]
+
+ assert span.name == f"dag_run.{dr.dag_id}"
+ assert span.attributes["airflow.dag_id"] == dr.dag_id
+ assert span.attributes["airflow.dag_run.run_id"] == dr.run_id
+
+ expected_status = StatusCode.OK if final_state == DagRunState.SUCCESS
else StatusCode.ERROR
+ assert span.status.status_code == expected_status
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index f6b79fba29a..aa0bd36de11 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1002,6 +1002,7 @@ middleware
middlewares
midnights
milli
+millis
milton
minikube
misconfigured
diff --git a/scripts/ci/docker-compose/integration-otel.yml
b/scripts/ci/docker-compose/integration-otel.yml
index 9d5c6c8117d..f0d32104a14 100644
--- a/scripts/ci/docker-compose/integration-otel.yml
+++ b/scripts/ci/docker-compose/integration-otel.yml
@@ -70,7 +70,7 @@ services:
- INTEGRATION_OTEL=true
- OTEL_SERVICE_NAME=test
- OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
- - OTEL_TRACES_EXPORTER=otlp
+ - OTEL_TRACES_EXPORTER=otlp_proto_http
- OTEL_METRICS_EXPORTER=otlp
-
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://breeze-otel-collector:4318/v1/traces
-
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://breeze-otel-collector:4318/v1/metrics
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index 94e447e37a5..c3f78658474 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1324,10 +1324,6 @@ class DAG:
triggered_by=DagRunTriggeredByType.TEST,
triggering_user_name="dag_test",
)
- # Start a mock span so that one is present and not started
downstream. We
- # don't care about otel in dag.test and starting the span during
dagrun update
- # is not functioning properly in this context anyway.
- dr.start_dr_spans_if_needed(tis=[])
log.debug("starting dagrun")
# Instead of starting a scheduler, we run the minimal loop
possible to check
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 0ef7a74a24b..674935f5eae 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -19,14 +19,13 @@
from __future__ import annotations
-import contextlib
import contextvars
import functools
import os
import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
-from contextlib import suppress
+from contextlib import ExitStack, contextmanager, suppress
from datetime import datetime, timedelta, timezone
from itertools import product
from pathlib import Path
@@ -36,6 +35,8 @@ from urllib.parse import quote
import attrs
import lazy_object_proxy
import structlog
+from opentelemetry import trace
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from pydantic import AwareDatetime, ConfigDict, Field, JsonValue, TypeAdapter
from airflow.dag_processing.bundles.base import BaseDagBundle,
BundleVersionLock
@@ -133,6 +134,32 @@ if TYPE_CHECKING:
from airflow.sdk.exceptions import DagRunTriggerException
from airflow.sdk.types import OutletEventAccessorsProtocol
+log = structlog.get_logger("task")
+
+tracer = trace.get_tracer(__name__)
+
+
+@contextmanager
+def _make_task_span(msg: StartupDetails):
+ parent_context = (
+ TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if
msg.ti.context_carrier else None
+ )
+ ti = msg.ti
+ span_name = f"task_run.{ti.task_id}"
+ if ti.map_index is not None and ti.map_index >= 0:
+ span_name += f"_{ti.map_index}"
+ with tracer.start_as_current_span(span_name, context=parent_context) as
span:
+ span.set_attributes(
+ {
+ "airflow.dag_id": ti.dag_id,
+ "airflow.task_id": ti.task_id,
+ "airflow.dag_run.run_id": ti.run_id,
+ "airflow.task_instance.try_number": ti.try_number,
+ "airflow.task_instance.map_index": ti.map_index if
ti.map_index is not None else -1,
+ }
+ )
+ yield span
+
class TaskRunnerMarker:
"""Marker for listener hooks, to properly detect from which component they
are called."""
@@ -476,8 +503,6 @@ class RuntimeTaskInstance(TaskInstance):
retries: int = self.task.retries or 0
first_try_number = max_tries - retries + 1
- log = structlog.get_logger(logger_name="task")
-
log.debug("Requesting first reschedule date from supervisor")
response = SUPERVISOR_COMMS.send(
@@ -494,8 +519,6 @@ class RuntimeTaskInstance(TaskInstance):
context = self.get_template_context()
dag_run = context.get("dag_run")
- log = structlog.get_logger(logger_name="task")
-
log.debug("Getting previous Dag run", dag_run=dag_run)
if dag_run is None:
@@ -530,7 +553,6 @@ class RuntimeTaskInstance(TaskInstance):
context = self.get_template_context()
dag_run = context.get("dag_run")
- log = structlog.get_logger(logger_name="task")
log.debug("Getting previous task instance", task_id=self.task_id,
state=state)
# Use current dag run's logical_date if not provided
@@ -846,7 +868,6 @@ def _verify_bundle_access(bundle_instance: BaseDagBundle,
log: Logger) -> None:
def get_startup_details() -> StartupDetails:
# The parent sends us a StartupDetails message un-prompted. After this,
every single message is only sent
# in response to us sending a request.
- log = structlog.get_logger(logger_name="task")
if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and (
msgjson := os.environ.get("_AIRFLOW__STARTUP_MSG")
@@ -871,7 +892,6 @@ def get_startup_details() -> StartupDetails:
def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context,
Logger]:
- log = structlog.get_logger("task")
# setproctitle causes issue on Mac OS:
https://github.com/benoitc/gunicorn/issues/3021
os_type = sys.platform
if os_type == "darwin":
@@ -1238,7 +1258,7 @@ def run(
import jinja2
# If the task failed, swallow rendering error so it doesn't
mask the main error.
- with contextlib.suppress(jinja2.TemplateSyntaxError,
jinja2.UndefinedError):
+ with suppress(jinja2.TemplateSyntaxError,
jinja2.UndefinedError):
previous_rendered_map_index = ti.rendered_map_index
ti.rendered_map_index = _render_map_index(context, ti=ti,
log=log)
# Send update only if value changed (e.g., user set
context variables during execution)
@@ -1796,6 +1816,20 @@ def finalize(
log.exception("error calling listener")
+@contextmanager
+def flush_spans():
+ try:
+ yield
+ finally:
+ provider = trace.get_tracer_provider()
+ if hasattr(provider, "force_flush"):
+ from airflow.sdk.configuration import conf
+
+ timeout_millis = conf.getint("traces",
"task_runner_flush_timeout_milliseconds", fallback=30000)
+ provider.force_flush(timeout_millis=timeout_millis)
+
+
+@flush_spans()
def main():
log = structlog.get_logger(logger_name="task")
@@ -1805,38 +1839,42 @@ def main():
stats_factory = stats_utils.get_stats_factory(Stats)
Stats.initialize(factory=stats_factory)
- try:
+ stack = ExitStack()
+ with stack:
try:
- startup_details = get_startup_details()
- ti, context, log = startup(msg=startup_details)
- except AirflowRescheduleException as reschedule:
- log.warning("Rescheduling task during startup, marking task as
UP_FOR_RESCHEDULE")
- SUPERVISOR_COMMS.send(
- msg=RescheduleTask(
- reschedule_date=reschedule.reschedule_date,
- end_date=datetime.now(tz=timezone.utc),
+ try:
+ startup_details = get_startup_details()
+ span = _make_task_span(msg=startup_details)
+ stack.enter_context(span)
+ ti, context, log = startup(msg=startup_details)
+ except AirflowRescheduleException as reschedule:
+ log.warning("Rescheduling task during startup, marking task as
UP_FOR_RESCHEDULE")
+ SUPERVISOR_COMMS.send(
+ msg=RescheduleTask(
+ reschedule_date=reschedule.reschedule_date,
+ end_date=datetime.now(tz=timezone.utc),
+ )
)
- )
- sys.exit(0)
- with BundleVersionLock(
- bundle_name=ti.bundle_instance.name,
- bundle_version=ti.bundle_instance.version,
- ):
- state, _, error = run(ti, context, log)
- context["exception"] = error
- finalize(ti, state, context, log, error)
- except KeyboardInterrupt:
- log.exception("Ctrl-c hit")
- sys.exit(2)
- except Exception:
- log.exception("Top level error")
- sys.exit(1)
- finally:
- # Ensure the request socket is closed on the child side in all
circumstances
- # before the process fully terminates.
- if SUPERVISOR_COMMS and SUPERVISOR_COMMS.socket:
- with suppress(Exception):
- SUPERVISOR_COMMS.socket.close()
+ sys.exit(0)
+ with BundleVersionLock(
+ bundle_name=ti.bundle_instance.name,
+ bundle_version=ti.bundle_instance.version,
+ ):
+ state, _, error = run(ti, context, log)
+ context["exception"] = error
+ finalize(ti, state, context, log, error)
+ except KeyboardInterrupt:
+ log.exception("Ctrl-c hit")
+ sys.exit(2)
+ except Exception:
+ log.exception("Top level error")
+ sys.exit(1)
+ finally:
+ # Ensure the request socket is closed on the child side in all
circumstances
+ # before the process fully terminates.
+ if SUPERVISOR_COMMS and SUPERVISOR_COMMS.socket:
+ with suppress(Exception):
+ SUPERVISOR_COMMS.socket.close()
def reinit_supervisor_comms() -> None:
@@ -1851,7 +1889,6 @@ def reinit_supervisor_comms() -> None:
if "SUPERVISOR_COMMS" not in globals():
global SUPERVISOR_COMMS
- log = structlog.get_logger(logger_name="task")
fd = int(os.environ.get("__AIRFLOW_SUPERVISOR_FD", "0"))
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 2a495e557f7..05191806be8 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -127,6 +127,7 @@ from airflow.sdk.execution_time.task_runner import (
TaskRunnerMarker,
_defer_task,
_execute_task,
+ _make_task_span,
_push_xcom_if_needed,
_xcom_push,
finalize,
@@ -367,7 +368,7 @@ def
test_parse_not_found_does_not_reschedule_when_max_attempts_reached(test_dags
@mock.patch("airflow.sdk.execution_time.task_runner.get_startup_details")
@mock.patch("airflow.sdk.execution_time.task_runner.CommsDecoder")
def test_main_sends_reschedule_task_when_startup_reschedules(
- mock_comms_decoder_cls, mock_get_startup_details, mock_startup, mock_exit,
time_machine
+ mock_comms_decoder_cls, mock_get_startup_details, mock_startup, mock_exit,
time_machine, make_ti_context
):
"""
If startup raises AirflowRescheduleException, the task runner should
report a RescheduleTask
@@ -379,7 +380,23 @@ def
test_main_sends_reschedule_task_when_startup_reschedules(
mock_comms_instance = mock.Mock()
mock_comms_instance.socket = None
mock_comms_decoder_cls.__getitem__.return_value.return_value =
mock_comms_instance
- mock_get_startup_details.return_value = mock.Mock()
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="my_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ context_carrier={},
+ ),
+ dag_rel_path="",
+ bundle_info=BundleInfo(name="my-bundle", version=None),
+ ti_context=make_ti_context(),
+ start_date=timezone.utcnow(),
+ sentry_integration="",
+ )
+ mock_get_startup_details.return_value = what
mock_startup.side_effect =
AirflowRescheduleException(reschedule_date=reschedule_date)
# Move time
@@ -395,6 +412,102 @@ def
test_main_sends_reschedule_task_when_startup_reschedules(
]
+def test_task_span_is_child_of_dag_run_span(make_ti_context):
+ """Task span must be a child of the dag run span propagated via
context_carrier."""
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+ from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
+ from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+ # Build a real SDK provider and exporter so we can inspect finished spans.
+ in_mem_exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
+
+ # Create a "dag run" span whose context we will propagate into the task.
+ dag_run_tracer = provider.get_tracer("dag_run")
+ with dag_run_tracer.start_as_current_span("dag_run.test_dag") as
dag_run_span:
+ carrier: dict[str, str] = {}
+ TraceContextTextMapPropagator().inject(carrier)
+ dag_run_span_ctx = dag_run_span.get_span_context()
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="my_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ context_carrier=carrier,
+ ),
+ dag_rel_path="",
+ bundle_info=BundleInfo(name="my-bundle", version=None),
+ ti_context=make_ti_context(),
+ start_date=timezone.utcnow(),
+ sentry_integration="",
+ )
+
+ task_tracer = provider.get_tracer("airflow.sdk.execution_time.task_runner")
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer",
task_tracer):
+ with _make_task_span(what) as span:
+ task_span_ctx = span.get_span_context()
+
+ # The task span must share the dag run's trace ID.
+ assert task_span_ctx.trace_id == dag_run_span_ctx.trace_id
+
+ # The task span's parent must be the dag run span.
+ finished = in_mem_exporter.get_finished_spans()
+ task_spans = [s for s in finished if s.name == "task_run.my_task"]
+ assert len(task_spans) == 1
+ assert task_spans[0].parent is not None
+ assert task_spans[0].parent.span_id == dag_run_span_ctx.span_id
+
+ # Span attributes are set correctly.
+ attrs = task_spans[0].attributes
+ assert attrs["airflow.dag_id"] == "test_dag"
+ assert attrs["airflow.task_id"] == "my_task"
+ assert attrs["airflow.dag_run.run_id"] == "test_run"
+ assert attrs["airflow.task_instance.try_number"] == 1
+
+
+def test_task_span_no_parent_when_no_context_carrier(make_ti_context):
+ """When context_carrier is absent, the task span should be a root span (no
parent)."""
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+ from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
+
+ in_mem_exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="standalone_task",
+ dag_id="test_dag",
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ context_carrier=None,
+ ),
+ dag_rel_path="",
+ bundle_info=BundleInfo(name="my-bundle", version=None),
+ ti_context=make_ti_context(),
+ start_date=timezone.utcnow(),
+ sentry_integration="",
+ )
+
+ task_tracer = provider.get_tracer("airflow.sdk.execution_time.task_runner")
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer",
task_tracer):
+ with _make_task_span(what):
+ pass
+
+ finished = in_mem_exporter.get_finished_spans()
+ assert len(finished) == 1
+ assert finished[0].parent is None
+
+
def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
"""Check that the bundle path is added to sys.path, so Dags can import
shared modules."""
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")