This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new c2a22bfb0b9 [V3-1-test] Fix broken `dag_processing.total_parse_time`
metric (#62128) (#62764)
c2a22bfb0b9 is described below
commit c2a22bfb0b93a491e170f2441efdecce760671d0
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Mar 3 15:43:07 2026 +0530
[V3-1-test] Fix broken `dag_processing.total_parse_time` metric (#62128)
(#62764)
* Fix broken `dag_processing.total_parse_time` metric (#62128)
DagFileProcessorManager has been emitting a nonsense value for
`dag_processing.total_parse_time` since 8774f28d76, which reversed the
order in which `emit_metrics` and `prepare_file_queue` (then called
`prepare_file_path_queue`) were called.
As `prepare_file_path_queue` was responsible for resetting the value of
`self._parsing_start_time`, the assumption made by `emit_metrics` was
that it would be called once the file queue had been cleared, but
crucially before `prepare_file_queue` was called to refill the queue.
Additionally, there was no guarantee that we'd parsed any files at all
since the last time the metric was emitted. If no work was due, we'd
gladly emit near-zero metrics every time around the while loop.
I've rearranged things in such a way that I hope will be harder to
accidentally break in future:
- `self._parsing_start_time` may be reset whenever files are added to
the queue, if it was not set already.
- metrics are emitted when `prepare_file_queue` is called -- when the
queue is empty -- but only if `self._parsing_start_time` is set,
meaning only if we've actually parsed any files since the last time
metrics were emitted.
Together, this means we should now emit metrics once per parsing loop.
I've added a test which fails on main and passes on this branch.
(cherry picked from commit 57a7c64a77503fef4eb7c6801a28a628a4098535)
* fix static checks
---------
Co-authored-by: Nick Stenning <[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 102 ++++++++++++---------
airflow-core/src/airflow/typing_compat.py | 5 +-
.../tests/unit/dag_processing/test_manager.py | 31 ++++++-
3 files changed, 94 insertions(+), 44 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 65d41533e24..e8e5651d324 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -37,7 +37,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timedelta
from operator import attrgetter, itemgetter
from pathlib import Path
-from typing import TYPE_CHECKING, Any, NamedTuple, cast
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast
import attrs
import structlog
@@ -65,6 +65,7 @@ from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace
+from airflow.typing_compat import assert_never
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
@@ -76,6 +77,7 @@ from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
if TYPE_CHECKING:
+ from collections.abc import Sequence
from socket import socket
from sqlalchemy.orm import Session
@@ -206,7 +208,7 @@ class DagFileProcessorManager(LoggingMixin):
_processors: dict[DagFileInfo, DagFileProcessorProcess] =
attrs.field(factory=dict, init=False)
- _parsing_start_time: float = attrs.field(init=False)
+ _parsing_start_time: float | None = attrs.field(default=None, init=False)
_num_run: int = attrs.field(default=0, init=False)
_callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] =
attrs.field(
@@ -391,7 +393,6 @@ class DagFileProcessorManager(LoggingMixin):
# clear down, we must have cleared all files found from
scanning the dags dir _and_ have
# cleared all files added as a result of callbacks
self.prepare_file_queue(known_files=known_files)
- self.emit_metrics()
self._start_new_processes()
@@ -449,16 +450,8 @@ class DagFileProcessorManager(LoggingMixin):
def _queue_requested_files_for_parsing(self) -> None:
"""Queue any files requested for parsing as requested by users via
UI/API."""
files = self._get_priority_files()
- bundles_to_refresh: set[str] = set()
- for file in files:
- # Try removing the file if already present
- with contextlib.suppress(ValueError):
- self._file_queue.remove(file)
- # enqueue file to the start of the queue.
- self._file_queue.appendleft(file)
- bundles_to_refresh.add(file.bundle_name)
-
- self._force_refresh_bundles |= bundles_to_refresh
+ self._add_files_to_queue(files, mode="frontprio")
+ self._force_refresh_bundles |= {file.bundle_name for file in files}
if self._force_refresh_bundles:
self.log.info("Bundles being force refreshed: %s", ",
".join(self._force_refresh_bundles))
@@ -535,7 +528,7 @@ class DagFileProcessorManager(LoggingMixin):
bundle_version=request.bundle_version,
)
self._callback_to_execute[file_info].append(request)
- self._add_files_to_queue([file_info], True)
+ self._add_files_to_queue([file_info], mode="front")
Stats.incr("dag_processing.other_callback_count")
def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
@@ -1005,7 +998,7 @@ class DagFileProcessorManager(LoggingMixin):
if new_files:
self.log.info("Adding %d new files to the front of the queue",
len(new_files))
- self._add_files_to_queue(new_files, True)
+ self._add_files_to_queue(new_files, mode="front")
def _resort_file_queue(self):
if self._file_parsing_sort_mode == "modified_time" and
self._file_queue:
@@ -1060,7 +1053,15 @@ class DagFileProcessorManager(LoggingMixin):
Note this method is only called when the file path queue is empty
"""
- self._parsing_start_time = time.perf_counter()
+        # We only emit metrics after processing all files in the queue. If `self._parsing_start_time` is None
+        # when this method is called, no files have yet been added to the queue so we shouldn't emit metrics.
+ if self._parsing_start_time is not None:
+ emit_metrics(
+ parse_time=time.perf_counter() - self._parsing_start_time,
+ stats=list(self._file_stats.values()),
+ )
+ self._parsing_start_time = None
+
# If the file path is already being processed, or if a file was
# processed recently, wait until the next batch
in_progress = set(self._processors)
@@ -1106,7 +1107,7 @@ class DagFileProcessorManager(LoggingMixin):
"Queuing the following files for processing:\n\t%s",
"\n\t".join(str(f.rel_path) for f in to_queue),
)
- self._add_files_to_queue(to_queue, False)
+ self._add_files_to_queue(to_queue, mode="back")
Stats.incr("dag_processing.file_path_queue_update_count")
def _kill_timed_out_processors(self):
@@ -1144,13 +1145,34 @@ class DagFileProcessorManager(LoggingMixin):
processor = self._processors.pop(proc)
processor.logger_filehandle.close()
-    def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front: bool):
+ def _add_files_to_queue(
+ self,
+ files: list[DagFileInfo],
+ *,
+ mode: Literal["front", "back", "frontprio"],
+ ):
"""Add stuff to the back or front of the file queue, unless it's
already present."""
- new_files = list(f for f in files if f not in self._file_queue)
- if add_at_front:
+ if mode == "frontprio":
+ for file in files:
+ # Try removing the file if already present
+ with contextlib.suppress(ValueError):
+ self._file_queue.remove(file)
+ # enqueue file to the start of the queue.
+ self._file_queue.appendleft(file)
+ elif mode == "front":
+ new_files = list(f for f in files if f not in self._file_queue)
self._file_queue.extendleft(new_files)
- else:
+ elif mode == "back":
+ new_files = list(f for f in files if f not in self._file_queue)
self._file_queue.extend(new_files)
+ else:
+ assert_never(mode)
+
+        # If we've just added files to the queue for the first time since metrics were last emitted, reset the
+        # parse time counter.
+ if self._parsing_start_time is None and self._file_queue:
+ self._parsing_start_time = time.perf_counter()
+
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
def max_runs_reached(self):
@@ -1177,27 +1199,25 @@ class DagFileProcessorManager(LoggingMixin):
if pids_to_kill:
kill_child_processes_by_pids(pids_to_kill)
- def emit_metrics(self):
- """
- Emit metrics about dag parsing summary.
- This is called once every time around the parsing "loop" - i.e. after
- all files have been parsed.
- """
- with DebugTrace.start_span(span_name="emit_metrics",
component="DagFileProcessorManager") as span:
- parse_time = time.perf_counter() - self._parsing_start_time
- Stats.gauge("dag_processing.total_parse_time", parse_time)
- Stats.gauge("dagbag_size", sum(stat.num_dags for stat in
self._file_stats.values()))
- Stats.gauge(
- "dag_processing.import_errors", sum(stat.import_errors for
stat in self._file_stats.values())
- )
- span.set_attributes(
- {
- "total_parse_time": parse_time,
- "dag_bag_size": sum(stat.num_dags for stat in
self._file_stats.values()),
- "import_errors": sum(stat.import_errors for stat in
self._file_stats.values()),
- }
- )
+def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]):
+ """
+ Emit metrics about dag parsing summary.
+
+ This is called once every time around the parsing "loop" - i.e. after
+ all files have been parsed.
+ """
+    with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
+ Stats.gauge("dag_processing.total_parse_time", parse_time)
+ Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
+ Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for
stat in stats))
+ span.set_attributes(
+ {
+ "total_parse_time": parse_time,
+ "dag_bag_size": sum(stat.num_dags for stat in stats),
+ "import_errors": sum(stat.import_errors for stat in stats),
+ }
+ )
def process_parse_results(
diff --git a/airflow-core/src/airflow/typing_compat.py
b/airflow-core/src/airflow/typing_compat.py
index 8a00ac06bd7..c72d5607bd1 100644
--- a/airflow-core/src/airflow/typing_compat.py
+++ b/airflow-core/src/airflow/typing_compat.py
@@ -25,6 +25,7 @@ __all__ = [
"Self",
"TypeAlias",
"TypeGuard",
+ "assert_never",
]
import sys
@@ -33,6 +34,6 @@ import sys
from typing import Literal, ParamSpec, TypeAlias, TypeGuard
if sys.version_info >= (3, 11):
- from typing import Self
+ from typing import Self, assert_never
else:
- from typing_extensions import Self
+ from typing_extensions import Self, assert_never
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 00de3148c96..7ae185d9aa5 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -26,7 +26,7 @@ import shutil
import signal
import textwrap
import time
-from collections import deque
+from collections import defaultdict, deque
from datetime import datetime, timedelta
from pathlib import Path
from socket import socket, socketpair
@@ -1379,3 +1379,32 @@ class TestDagFileProcessorManager:
bundle_names_being_parsed = {b.name for b in manager._dag_bundles}
assert bundle_names_being_parsed == expected
+
+ @mock.patch("airflow.dag_processing.manager.Stats.gauge")
+    def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path, configure_testing_dag_bundle):
+ key = "dag_processing.total_parse_time"
+ gauge_values = defaultdict(list)
+        statsd_gauge_mock.side_effect = lambda name, value: gauge_values[name].append(value)
+
+ dag_path = tmp_path / "temp_dag.py"
+ dag_code = textwrap.dedent(
+ """
+ from airflow import DAG
+ dag = DAG(dag_id='temp_dag')
+ """
+ )
+ dag_path.write_text(dag_code)
+
+ with configure_testing_dag_bundle(tmp_path):
+ manager = DagFileProcessorManager(max_runs=0)
+
+ for _ in range(3):
+ manager.max_runs += 1
+ manager.run()
+
+ assert key in gauge_values
+ assert len(gauge_values[key]) == 1
+ assert gauge_values[key][0] >= 1e-4
+
+ dag_path.touch() # make the loop run faster
+ gauge_values.clear()