This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0f4884c3ca [AIP-49] OpenTelemetry Traces for Apache Airflow Part 2 (#40802)
0f4884c3ca is described below
commit 0f4884c3ca26c39bc6bd21967c950e718589fcd6
Author: Howard Yoo <[email protected]>
AuthorDate: Sat Jul 20 02:11:40 2024 -0500
[AIP-49] OpenTelemetry Traces for Apache Airflow Part 2 (#40802)
---------
Co-authored-by: D. Ferruzzi <[email protected]>
---
airflow/dag_processing/manager.py | 302 ++++++++++++++----------
airflow/executors/base_executor.py | 75 +++++-
airflow/executors/local_executor.py | 17 ++
airflow/executors/sequential_executor.py | 10 +
airflow/jobs/job.py | 104 +++++----
airflow/jobs/local_task_job_runner.py | 79 ++++---
airflow/jobs/scheduler_job_runner.py | 303 ++++++++++++++++++-------
airflow/jobs/triggerer_job_runner.py | 45 +++-
airflow/models/dagrun.py | 33 +++
airflow/models/taskinstance.py | 22 ++
airflow/traces/__init__.py | 1 +
airflow/traces/tracer.py | 3 +
airflow/traces/utils.py | 13 +-
scripts/ci/docker-compose/integration-otel.yml | 2 +-
14 files changed, 715 insertions(+), 294 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index f358dd9ee1..57858fc427 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -53,7 +53,9 @@ from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
+from airflow.traces.tracer import Trace, span
from airflow.utils import timezone
+from airflow.utils.dates import datetime_to_nano
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
@@ -228,7 +230,9 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
# to kill all sub-process of this at the OS-level, rather than having
# to iterate the child processes
set_new_process_group()
-
+ span = Trace.get_current_span()
+ span.set_attribute("dag_directory", str(dag_directory))
+ span.set_attribute("dag_ids", str(dag_ids))
setproctitle("airflow scheduler -- DagFileProcessorManager")
reload_configuration_for_dag_processing()
processor_manager = DagFileProcessorManager(
@@ -258,8 +262,10 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
self._heartbeat_manager()
def _process_message(self, message):
+ span = Trace.get_current_span()
self.log.debug("Received message of type %s", type(message).__name__)
if isinstance(message, DagParsingStat):
+ span.set_attribute("all_files_processed",
str(message.all_files_processed))
self._sync_metadata(message)
else:
raise RuntimeError(f"Unexpected message received of type
{type(message).__name__}")
@@ -562,118 +568,144 @@ class DagFileProcessorManager(LoggingMixin):
# in sync mode we need to be told to start a "loop"
self.start_new_processes()
while True:
- loop_start_time = time.monotonic()
- ready = multiprocessing.connection.wait(self.waitables.keys(),
timeout=poll_time)
- self.heartbeat()
- if self._direct_scheduler_conn is not None and
self._direct_scheduler_conn in ready:
- agent_signal = self._direct_scheduler_conn.recv()
-
- self.log.debug("Received %s signal from
DagFileProcessorAgent", agent_signal)
- if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
- self.terminate()
- break
- elif agent_signal == DagParsingSignal.END_MANAGER:
- self.end()
- sys.exit(os.EX_OK)
- elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
- # continue the loop to parse dags
- pass
- elif isinstance(agent_signal, CallbackRequest):
- self._add_callback_to_queue(agent_signal)
- else:
- raise ValueError(f"Invalid message {type(agent_signal)}")
-
- if not ready and not self._async_mode:
- # In "sync" mode we don't want to parse the DAGs until we
- # are told to (as that would open another connection to the
- # SQLite DB which isn't a good practice
-
- # This shouldn't happen, as in sync mode poll should block for
- # ever. Lets be defensive about that.
- self.log.warning(
- "wait() unexpectedly returned nothing ready after infinite
timeout (%r)!", poll_time
- )
-
- continue
-
- for sentinel in ready:
- if sentinel is not self._direct_scheduler_conn:
- processor = self.waitables.get(sentinel)
- if processor:
- self._collect_results_from_processor(processor)
- self.waitables.pop(sentinel)
- self._processors.pop(processor.file_path)
-
- if self.standalone_dag_processor:
- self._fetch_callbacks(max_callbacks_per_loop)
- self._scan_stale_dags()
- DagWarning.purge_inactive_dag_warnings()
- refreshed_dag_dir = self._refresh_dag_dir()
-
- self._kill_timed_out_processors()
-
- # Generate more file paths to process if we processed all the
files already. Note for this
- # to clear down, we must have cleared all files found from
scanning the dags dir _and_ have
- # cleared all files added as a result of callbacks
- if not self._file_path_queue:
- self.emit_metrics()
- self.prepare_file_path_queue()
-
- # if new files found in dag dir, add them
- elif refreshed_dag_dir:
- self.add_new_file_path_to_queue()
-
- self._refresh_requested_filelocs()
- self.start_new_processes()
-
- # Update number of loop iteration.
- self._num_run += 1
-
- if not self._async_mode:
- self.log.debug("Waiting for processors to finish since we're
using sqlite")
- # Wait until the running DAG processors are finished before
- # sending a DagParsingStat message back. This means the Agent
- # can tell we've got to the end of this iteration when it sees
- # this type of message
- self.wait_until_finished()
-
- # Collect anything else that has finished, but don't kick off any
more processors
- self.collect_results()
+ with Trace.start_span(span_name="dag_parsing_loop",
component="DagFileProcessorManager") as span:
+ loop_start_time = time.monotonic()
+ ready = multiprocessing.connection.wait(self.waitables.keys(),
timeout=poll_time)
+ if span.is_recording():
+ span.add_event(name="heartbeat")
+ self.heartbeat()
+ if self._direct_scheduler_conn is not None and
self._direct_scheduler_conn in ready:
+ agent_signal = self._direct_scheduler_conn.recv()
+
+ self.log.debug("Received %s signal from
DagFileProcessorAgent", agent_signal)
+ if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+ if span.is_recording():
+ span.add_event(name="terminate")
+ self.terminate()
+ break
+ elif agent_signal == DagParsingSignal.END_MANAGER:
+ if span.is_recording():
+ span.add_event(name="end")
+ self.end()
+ sys.exit(os.EX_OK)
+ elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
+ # continue the loop to parse dags
+ pass
+ elif isinstance(agent_signal, CallbackRequest):
+ self._add_callback_to_queue(agent_signal)
+ else:
+ raise ValueError(f"Invalid message
{type(agent_signal)}")
+
+ if not ready and not self._async_mode:
+ # In "sync" mode we don't want to parse the DAGs until we
+ # are told to (as that would open another connection to the
+ # SQLite DB which isn't a good practice
+
+ # This shouldn't happen, as in sync mode poll should block
for
+ # ever. Lets be defensive about that.
+ self.log.warning(
+ "wait() unexpectedly returned nothing ready after
infinite timeout (%r)!", poll_time
+ )
- self._print_stat()
+ continue
- all_files_processed = all(self.get_last_finish_time(x) is not None
for x in self.file_paths)
- max_runs_reached = self.max_runs_reached()
+ for sentinel in ready:
+ if sentinel is not self._direct_scheduler_conn:
+ processor = self.waitables.get(sentinel)
+ if processor:
+ self._collect_results_from_processor(processor)
+ self.waitables.pop(sentinel)
+ self._processors.pop(processor.file_path)
+
+ if self.standalone_dag_processor:
+ self._fetch_callbacks(max_callbacks_per_loop)
+ self._scan_stale_dags()
+ DagWarning.purge_inactive_dag_warnings()
+ refreshed_dag_dir = self._refresh_dag_dir()
+
+ if span.is_recording():
+ span.add_event(name="_kill_timed_out_processors")
+ self._kill_timed_out_processors()
+
+ # Generate more file paths to process if we processed all the
files already. Note for this
+ # to clear down, we must have cleared all files found from
scanning the dags dir _and_ have
+ # cleared all files added as a result of callbacks
+ if not self._file_path_queue:
+ self.emit_metrics()
+ if span.is_recording():
+ span.add_event(name="prepare_file_path_queue")
+ self.prepare_file_path_queue()
+
+ # if new files found in dag dir, add them
+ elif refreshed_dag_dir:
+ if span.is_recording():
+ span.add_event(name="add_new_file_path_to_queue")
+ self.add_new_file_path_to_queue()
+
+ self._refresh_requested_filelocs()
+ if span.is_recording():
+ span.add_event(name="start_new_processes")
+ self.start_new_processes()
+
+ # Update number of loop iteration.
+ self._num_run += 1
+
+ if not self._async_mode:
+ self.log.debug("Waiting for processors to finish since
we're using sqlite")
+ # Wait until the running DAG processors are finished before
+ # sending a DagParsingStat message back. This means the
Agent
+ # can tell we've got to the end of this iteration when it
sees
+ # this type of message
+ self.wait_until_finished()
+
+ # Collect anything else that has finished, but don't kick off
any more processors
+ if span.is_recording():
+ span.add_event(name="collect_results")
+ self.collect_results()
+
+ if span.is_recording():
+ span.add_event(name="print_stat")
+ self._print_stat()
+
+ all_files_processed = all(self.get_last_finish_time(x) is not
None for x in self.file_paths)
+ max_runs_reached = self.max_runs_reached()
- try:
- if self._direct_scheduler_conn:
- self._direct_scheduler_conn.send(
- DagParsingStat(
- max_runs_reached,
- all_files_processed,
+ try:
+ if self._direct_scheduler_conn:
+ self._direct_scheduler_conn.send(
+ DagParsingStat(
+ max_runs_reached,
+ all_files_processed,
+ )
)
+ except BlockingIOError:
+ # Try again next time around the loop!
+
+ # It is better to fail, than it is deadlock. This should
+ # "almost never happen" since the DagParsingStat object is
+ # small, and in async mode this stat is not actually
_required_
+ # for normal operation (It only drives "max runs")
+ self.log.debug("BlockingIOError received trying to send
DagParsingStat, ignoring")
+
+ if max_runs_reached:
+ self.log.info(
+ "Exiting dag parsing loop as all files have been
processed %s times", self._max_runs
)
- except BlockingIOError:
- # Try again next time around the loop!
-
- # It is better to fail, than it is deadlock. This should
- # "almost never happen" since the DagParsingStat object is
- # small, and in async mode this stat is not actually _required_
- # for normal operation (It only drives "max runs")
- self.log.debug("BlockingIOError received trying to send
DagParsingStat, ignoring")
-
- if max_runs_reached:
- self.log.info(
- "Exiting dag parsing loop as all files have been processed
%s times", self._max_runs
- )
- break
+ if span.is_recording():
+ span.add_event(
+ name="info",
+ attributes={
+ "message": "Exiting dag parsing loop as all
files have been processed {self._max_runs} times"
+ },
+ )
+ break
- if self._async_mode:
- loop_duration = time.monotonic() - loop_start_time
- if loop_duration < 1:
- poll_time = 1 - loop_duration
- else:
- poll_time = 0.0
+ if self._async_mode:
+ loop_duration = time.monotonic() - loop_start_time
+ if loop_duration < 1:
+ poll_time = 1 - loop_duration
+ else:
+ poll_time = 0.0
@provide_session
def _fetch_callbacks(self, max_callbacks: int, session: Session =
NEW_SESSION):
@@ -1103,6 +1135,24 @@ class DagFileProcessorManager(LoggingMixin):
)
self._file_stats[processor.file_path] = stat
file_name = Path(processor.file_path).stem
+ """crude exposure of instrumentation code which may need to be
furnished"""
+ span = Trace.get_tracer("DagFileProcessorManager").start_span(
+ "dag_processing", start_time=datetime_to_nano(processor.start_time)
+ )
+ span.set_attribute("file_path", processor.file_path)
+ span.set_attribute("run_count",
self.get_run_count(processor.file_path) + 1)
+
+ if processor.result is None:
+ span.set_attribute("error", True)
+ span.set_attribute("processor.exit_code", processor.exit_code)
+ else:
+ span.set_attribute("num_dags", num_dags)
+ span.set_attribute("import_errors", count_import_errors)
+ if count_import_errors > 0:
+ span.set_attribute("error", True)
+
+ span.end(end_time=datetime_to_nano(last_finish_time))
+
Stats.timing(f"dag_processing.last_duration.{file_name}",
last_duration)
Stats.timing("dag_processing.last_duration", last_duration,
tags={"file_name": file_name})
@@ -1134,6 +1184,7 @@ class DagFileProcessorManager(LoggingMixin):
callback_requests=callback_requests,
)
+ @span
def start_new_processes(self):
"""Start more processors if we have enough slots and files to
process."""
# initialize cache to mutualize calls to Variable.get in DAGs
@@ -1157,14 +1208,21 @@ class DagFileProcessorManager(LoggingMixin):
del self._callback_to_execute[file_path]
Stats.incr("dag_processing.processes", tags={"file_path":
file_path, "action": "start"})
-
+ span = Trace.get_current_span()
+ span.set_attribute("category", "processing")
processor.start()
self.log.debug("Started a process (PID: %s) to generate tasks for
%s", processor.pid, file_path)
+ if span.is_recording():
+ span.add_event(
+ name="dag_processing processor started",
+ attributes={"file_path": file_path, "pid": processor.pid},
+ )
self._processors[file_path] = processor
self.waitables[processor.waitable_handle] = processor
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_path_queue))
+ @span
def add_new_file_path_to_queue(self):
for file_path in self.file_paths:
if file_path not in self._file_stats:
@@ -1172,6 +1230,11 @@ class DagFileProcessorManager(LoggingMixin):
self.log.info("Adding new file %s to parsing queue", file_path)
self._file_stats[file_path] =
DagFileProcessorManager.DEFAULT_FILE_STAT
self._file_path_queue.appendleft(file_path)
+ span = Trace.get_current_span()
+ if span.is_recording():
+ span.add_event(
+ name="adding new file to parsing queue",
attributes={"file_path": file_path}
+ )
def prepare_file_path_queue(self):
"""
@@ -1285,6 +1348,13 @@ class DagFileProcessorManager(LoggingMixin):
# Deprecated; may be removed in a future Airflow release.
Stats.incr("dag_file_processor_timeouts")
processor.kill()
+ span = Trace.get_current_span()
+ span.set_attribute("category", "processing")
+ if span.is_recording():
+ span.add_event(
+ name="dag processing killed processor",
+ attributes={"file_path": file_path, "action":
"timeout"},
+ )
# Clean up processor references
self.waitables.pop(processor.waitable_handle)
@@ -1345,12 +1415,16 @@ class DagFileProcessorManager(LoggingMixin):
This is called once every time around the parsing "loop" - i.e. after
all files have been parsed.
"""
- parse_time = time.perf_counter() - self._parsing_start_time
- Stats.gauge("dag_processing.total_parse_time", parse_time)
- Stats.gauge("dagbag_size", sum(stat.num_dags for stat in
self._file_stats.values()))
- Stats.gauge(
- "dag_processing.import_errors", sum(stat.import_errors for stat in
self._file_stats.values())
- )
+ with Trace.start_span(span_name="emit_metrics",
component="DagFileProcessorManager") as span:
+ parse_time = time.perf_counter() - self._parsing_start_time
+ Stats.gauge("dag_processing.total_parse_time", parse_time)
+ Stats.gauge("dagbag_size", sum(stat.num_dags for stat in
self._file_stats.values()))
+ Stats.gauge(
+ "dag_processing.import_errors", sum(stat.import_errors for
stat in self._file_stats.values())
+ )
+ span.set_attribute("total_parse_time", parse_time)
+ span.set_attribute("dag_bag_size", sum(stat.num_dags for stat in
self._file_stats.values()))
+ span.set_attribute("import_errors", sum(stat.import_errors for
stat in self._file_stats.values()))
@property
def file_paths(self):
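
The manager.py changes above apply one pattern throughout: each iteration of the parsing loop is wrapped in a span, and every add_event() call is guarded by span.is_recording() so that disabled tracing adds almost no overhead. A minimal, self-contained sketch of that pattern follows; NoOpSpan and start_span below are illustrative stand-ins, not the airflow.traces.tracer API.

from __future__ import annotations

import time
from contextlib import contextmanager


class NoOpSpan:
    """Stand-in span: records nothing, so the guards below short-circuit cheaply."""

    def is_recording(self) -> bool:
        return False

    def set_attribute(self, key, value) -> None:
        pass

    def add_event(self, name, attributes=None) -> None:
        pass


@contextmanager
def start_span(span_name: str, component: str):
    # A real tracer would create and export a span here; we just yield a no-op one.
    yield NoOpSpan()


def parsing_loop(iterations: int = 3) -> None:
    for _ in range(iterations):
        with start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
            loop_start = time.monotonic()
            # ... one unit of parsing work would happen here ...
            if span.is_recording():  # skip building event payloads when tracing is off
                span.add_event(name="heartbeat")
            span.set_attribute("loop_duration", time.monotonic() - loop_start)


if __name__ == "__main__":
    parsing_loop()
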
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 0f94701174..098bb93215 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -32,6 +32,9 @@ from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import Log
from airflow.stats import Stats
+from airflow.traces import NO_TRACE_ID
+from airflow.traces.tracer import Trace, gen_context, span
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -224,6 +227,7 @@ class BaseExecutor(LoggingMixin):
Executors should override this to perform gather statuses.
"""
+ @span
def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs."""
if not self.parallelism:
@@ -241,6 +245,17 @@ class BaseExecutor(LoggingMixin):
else:
self.log.debug("%s open slots", open_slots)
+ span = Trace.get_current_span()
+ if span.is_recording():
+ span.add_event(
+ name="executor",
+ attributes={
+ "executor.open_slots": open_slots,
+ "executor.queued_tasks": num_queued_tasks,
+ "executor.running_tasks": num_running_tasks,
+ },
+ )
+
Stats.gauge(
"executor.open_slots", value=open_slots, tags={"status": "open",
"name": self.__class__.__name__}
)
@@ -273,12 +288,14 @@ class BaseExecutor(LoggingMixin):
reverse=True,
)
+ @span
def trigger_tasks(self, open_slots: int) -> None:
"""
Initiate async execution of the queued tasks, up to the number of
available slots.
:param open_slots: Number of open slots
"""
+ span = Trace.get_current_span()
sorted_queue = self.order_queued_tasks_by_priority()
task_tuples = []
@@ -322,15 +339,40 @@ class BaseExecutor(LoggingMixin):
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ if span.is_recording():
+ span.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
for key, command, queue, executor_config in task_tuples:
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
- self.running.add(key)
+ task_instance = self.queued_tasks[key][3] # TaskInstance in
fourth element
+ trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))
+ span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+ links = [{"trace_id": trace_id, "span_id": span_id}]
+
+ # assuming that the span_id will very likely be unique inside the
trace
+ with Trace.start_span(
+ span_name=f"{key.dag_id}.{key.task_id}",
+ component="BaseExecutor",
+ span_id=span_id,
+ links=links,
+ ) as span:
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number)
+ span.set_attribute("command", str(command))
+ span.set_attribute("queue", str(queue))
+ span.set_attribute("executor_config", str(executor_config))
+ del self.queued_tasks[key]
+ self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
+ self.running.add(key)
def change_state(
self, key: TaskInstanceKey, state: TaskInstanceState, info=None,
remove_running=True
@@ -358,6 +400,20 @@ class BaseExecutor(LoggingMixin):
:param info: Executor information for the task instance
:param key: Unique key for the task instance
"""
+ trace_id = Trace.get_current_span().get_span_context().trace_id
+ if trace_id != NO_TRACE_ID:
+ span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+ with Trace.start_span(
+ span_name="fail",
+ component="BaseExecutor",
+ parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
+ ) as span:
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number)
+ span.set_attribute("error", True)
+
self.change_state(key, TaskInstanceState.FAILED, info)
def success(self, key: TaskInstanceKey, info=None) -> None:
@@ -367,6 +423,19 @@ class BaseExecutor(LoggingMixin):
:param info: Executor information for the task instance
:param key: Unique key for the task instance
"""
+ trace_id = Trace.get_current_span().get_span_context().trace_id
+ if trace_id != NO_TRACE_ID:
+ span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+ with Trace.start_span(
+ span_name="success",
+ component="BaseExecutor",
+ parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
+ ) as span:
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number - 1)
+
self.change_state(key, TaskInstanceState.SUCCESS, info)
def queued(self, key: TaskInstanceKey, info=None) -> None:
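
A key detail in base_executor.py is that the span id for a task is derived deterministically from the task-instance key (via gen_span_id_from_ti_key), so the executor's per-task span and the scheduler's spans can be correlated without passing trace context around. The helper below sketches that idea with an md5 hash; the real _gen_id implementation is not shown in this diff, so the hashing details and the simplified TaskInstanceKey are assumptions.

from __future__ import annotations

import hashlib
from typing import NamedTuple


class TaskInstanceKey(NamedTuple):
    # simplified stand-in for airflow.models.taskinstance.TaskInstanceKey
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


def deterministic_span_id(key: TaskInstanceKey) -> int:
    """Derive a stable 64-bit span id from the TI key (illustrative; not _gen_id)."""
    seed = "|".join([key.dag_id, str(key.run_id), key.task_id, str(key.try_number)])
    digest = hashlib.md5(seed.encode("utf-8")).hexdigest()
    return int(digest[:16], 16)  # 64 bits, the OTel span-id width


if __name__ == "__main__":
    key = TaskInstanceKey("example_dag", "extract", "manual__2024-07-20", 1)
    # Two independent processes computing this get the same id, which is what
    # lets the executor span be linked back to the scheduler's span for the task.
    print(hex(deterministic_span_id(key)))
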
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 3b2670e755..90cedf7dbd 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -39,6 +39,7 @@ from setproctitle import getproctitle, setproctitle
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
+from airflow.traces.tracer import Trace, span
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -77,6 +78,7 @@ class LocalWorkerBase(Process, LoggingMixin):
setproctitle("airflow worker -- LocalExecutor")
return super().run()
+ @span
def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
"""
Execute command received and stores result state in queue.
@@ -98,6 +100,7 @@ class LocalWorkerBase(Process, LoggingMixin):
# Remove the command since the worker is done executing the task
setproctitle("airflow worker -- LocalExecutor")
+ @span
def _execute_work_in_subprocess(self, command: CommandType) ->
TaskInstanceState:
try:
subprocess.check_call(command, close_fds=True)
@@ -106,6 +109,7 @@ class LocalWorkerBase(Process, LoggingMixin):
self.log.error("Failed to execute task %s.", e)
return TaskInstanceState.FAILED
+ @span
def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
pid = os.fork()
if pid:
@@ -165,6 +169,7 @@ class LocalWorker(LocalWorkerBase):
self.key: TaskInstanceKey = key
self.command: CommandType = command
+ @span
def do_work(self) -> None:
self.execute_work(key=self.key, command=self.command)
@@ -184,6 +189,7 @@ class QueuedLocalWorker(LocalWorkerBase):
super().__init__(result_queue=result_queue)
self.task_queue = task_queue
+ @span
def do_work(self) -> None:
while True:
try:
@@ -244,6 +250,7 @@ class LocalExecutor(BaseExecutor):
self.executor.workers_used = 0
self.executor.workers_active = 0
+ @span
def execute_async(
self,
key: TaskInstanceKey,
@@ -262,6 +269,14 @@ class LocalExecutor(BaseExecutor):
if TYPE_CHECKING:
assert self.executor.result_queue
+ span = Trace.get_current_span()
+ if span.is_recording():
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("commands_to_run", str(command))
+
local_worker = LocalWorker(self.executor.result_queue, key=key,
command=command)
self.executor.workers_used += 1
self.executor.workers_active += 1
@@ -311,6 +326,7 @@ class LocalExecutor(BaseExecutor):
for worker in self.executor.workers:
worker.start()
+ @span
def execute_async(
self,
key: TaskInstanceKey,
@@ -372,6 +388,7 @@ class LocalExecutor(BaseExecutor):
self.impl.start()
+ @span
def execute_async(
self,
key: TaskInstanceKey,
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 41b8ae9ddc..1b145892eb 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -29,6 +29,7 @@ import subprocess
from typing import TYPE_CHECKING, Any
from airflow.executors.base_executor import BaseExecutor
+from airflow.traces.tracer import Trace, span
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
@@ -59,6 +60,7 @@ class SequentialExecutor(BaseExecutor):
super().__init__()
self.commands_to_run = []
+ @span
def execute_async(
self,
key: TaskInstanceKey,
@@ -69,6 +71,14 @@ class SequentialExecutor(BaseExecutor):
self.validate_airflow_tasks_run_command(command)
self.commands_to_run.append((key, command))
+ span = Trace.get_current_span()
+ if span.is_recording():
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("commands_to_run", str(self.commands_to_run))
+
def sync(self) -> None:
for key, command in self.commands_to_run:
self.log.info("Executing command: %s", command)
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 4273f1d334..9384821807 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -34,6 +34,7 @@ from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
+from airflow.traces.tracer import Trace, span
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -199,52 +200,62 @@ class Job(Base, LoggingMixin):
:param session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
-
- try:
- # This will cause it to load from the db
- self._merge_from(Job._fetch_from_db(self, session))
- previous_heartbeat = self.latest_heartbeat
-
- if self.state == JobState.RESTARTING:
- self.kill()
-
- # Figure out how long to sleep for
- sleep_for = 0
- if self.latest_heartbeat:
- seconds_remaining = (
- self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
- )
- sleep_for = max(0, seconds_remaining)
- sleep(sleep_for)
-
- job = Job._update_heartbeat(job=self, session=session)
- self._merge_from(job)
- time_since_last_heartbeat = (timezone.utcnow() -
previous_heartbeat).total_seconds()
- health_check_threshold_value =
health_check_threshold(self.job_type, self.heartrate)
- if time_since_last_heartbeat > health_check_threshold_value:
- self.log.info("Heartbeat recovered after %.2f seconds",
time_since_last_heartbeat)
- # At this point, the DB has updated.
- previous_heartbeat = self.latest_heartbeat
-
- heartbeat_callback(session)
- self.log.debug("[heartbeat]")
- self.heartbeat_failed = False
- except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
- if not self.heartbeat_failed:
- self.log.exception("%s heartbeat failed with error",
self.__class__.__name__)
- self.heartbeat_failed = True
- if self.is_alive():
- self.log.error(
- "%s heartbeat failed with error. Scheduler may go into
unhealthy state",
- self.__class__.__name__,
- )
- else:
- self.log.error(
- "%s heartbeat failed with error. Scheduler is in unhealthy
state", self.__class__.__name__
- )
- # We didn't manage to heartbeat, so make sure that the timestamp
isn't updated
- self.latest_heartbeat = previous_heartbeat
+ with Trace.start_span(span_name="heartbeat", component="Job") as span:
+ try:
+ span.set_attribute("heartbeat", str(self.latest_heartbeat))
+ # This will cause it to load from the db
+ self._merge_from(Job._fetch_from_db(self, session))
+ previous_heartbeat = self.latest_heartbeat
+
+ if self.state == JobState.RESTARTING:
+ self.kill()
+
+ # Figure out how long to sleep for
+ sleep_for = 0
+ if self.latest_heartbeat:
+ seconds_remaining = (
+ self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
+ )
+ sleep_for = max(0, seconds_remaining)
+ if span.is_recording():
+ span.add_event(name="sleep", attributes={"sleep_for":
sleep_for})
+ sleep(sleep_for)
+
+ job = Job._update_heartbeat(job=self, session=session)
+ self._merge_from(job)
+ time_since_last_heartbeat = (timezone.utcnow() -
previous_heartbeat).total_seconds()
+ health_check_threshold_value =
health_check_threshold(self.job_type, self.heartrate)
+ if time_since_last_heartbeat > health_check_threshold_value:
+ self.log.info("Heartbeat recovered after %.2f seconds",
time_since_last_heartbeat)
+ # At this point, the DB has updated.
+ previous_heartbeat = self.latest_heartbeat
+
+ heartbeat_callback(session)
+ self.log.debug("[heartbeat]")
+ self.heartbeat_failed = False
+ except OperationalError:
+ Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
+ if not self.heartbeat_failed:
+ self.log.exception("%s heartbeat failed with error",
self.__class__.__name__)
+ self.heartbeat_failed = True
+ msg = f"{self.__class__.__name__} heartbeat got an
exception"
+ if span.is_recording():
+ span.add_event(name="error", attributes={"message":
msg})
+ if self.is_alive():
+ self.log.error(
+ "%s heartbeat failed with error. Scheduler may go into
unhealthy state",
+ self.__class__.__name__,
+ )
+ msg = f"{self.__class__.__name__} heartbeat failed with
error. Scheduler may go into unhealthy state"
+ if span.is_recording():
+ span.add_event(name="error", attributes={"message":
msg})
+ else:
+ msg = f"{self.__class__.__name__} heartbeat failed with
error. Scheduler is in unhealthy state"
+ self.log.error(msg)
+ if span.is_recording():
+ span.add_event(name="error", attributes={"message":
msg})
+ # We didn't manage to heartbeat, so make sure that the
timestamp isn't updated
+ self.latest_heartbeat = previous_heartbeat
@provide_session
def prepare_for_execution(self, session: Session = NEW_SESSION):
@@ -448,6 +459,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N
return ret
+@span
def perform_heartbeat(
job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
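
Several functions in this commit are annotated with the new @span decorator instead of opening spans by hand. The decorator itself is defined in airflow/traces/tracer.py and is not part of this diff; the sketch below only illustrates the general shape of such a decorator, with a print-based stand-in for the tracer.

from __future__ import annotations

import functools
from contextlib import contextmanager


@contextmanager
def start_span(span_name: str, component: str):
    # Placeholder: a real implementation would create and export an OTel span.
    print(f"enter span {component}.{span_name}")
    try:
        yield
    finally:
        print(f"exit span {component}.{span_name}")


def span(func):
    """Wrap a function call in a span named after it (sketch, not the Airflow decorator)."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        qualname = func.__qualname__
        component = qualname.rsplit(".", 1)[0] if "." in qualname else func.__module__
        with start_span(span_name=func.__name__, component=component):
            return func(*args, **kwargs)

    return wrapper


@span
def perform_heartbeat() -> None:
    pass


if __name__ == "__main__":
    perform_heartbeat()
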
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index 96e36bcfe7..a6a1f0ac8f 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -28,6 +28,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.taskinstance import TaskReturnCode
from airflow.stats import Stats
+from airflow.traces.tracer import Trace
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -184,39 +185,55 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
# If LocalTaskJob receives SIGTERM, LocalTaskJob passes SIGTERM to
_run_raw_task
# If the state of task_instance is changed, LocalTaskJob sends
SIGTERM to _run_raw_task
while not self.terminating:
- # Monitor the task to see if it's done. Wait in a syscall
- # (`os.wait`) for as long as possible so we notice the
- # subprocess finishing as quick as we can
- max_wait_time = max(
- 0, # Make sure this value is never negative,
- min(
- (
- heartbeat_time_limit
- - (timezone.utcnow() -
self.job.latest_heartbeat).total_seconds() * 0.75
+ with Trace.start_span(
+ span_name="local_task_job_loop",
component="LocalTaskJobRunner"
+ ) as span:
+ # Monitor the task to see if it's done. Wait in a syscall
+ # (`os.wait`) for as long as possible so we notice the
+ # subprocess finishing as quick as we can
+ max_wait_time = max(
+ 0, # Make sure this value is never negative,
+ min(
+ (
+ heartbeat_time_limit
+ - (timezone.utcnow() -
self.job.latest_heartbeat).total_seconds() * 0.75
+ ),
+ self.job.heartrate if self.job.heartrate is not
None else heartbeat_time_limit,
),
- self.job.heartrate if self.job.heartrate is not None
else heartbeat_time_limit,
- ),
- )
- return_code =
self.task_runner.return_code(timeout=max_wait_time)
- if return_code is not None:
- self.handle_task_exit(return_code)
- return return_code
-
- perform_heartbeat(
- job=self.job, heartbeat_callback=self.heartbeat_callback,
only_if_necessary=False
- )
-
- # If it's been too long since we've heartbeat, then it's
possible that
- # the scheduler rescheduled this task, so kill launched
processes.
- # This can only really happen if the worker can't read the DB
for a long time
- time_since_last_heartbeat = (timezone.utcnow() -
self.job.latest_heartbeat).total_seconds()
- if time_since_last_heartbeat > heartbeat_time_limit:
- Stats.incr("local_task_job_prolonged_heartbeat_failure",
1, 1)
- self.log.error("Heartbeat time limit exceeded!")
- raise AirflowException(
- f"Time since last
heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit "
- f"({heartbeat_time_limit}s)."
)
+ return_code =
self.task_runner.return_code(timeout=max_wait_time)
+ if return_code is not None:
+ self.handle_task_exit(return_code)
+ return return_code
+
+ if span.is_recording():
+ span.add_event(name="perform_heartbeat")
+ perform_heartbeat(
+ job=self.job,
heartbeat_callback=self.heartbeat_callback, only_if_necessary=False
+ )
+
+ # If it's been too long since we've heartbeat, then it's
possible that
+ # the scheduler rescheduled this task, so kill launched
processes.
+ # This can only really happen if the worker can't read the
DB for a long time
+ time_since_last_heartbeat = (
+ timezone.utcnow() - self.job.latest_heartbeat
+ ).total_seconds()
+ if time_since_last_heartbeat > heartbeat_time_limit:
+
Stats.incr("local_task_job_prolonged_heartbeat_failure", 1, 1)
+ self.log.error("Heartbeat time limit exceeded!")
+ if span.is_recording():
+ span.add_event(
+ name="error",
+ attributes={
+ "message": "Heartbeat time limit exceeded",
+ "heartbeat_time_limit(s)":
heartbeat_time_limit,
+ "time_since_last_heartbeat(s)":
time_since_last_heartbeat,
+ },
+ )
+ raise AirflowException(
+ f"Time since last
heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit "
+ f"({heartbeat_time_limit}s)."
+ )
return return_code
finally:
# Print a marker for log grouping of details before task execution
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index f657cbd5f6..883cd05d2b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -60,7 +60,10 @@ from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.simple import DatasetTriggeredTimetable
+from airflow.traces import utils as trace_utils
+from airflow.traces.tracer import Trace, span
from airflow.utils import timezone
+from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
@@ -815,6 +818,60 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti.pid,
)
+ with Trace.start_span_from_taskinstance(ti=ti) as span:
+ span.set_attribute("category", "scheduler")
+ span.set_attribute("task_id", ti.task_id)
+ span.set_attribute("dag_id", ti.dag_id)
+ span.set_attribute("state", ti.state)
+ if ti.state == TaskInstanceState.FAILED:
+ span.set_attribute("error", True)
+ span.set_attribute("start_date", str(ti.start_date))
+ span.set_attribute("end_date", str(ti.end_date))
+ span.set_attribute("duration", ti.duration)
+ span.set_attribute("executor_config", str(ti.executor_config))
+ span.set_attribute("execution_date", str(ti.execution_date))
+ span.set_attribute("hostname", ti.hostname)
+ span.set_attribute("log_url", ti.log_url)
+ span.set_attribute("operator", str(ti.operator))
+ span.set_attribute("try_number", ti.try_number - 1)
+ span.set_attribute("executor_state", state)
+ span.set_attribute("job_id", ti.job_id)
+ span.set_attribute("pool", ti.pool)
+ span.set_attribute("queue", ti.queue)
+ span.set_attribute("priority_weight", ti.priority_weight)
+ span.set_attribute("queued_dttm", str(ti.queued_dttm))
+ span.set_attribute("ququed_by_job_id", ti.queued_by_job_id)
+ span.set_attribute("pid", ti.pid)
+ if span.is_recording():
+ span.add_event(name="queued",
timestamp=datetime_to_nano(ti.queued_dttm))
+ span.add_event(name="started",
timestamp=datetime_to_nano(ti.start_date))
+ span.add_event(name="ended",
timestamp=datetime_to_nano(ti.end_date))
+ if conf.has_option("traces", "otel_task_log_event") and
conf.getboolean(
+ "traces", "otel_task_log_event"
+ ):
+ from airflow.utils.log.log_reader import TaskLogReader
+
+ task_log_reader = TaskLogReader()
+ if task_log_reader.supports_read:
+ metadata: dict[str, Any] = {}
+ logs, metadata = task_log_reader.read_log_chunks(ti,
ti.try_number, metadata)
+ if ti.hostname in dict(logs[0]):
+ message =
str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+ while metadata["end_of_log"] is False:
+ logs, metadata =
task_log_reader.read_log_chunks(
+ ti, ti.try_number - 1, metadata
+ )
+ if ti.hostname in dict(logs[0]):
+ message = message +
str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+ if span.is_recording():
+ span.add_event(
+ name="task_log",
+ attributes={
+ "message": message,
+ "metadata": str(metadata),
+ },
+ )
+
# There are two scenarios why the same TI with the same try_number
is queued
# after executor is finished with it:
# 1) the TI was killed externally and it had no time to mark
itself failed
@@ -1044,13 +1101,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
for loop_count in itertools.count(start=1):
- with Stats.timer("scheduler.scheduler_loop_duration") as timer:
- if self.using_sqlite and self.processor_agent:
- self.processor_agent.run_single_parsing_loop()
- # For the sqlite case w/ 1 thread, wait until the processor
- # is finished to avoid concurrent access to the DB.
- self.log.debug("Waiting for processors to finish since
we're using sqlite")
- self.processor_agent.wait_until_finished()
+ with Trace.start_span(span_name="scheduler_job_loop",
component="SchedulerJobRunner") as span:
+ span.set_attribute("category", "scheduler")
+ span.set_attribute("loop_count", loop_count)
+ with Stats.timer("scheduler.scheduler_loop_duration") as timer:
+ if self.using_sqlite and self.processor_agent:
+ self.processor_agent.run_single_parsing_loop()
+ # For the sqlite case w/ 1 thread, wait until the
processor
+ # is finished to avoid concurrent access to the DB.
+ self.log.debug("Waiting for processors to finish since
we're using sqlite")
+ self.processor_agent.wait_until_finished()
with create_session() as session:
# This will schedule for as many executors as possible.
@@ -1079,16 +1139,23 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if self.processor_agent:
self.processor_agent.heartbeat()
- # Heartbeat the scheduler periodically
- perform_heartbeat(
- job=self.job, heartbeat_callback=self.heartbeat_callback,
only_if_necessary=True
- )
+ # Heartbeat the scheduler periodically
+ perform_heartbeat(
+ job=self.job,
heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
+ )
- # Run any pending timed events
- next_event = timers.run(blocking=False)
- self.log.debug("Next timed event is in %f", next_event)
+ # Run any pending timed events
+ next_event = timers.run(blocking=False)
+ self.log.debug("Next timed event is in %f", next_event)
self.log.debug("Ran scheduling loop in %.2f seconds",
timer.duration)
+ if span.is_recording():
+ span.add_event(
+ name="Ran scheduling loop",
+ attributes={
+ "duration in seconds": timer.duration,
+ },
+ )
if not is_unit_test and not num_queued_tis and not
num_finished_events:
# If the scheduler is doing things, don't sleep. This means
when there is work to do, the
@@ -1102,6 +1169,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.num_runs,
loop_count,
)
+ if span.is_recording():
+ span.add_event("Exiting scheduler loop as requested number
of runs has been reached")
break
if self.processor_agent and self.processor_agent.done:
self.log.info(
@@ -1110,6 +1179,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.num_times_parse_dags,
loop_count,
)
+ if span.is_recording():
+ span.add_event("Exiting scheduler loop as requested DAG
parse count has been reached")
break
def _do_scheduling(self, session: Session) -> int:
@@ -1229,6 +1300,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
guard.commit()
# END: create dagruns
+ @span
def _create_dag_runs(self, dag_models: Collection[DagModel], session:
Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
# Bulk Fetch DagRuns with dag_id and execution_date same
@@ -1436,6 +1508,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return False
return True
+ @span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running
state."""
# added all() to save runtime, otherwise query is executed more than
once
@@ -1445,7 +1518,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs),
only_running=True, session=session),
)
+ @span
def _update_state(dag: DAG, dag_run: DagRun):
+ __span = Trace.get_current_span()
+ __span.set_attribute("state", str(DagRunState.RUNNING))
+ __span.set_attribute("run_id", dag_run.run_id)
+ __span.set_attribute("type", dag_run.run_type)
+ __span.set_attribute("dag_id", dag_run.dag_id)
+
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if dag.timetable.periodic and not dag_run.external_trigger and
dag_run.clear_number < 1:
@@ -1465,12 +1545,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
schedule_delay,
tags={"dag_id": dag.dag_id},
)
+ if __span.is_recording():
+ __span.add_event(
+ name="schedule_delay",
+ attributes={"dag_id": dag.dag_id, "schedule_delay":
str(schedule_delay)},
+ )
# cache saves time during scheduling of many dag_runs for same dag
cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
partial(self.dagbag.get_dag, session=session)
)
+ _span = Trace.get_current_span()
for dag_run in dag_runs:
dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
@@ -1487,6 +1573,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_run.execution_date,
)
else:
+ if _span.is_recording():
+ _span.add_event(
+ name="dag_run",
+ attributes={
+ "run_id": dag_run.run_id,
+ "dag_id": dag_run.dag_id,
+ "conf": str(dag_run.conf),
+ },
+ )
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag, dag_run)
dag_run.notify_dagrun_state_changed()
@@ -1514,70 +1609,101 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
:param dag_run: The DagRun to schedule
:return: Callback that needs to be executed
"""
- callback: DagCallbackRequest | None = None
+ trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True))
+ span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run,
as_int=True))
+ links = [{"trace_id": trace_id, "span_id": span_id}]
+
+ with Trace.start_span(
+ span_name="_schedule_dag_run", component="SchedulerJobRunner",
links=links
+ ) as span:
+ span.set_attribute("dag_id", dag_run.dag_id)
+ span.set_attribute("run_id", dag_run.run_id)
+ span.set_attribute("run_type", dag_run.run_type)
+ callback: DagCallbackRequest | None = None
+
+ dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
+ dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+
+ if not dag or not dag_model:
+ self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
+ return callback
+
+ if (
+ dag_run.start_date
+ and dag.dagrun_timeout
+ and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+ ):
+ dag_run.set_state(DagRunState.FAILED)
+ unfinished_task_instances = session.scalars(
+ select(TI)
+ .where(TI.dag_id == dag_run.dag_id)
+ .where(TI.run_id == dag_run.run_id)
+ .where(TI.state.in_(State.unfinished))
+ )
+ for task_instance in unfinished_task_instances:
+ task_instance.state = TaskInstanceState.SKIPPED
+ session.merge(task_instance)
+ session.flush()
+ self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
+
+ if self._should_update_dag_next_dagruns(
+ dag, dag_model, last_dag_run=dag_run, session=session
+ ):
+ dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
+
+ callback_to_execute = DagCallbackRequest(
+ full_filepath=dag.fileloc,
+ dag_id=dag.dag_id,
+ run_id=dag_run.run_id,
+ is_failure_callback=True,
+ processor_subdir=dag_model.processor_subdir,
+ msg="timed_out",
+ )
- dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
- dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+ dag_run.notify_dagrun_state_changed()
+ duration = dag_run.end_date - dag_run.start_date
+ Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}",
duration)
+ Stats.timing("dagrun.duration.failed", duration,
tags={"dag_id": dag_run.dag_id})
+ span.set_attribute("error", True)
+ if span.is_recording():
+ span.add_event(
+ name="error",
+ attributes={
+ "message": f"Run {dag_run.run_id} of
{dag_run.dag_id} has timed-out",
+ "duration": str(duration),
+ },
+ )
+ return callback_to_execute
- if not dag or not dag_model:
- self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
- return callback
+ if dag_run.execution_date > timezone.utcnow() and not
dag.allow_future_exec_dates:
+ self.log.error("Execution date is in future: %s",
dag_run.execution_date)
+ return callback
- if (
- dag_run.start_date
- and dag.dagrun_timeout
- and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
- ):
- dag_run.set_state(DagRunState.FAILED)
- unfinished_task_instances = session.scalars(
- select(TI)
- .where(TI.dag_id == dag_run.dag_id)
- .where(TI.run_id == dag_run.run_id)
- .where(TI.state.in_(State.unfinished))
- )
- for task_instance in unfinished_task_instances:
- task_instance.state = TaskInstanceState.SKIPPED
- session.merge(task_instance)
- session.flush()
- self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
+ if not self._verify_integrity_if_dag_changed(dag_run=dag_run,
session=session):
+ self.log.warning(
+ "The DAG disappeared before verifying integrity: %s.
Skipping.", dag_run.dag_id
+ )
+ return callback
+ # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something
else?
+ schedulable_tis, callback_to_run =
dag_run.update_state(session=session, execute_callbacks=False)
if self._should_update_dag_next_dagruns(dag, dag_model,
last_dag_run=dag_run, session=session):
dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
+ # This will do one query per dag run. We "could" build up a complex
+ # query to update all the TIs across all the execution dates and
dag
+ # IDs in a single query, but it turns out that can be _very very
slow_
+ # see #11147/commit ee90807ac for more details
+ if span.is_recording():
+ span.add_event(
+ name="schedule_tis",
+ attributes={
+ "message": "dag_run scheduling its tis",
+ "schedulable_tis": [_ti.task_id for _ti in
schedulable_tis],
+ },
+ )
+ dag_run.schedule_tis(schedulable_tis, session,
max_tis_per_query=self.job.max_tis_per_query)
- callback_to_execute = DagCallbackRequest(
- full_filepath=dag.fileloc,
- dag_id=dag.dag_id,
- run_id=dag_run.run_id,
- is_failure_callback=True,
- processor_subdir=dag_model.processor_subdir,
- msg="timed_out",
- )
-
- dag_run.notify_dagrun_state_changed()
- duration = dag_run.end_date - dag_run.start_date
- Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
- Stats.timing("dagrun.duration.failed", duration, tags={"dag_id":
dag_run.dag_id})
- return callback_to_execute
-
- if dag_run.execution_date > timezone.utcnow() and not
dag.allow_future_exec_dates:
- self.log.error("Execution date is in future: %s",
dag_run.execution_date)
- return callback
-
- if not self._verify_integrity_if_dag_changed(dag_run=dag_run,
session=session):
- self.log.warning("The DAG disappeared before verifying integrity:
%s. Skipping.", dag_run.dag_id)
- return callback
- # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
- schedulable_tis, callback_to_run =
dag_run.update_state(session=session, execute_callbacks=False)
-
- if self._should_update_dag_next_dagruns(dag, dag_model,
last_dag_run=dag_run, session=session):
- dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
- # This will do one query per dag run. We "could" build up a complex
- # query to update all the TIs across all the execution dates and dag
- # IDs in a single query, but it turns out that can be _very very slow_
- # see #11147/commit ee90807ac for more details
- dag_run.schedule_tis(schedulable_tis, session,
max_tis_per_query=self.job.max_tis_per_query)
-
- return callback_to_run
+ return callback_to_run
def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session:
Session) -> bool:
"""
@@ -1681,20 +1807,27 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
from airflow.models.pool import Pool
- pools = Pool.slots_stats(session=session)
- for pool_name, slot_stats in pools.items():
- Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
- Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
- Stats.gauge(f"pool.running_slots.{pool_name}",
slot_stats["running"])
- Stats.gauge(f"pool.deferred_slots.{pool_name}",
slot_stats["deferred"])
- Stats.gauge(f"pool.scheduled_slots.{pool_name}",
slot_stats["scheduled"])
-
- # Same metrics with tagging
- Stats.gauge("pool.open_slots", slot_stats["open"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.queued_slots", slot_stats["queued"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.running_slots", slot_stats["running"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.deferred_slots", slot_stats["deferred"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"],
tags={"pool_name": pool_name})
+ with Trace.start_span(span_name="emit_pool_metrics",
component="SchedulerJobRunner") as span:
+ pools = Pool.slots_stats(session=session)
+ for pool_name, slot_stats in pools.items():
+ Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
+ Stats.gauge(f"pool.queued_slots.{pool_name}",
slot_stats["queued"])
+ Stats.gauge(f"pool.running_slots.{pool_name}",
slot_stats["running"])
+ Stats.gauge(f"pool.deferred_slots.{pool_name}",
slot_stats["deferred"])
+ Stats.gauge(f"pool.scheduled_slots.{pool_name}",
slot_stats["scheduled"])
+
+ # Same metrics with tagging
+ Stats.gauge("pool.open_slots", slot_stats["open"],
tags={"pool_name": pool_name})
+ Stats.gauge("pool.queued_slots", slot_stats["queued"],
tags={"pool_name": pool_name})
+ Stats.gauge("pool.running_slots", slot_stats["running"],
tags={"pool_name": pool_name})
+ Stats.gauge("pool.deferred_slots", slot_stats["deferred"],
tags={"pool_name": pool_name})
+ Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"],
tags={"pool_name": pool_name})
+
+ span.set_attribute("category", "scheduler")
+ span.set_attribute(f"pool.open_slots.{pool_name}",
slot_stats["open"])
+ span.set_attribute(f"pool.queued_slots.{pool_name}",
slot_stats["queued"])
+ span.set_attribute(f"pool.running_slots.{pool_name}",
slot_stats["running"])
+ span.set_attribute(f"pool.deferred_slots.{pool_name}",
slot_stats["deferred"])
@provide_session
def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
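
Both the scheduler and dagrun changes backdate span events with datetime_to_nano(...), because OpenTelemetry event timestamps are expressed in nanoseconds since the epoch. The helper's implementation is imported from airflow.utils.dates and is not included in this diff; the sketch below is a plausible version of what it does, not the actual code.

from __future__ import annotations

from datetime import datetime, timezone


def datetime_to_nano(dt: datetime | None) -> int | None:
    """Convert a datetime to epoch nanoseconds (assumed behaviour of the Airflow helper)."""
    if dt is None:
        return None
    return int(dt.timestamp() * 1_000_000_000)


if __name__ == "__main__":
    queued_at = datetime(2024, 7, 20, 2, 11, 40, tzinfo=timezone.utc)
    # Passing this as `timestamp=` to span.add_event() backdates the event to when
    # the task was actually queued, rather than when the span was created.
    print(datetime_to_nano(queued_at))
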
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index e1736ae8e5..080323a1d1 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -37,6 +37,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.trigger import Trigger
from airflow.stats import Stats
+from airflow.traces.tracer import Trace, span
from airflow.triggers.base import TriggerEvent
from airflow.typing_compat import TypedDict
from airflow.utils import timezone
@@ -362,26 +363,43 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
if not self.trigger_runner.is_alive():
self.log.error("Trigger runner thread has died! Exiting.")
break
- # Clean out unused triggers
- Trigger.clean_unused()
- # Load/delete triggers
- self.load_triggers()
- # Handle events
- self.handle_events()
- # Handle failed triggers
- self.handle_failed_triggers()
- perform_heartbeat(self.job,
heartbeat_callback=self.heartbeat_callback, only_if_necessary=True)
- # Collect stats
- self.emit_metrics()
+ with Trace.start_span(span_name="triggerer_job_loop",
component="TriggererJobRunner") as span:
+ # Clean out unused triggers
+ if span.is_recording():
+ span.add_event(name="Trigger.clean_unused")
+ Trigger.clean_unused()
+ # Load/delete triggers
+ if span.is_recording():
+ span.add_event(name="load_triggers")
+ self.load_triggers()
+ # Handle events
+ if span.is_recording():
+ span.add_event(name="handle_events")
+ self.handle_events()
+ # Handle failed triggers
+ if span.is_recording():
+ span.add_event(name="handle_failed_triggers")
+ self.handle_failed_triggers()
+ if span.is_recording():
+ span.add_event(name="perform_heartbeat")
+ perform_heartbeat(
+ self.job, heartbeat_callback=self.heartbeat_callback,
only_if_necessary=True
+ )
+ # Collect stats
+ if span.is_recording():
+ span.add_event(name="emit_metrics")
+ self.emit_metrics()
# Idle sleep
time.sleep(1)
+ @span
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running
and update the runner."""
Trigger.assign_unassigned(self.job.id, self.capacity,
self.health_check_threshold)
ids = Trigger.ids_for_triggerer(self.job.id)
self.trigger_runner.update_triggers(set(ids))
+ @span
def handle_events(self):
"""Dispatch outbound events to the Trigger model which pushes them to
the relevant task instances."""
while self.trigger_runner.events:
@@ -392,6 +410,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
# Emit stat event
Stats.incr("triggers.succeeded")
+ @span
def handle_failed_triggers(self):
"""
Handle "failed" triggers. - ones that errored or exited before they
sent an event.
@@ -405,11 +424,15 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
# Emit stat event
Stats.incr("triggers.failed")
+ @span
def emit_metrics(self):
Stats.gauge(f"triggers.running.{self.job.hostname}",
len(self.trigger_runner.triggers))
Stats.gauge(
"triggers.running", len(self.trigger_runner.triggers),
tags={"hostname": self.job.hostname}
)
+ span = Trace.get_current_span()
+ span.set_attribute("trigger host", self.job.hostname)
+ span.set_attribute("triggers running",
len(self.trigger_runner.triggers))
class TriggerDetails(TypedDict):
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 6c3d0715b9..d4ef937e9d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -62,7 +62,9 @@ from airflow.models.tasklog import LogTemplate
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
+from airflow.traces.tracer import Trace
from airflow.utils import timezone
+from airflow.utils.dates import datetime_to_nano
from airflow.utils.helpers import chunks, is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
@@ -919,6 +921,37 @@ class DagRun(Base, LoggingMixin):
self.data_interval_end,
self.dag_hash,
)
+
+ with Trace.start_span_from_dagrun(dagrun=self) as span:
+ if self._state is DagRunState.FAILED:
+ span.set_attribute("error", True)
+ attributes = {
+ "category": "DAG runs",
+ "dag_id": str(self.dag_id),
+ "execution_date": str(self.execution_date),
+ "run_id": str(self.run_id),
+ "queued_at": str(self.queued_at),
+ "run_start_date": str(self.start_date),
+ "run_end_date": str(self.end_date),
+ "run_duration": str(
+ (self.end_date - self.start_date).total_seconds()
+ if self.start_date and self.end_date
+ else 0
+ ),
+ "state": str(self._state),
+ "external_trigger": str(self.external_trigger),
+ "run_type": str(self.run_type),
+ "data_interval_start": str(self.data_interval_start),
+ "data_interval_end": str(self.data_interval_end),
+ "dag_hash": str(self.dag_hash),
+ "conf": str(self.conf),
+ }
+ if span.is_recording():
+ span.add_event(name="queued",
timestamp=datetime_to_nano(self.queued_at))
+ span.add_event(name="started",
timestamp=datetime_to_nano(self.start_date))
+ span.add_event(name="ended",
timestamp=datetime_to_nano(self.end_date))
+ span.set_attributes(attributes)
+
session.flush()
self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1dacbe7525..27eb5c26c2 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -108,6 +108,7 @@ from airflow.stats import Stats
from airflow.templates import SandboxedEnvironment
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
+from airflow.traces.tracer import Trace
from airflow.utils import timezone
from airflow.utils.context import (
ConnectionAccessor,
@@ -1211,6 +1212,27 @@ def _handle_failure(
if not test_mode:
TaskInstance.save_to_db(failure_context["ti"], session)
+ with Trace.start_span_from_taskinstance(ti=task_instance) as span:
+ # ---- error info ----
+ span.set_attribute("error", "true")
+ span.set_attribute("error_msg", str(error))
+ span.set_attribute("context", context)
+ span.set_attribute("force_fail", force_fail)
+ # ---- common info ----
+ span.set_attribute("category", "DAG runs")
+ span.set_attribute("task_id", task_instance.task_id)
+ span.set_attribute("dag_id", task_instance.dag_id)
+ span.set_attribute("state", task_instance.state)
+ span.set_attribute("start_date", str(task_instance.start_date))
+ span.set_attribute("end_date", str(task_instance.end_date))
+ span.set_attribute("duration", task_instance.duration)
+ span.set_attribute("executor_config",
str(task_instance.executor_config))
+ span.set_attribute("execution_date", str(task_instance.execution_date))
+ span.set_attribute("hostname", task_instance.hostname)
+ if isinstance(task_instance, TaskInstance):
+ span.set_attribute("log_url", task_instance.log_url)
+ span.set_attribute("operator", str(task_instance.operator))
+
def _refresh_from_task(
*, task_instance: TaskInstance | TaskInstancePydantic, task: Operator,
pool_override: str | None = None
diff --git a/airflow/traces/__init__.py b/airflow/traces/__init__.py
index abe55b5103..7b2f416872 100644
--- a/airflow/traces/__init__.py
+++ b/airflow/traces/__init__.py
@@ -18,3 +18,4 @@ from __future__ import annotations
TRACEPARENT = "traceparent"
TRACESTATE = "tracestate"
+NO_TRACE_ID = 1
diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py
index 88999abe2b..1d58717287 100644
--- a/airflow/traces/tracer.py
+++ b/airflow/traces/tracer.py
@@ -96,6 +96,9 @@ class EmptySpan:
"""Set multiple attributes at once."""
pass
+ def is_recording(self):
+ return False
+
def add_event(
self,
name: str,
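
The single method added to EmptySpan mirrors the OpenTelemetry convention: a span that is not being recorded still accepts set_attribute()/add_event() calls as no-ops, and is_recording() lets callers skip building costly attributes entirely. With only the opentelemetry-api package installed and no SDK configured, the default current span behaves the same way:

# Requires: pip install opentelemetry-api
from opentelemetry import trace

# With no tracer provider configured, the current span is a non-recording
# placeholder, analogous to Airflow's EmptySpan.
span = trace.get_current_span()
print(span.is_recording())  # False

# Calls are accepted but do nothing, so instrumented code never has to branch
# on whether tracing is enabled...
span.set_attribute("dag_id", "example")

# ...except where building the attributes themselves would be expensive:
if span.is_recording():
    span.add_event("expensive", attributes={"payload": "not built here"})
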
diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py
index eaf3c1c065..afab2591d5 100644
--- a/airflow/traces/utils.py
+++ b/airflow/traces/utils.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import logging
from typing import TYPE_CHECKING
+from airflow.traces import NO_TRACE_ID
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.state import TaskInstanceState
@@ -40,9 +41,12 @@ def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str
def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+ if dag_run.start_date is None:
+ return NO_TRACE_ID
+
"""Generate trace id from DagRun."""
return _gen_id(
- [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())],
+ [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
as_int,
)
@@ -50,7 +54,7 @@ def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
"""Generate span id from TI key."""
return _gen_id(
- [ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(ti_key.try_number)],
+ [ti_key.dag_id, str(ti_key.run_id), ti_key.task_id, str(ti_key.try_number)],
as_int,
SPAN_ID,
)
@@ -58,8 +62,11 @@ def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> st
def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
"""Generate dag's root span id using dag_run."""
+ if dag_run.start_date is None:
+ return NO_TRACE_ID
+
return _gen_id(
- [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())],
+ [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
as_int,
SPAN_ID,
)
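
The new NO_TRACE_ID sentinel (set to 1 in airflow/traces/__init__.py) covers dag runs that have no start_date yet, and callers such as BaseExecutor.fail()/success() compare against it before opening child spans. A toy version of that guard, with a hypothetical md5-based id in place of the real _gen_id:

from __future__ import annotations

import hashlib
from datetime import datetime, timezone

NO_TRACE_ID = 1  # sentinel added in airflow/traces/__init__.py


def gen_trace_id(dag_id: str, run_id: str, start_date: datetime | None) -> int:
    """Toy version of the guard added above: a run without a start_date has no trace id."""
    if start_date is None:
        return NO_TRACE_ID
    seed = f"{dag_id}|{run_id}|{start_date.timestamp()}"
    return int(hashlib.md5(seed.encode()).hexdigest(), 16)  # 128 bits, OTel trace-id width


def maybe_trace(trace_id: int) -> None:
    # Callers only open a child span when a real trace id exists.
    if trace_id != NO_TRACE_ID:
        print("start span under trace", hex(trace_id))
    else:
        print("no trace yet; skip instrumentation")


if __name__ == "__main__":
    maybe_trace(gen_trace_id("example_dag", "manual__1", None))
    maybe_trace(gen_trace_id("example_dag", "manual__1",
                             datetime(2024, 7, 20, tzinfo=timezone.utc)))
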
diff --git a/scripts/ci/docker-compose/integration-otel.yml b/scripts/ci/docker-compose/integration-otel.yml
index 6573709bc3..7a635c17c7 100644
--- a/scripts/ci/docker-compose/integration-otel.yml
+++ b/scripts/ci/docker-compose/integration-otel.yml
@@ -54,7 +54,7 @@ services:
- ./grafana/volume/provisioning:/grafana/provisioning
jaeger:
- image: jaegertracing/all-in-one
+ image: jaegertracing/all-in-one:1.57
container_name: "breeze-jaeger"
environment:
COLLECTOR_OTLP_ENABLED: true