Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306371720
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -875,6 +875,23 @@ def has_dag(cls, dag_id: str, session: Session =
NEW_SESSION) -> bool:
"""
return session.scalar(select(literal(True)).where(cls.dag_id ==
dag_id).limit(1)) is not None
+ @classmethod
+ @provide_session
+ def get_count(cls, session: Session = NEW_SESSION) -> int:
+ """
+ Return the total number of serialized DAGs stored in the database.
+
+ :param session: ORM Session
+ :raises RuntimeError: if the database returns None for the COUNT
query, which indicates
+ a transient connectivity issue rather than an empty table (COUNT
always returns an int).
+ """
+ result = session.scalar(select(func.count()).select_from(cls))
+ if result is None:
+ raise RuntimeError(
+ "COUNT query on serialized_dag returned None - possible
database connectivity issue"
+ )
+ return result
Review Comment:
`COUNT(*)` queries should always return exactly one row containing an integer
(including 0), and DB connectivity failures should typically surface as a
raised `SQLAlchemyError` rather than as a `None` result. Consider switching to
an API that enforces “exactly one row” (e.g., executing the statement and
calling `scalar_one()`, which raises instead of returning `None` when no row
comes back), and then returning `int(result)` to guarantee the declared return
type. This removes an effectively unreachable `None` branch and clarifies the
error model.
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,11 @@ def emit_metrics(*, parse_time: float, dag_file_stats:
Sequence[DagFileStat]):
stats.gauge("dag_processing.total_parse_time", parse_time)
stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
stats.gauge("dag_processing.import_errors", sum(stat.import_errors for
stat in dag_file_stats))
+ try:
+ with create_session() as session:
+ stats.gauge("serialized_dag.count",
SerializedDagModel.get_count(session=session))
+ except (RuntimeError, SQLAlchemyError):
+ log.exception("Failed to emit serialized_dag.count metric")
Review Comment:
Emitting `serialized_dag.count` via a `COUNT(*)` on every parsing loop can be
expensive on large tables (many databases will scan the table for an exact
count), and this code runs on a periodic cadence. Consider throttling (e.g.,
emitting only every N loops, or caching the value for a fixed time interval),
or sourcing the count from an already-available in-process value updated
during serialization/sync, to avoid adding frequent full-count queries to the
metadata DB.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,32 @@ def test_bundle_version_data_stored_after_refresh(self,
session):
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+ """Tests for the emit_metrics module-level function."""
+
+ def test_emit_metrics_emits_serialized_dag_count(self):
+ """emit_metrics emits the serialized_dag.count gauge with the value
from get_count."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
return_value=3
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats") as
mock_stats:
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+ calls = {call[0][0]: call[0][1] for call in
mock_stats.gauge.call_args_list}
+ assert "serialized_dag.count" in calls
+ assert calls["serialized_dag.count"] == 3
+
+ def test_emit_metrics_does_not_raise_on_db_error(self):
+ """emit_metrics logs and swallows RuntimeError from get_count on DB
failure."""
+ from airflow.dag_processing.manager import emit_metrics
+
+ with mock.patch(
+ "airflow.dag_processing.manager.SerializedDagModel.get_count",
+ side_effect=RuntimeError("COUNT query on serialized_dag returned
None - possible"),
+ ):
+ with mock.patch("airflow.dag_processing.manager.stats"):
+ emit_metrics(parse_time=1.0, dag_file_stats=[])
Review Comment:
This test currently only verifies “no exception”. Since the docstring claims
it “logs and swallows”, it should also assert the logging side effect (e.g.,
patch `airflow.dag_processing.manager.log` and assert that `log.exception` was
called once) and/or assert that the `serialized_dag.count` gauge was not
emitted when `get_count()` fails. This will make the test actually validate
the new error-handling behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]