Copilot commented on code in PR #67572:
URL: https://github.com/apache/airflow/pull/67572#discussion_r3306371720


##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -875,6 +875,23 @@ def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool:
         """
         return session.scalar(select(literal(True)).where(cls.dag_id == dag_id).limit(1)) is not None
 
+    @classmethod
+    @provide_session
+    def get_count(cls, session: Session = NEW_SESSION) -> int:
+        """
+        Return the total number of serialized DAGs stored in the database.
+
+        :param session: ORM Session
+        :raises RuntimeError: if the database returns None for the COUNT query, which indicates
+            a transient connectivity issue rather than an empty table (COUNT always returns an int).
+        """
+        result = session.scalar(select(func.count()).select_from(cls))
+        if result is None:
+            raise RuntimeError(
+                "COUNT query on serialized_dag returned None - possible database connectivity issue"
+            )
+        return result

Review Comment:
   A `COUNT(*)` query without `GROUP BY` always returns exactly one row holding 
an integer (0 for an empty table), and DB connectivity failures typically 
surface as `SQLAlchemyError` rather than as a `None` result. Consider switching 
to an API that enforces “exactly one row” (e.g., executing the statement and 
using `scalar_one()`), then returning `int(result)` to guarantee the declared 
return type. This removes an effectively unreachable `None` branch and 
clarifies the error model.
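
The behavior described above can be checked with a small sketch. It uses the standard-library `sqlite3` module instead of SQLAlchemy so that it runs without dependencies; `count_rows` and the one-column `serialized_dag` table are hypothetical stand-ins, not Airflow code. It shows that `COUNT(*)` on an empty table yields `0` rather than `None`, and that a failed query raises instead of returning a value.

```python
import sqlite3


def count_rows(conn: sqlite3.Connection, table: str) -> int:
    """Return COUNT(*) for `table`, insisting on exactly one result row."""
    # Table names cannot be bound as parameters; `table` is trusted in this sketch.
    rows = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchall()
    if len(rows) != 1:
        # Mirrors an "exactly one row" contract; not expected for COUNT(*).
        raise RuntimeError(f"expected exactly one row, got {len(rows)}")
    return int(rows[0][0])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY)")

# Empty table: the query still returns one row, holding the integer 0.
empty_count = count_rows(conn, "serialized_dag")

conn.execute("INSERT INTO serialized_dag VALUES ('example_dag')")
one_count = count_rows(conn, "serialized_dag")

# A failing query surfaces as an exception, never as a None result.
failed_with_exception = False
try:
    count_rows(conn, "missing_table")
except sqlite3.OperationalError:
    failed_with_exception = True
```

In the PR itself the same contract would presumably come from `scalar_one()` on the executed statement, as suggested above, with the caller catching `SQLAlchemyError` only.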
   



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1566,6 +1571,11 @@ def emit_metrics(*, parse_time: float, dag_file_stats: Sequence[DagFileStat]):
     stats.gauge("dag_processing.total_parse_time", parse_time)
     stats.gauge("dagbag_size", sum(stat.num_dags for stat in dag_file_stats))
     stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in dag_file_stats))
+    try:
+        with create_session() as session:
+            stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session))
+    except (RuntimeError, SQLAlchemyError):
+        log.exception("Failed to emit serialized_dag.count metric")

Review Comment:
   Emitting `serialized_dag.count` via a `COUNT(*)` on every parsing loop can be 
expensive on large tables (many databases scan the table or an index to 
produce an exact count), and this code runs on a periodic cadence. Consider 
throttling (e.g., emitting every N loops or using a time-based cache), or 
sourcing the count from an already-available in-process value updated during 
serialization/sync, to avoid adding frequent full-count queries to the 
metadata DB.
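
One way to read the time-based cache suggestion is sketched below. `ThrottledValue`, `expensive_count`, and the 60-second TTL are all hypothetical names and values chosen for illustration; nothing here is existing Airflow API. The clock is injectable so the behavior can be exercised without sleeping.

```python
import time
from typing import Callable, Optional


class ThrottledValue:
    """Cache the result of an expensive callable for `ttl_seconds`."""

    def __init__(
        self,
        fetch: Callable[[], int],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[int] = None
        self._fetched_at: Optional[float] = None

    def get(self) -> int:
        now = self._clock()
        # Refresh on first use or once the cached value is older than the TTL.
        if self._fetched_at is None or now - self._fetched_at >= self._ttl:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value


calls = []
fake_now = [0.0]


def expensive_count() -> int:
    # Stand-in for the COUNT(*) query; records each real invocation.
    calls.append(1)
    return 42


cached = ThrottledValue(expensive_count, ttl_seconds=60.0, clock=lambda: fake_now[0])
first = cached.get()   # first call runs the query
fake_now[0] = 30.0
second = cached.get()  # within the TTL: served from the cache
fake_now[0] = 61.0
third = cached.get()   # TTL elapsed: the query runs again
```

With this shape the gauge could still be emitted on every loop while the query runs at most once per TTL window; if `fetch` raises, nothing is cached and the exception propagates to the caller.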



##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -2790,3 +2790,32 @@ def test_bundle_version_data_stored_after_refresh(self, session):
 
         assert manager._bundle_versions["mock_bundle"] == "newhash"
         assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestEmitMetrics:
+    """Tests for the emit_metrics module-level function."""
+
+    def test_emit_metrics_emits_serialized_dag_count(self):
+        """emit_metrics emits the serialized_dag.count gauge with the value from get_count."""
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch(
+            "airflow.dag_processing.manager.SerializedDagModel.get_count", return_value=3
+        ):
+            with mock.patch("airflow.dag_processing.manager.stats") as mock_stats:
+                emit_metrics(parse_time=1.0, dag_file_stats=[])
+
+        calls = {call[0][0]: call[0][1] for call in mock_stats.gauge.call_args_list}
+        assert "serialized_dag.count" in calls
+        assert calls["serialized_dag.count"] == 3
+
+    def test_emit_metrics_does_not_raise_on_db_error(self):
+        """emit_metrics logs and swallows RuntimeError from get_count on DB failure."""
+        from airflow.dag_processing.manager import emit_metrics
+
+        with mock.patch(
+            "airflow.dag_processing.manager.SerializedDagModel.get_count",
+            side_effect=RuntimeError("COUNT query on serialized_dag returned None - possible"),
+        ):
+            with mock.patch("airflow.dag_processing.manager.stats"):
+                emit_metrics(parse_time=1.0, dag_file_stats=[])

Review Comment:
   This test currently only verifies that no exception is raised. Since the 
docstring claims it “logs and swallows”, it should also assert the logging 
side effect (e.g., patch `airflow.dag_processing.manager.log` and assert that 
`log.exception` was called once) and/or assert that the `serialized_dag.count` 
gauge was not emitted when `get_count()` fails. This makes the test actually 
validate the new error-handling behavior.
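
The stronger assertions could look roughly like the sketch below. To stay self-contained it does not import Airflow: `emit_count_metric` is a hypothetical stand-in for the new `try`/`except` block in `emit_metrics`, and the module-level `log` plays the role of `airflow.dag_processing.manager.log`.

```python
import logging
from unittest import mock

log = logging.getLogger(__name__)


def emit_count_metric(stats, get_count) -> None:
    """Hypothetical stand-in for the try/except block added to emit_metrics."""
    try:
        stats.gauge("serialized_dag.count", get_count())
    except RuntimeError:
        log.exception("Failed to emit serialized_dag.count metric")


def test_logs_and_skips_gauge_on_failure() -> None:
    stats = mock.Mock()
    failing_get_count = mock.Mock(side_effect=RuntimeError("boom"))

    # Patch the logger method so the logging side effect can be asserted.
    with mock.patch.object(log, "exception") as mock_exception:
        emit_count_metric(stats, failing_get_count)

    mock_exception.assert_called_once()
    # get_count raised before gauge() could be invoked.
    stats.gauge.assert_not_called()


test_logs_and_skips_gauge_on_failure()
```

In the real test the same two assertions would be made against the patched `stats` and `log` objects in `airflow.dag_processing.manager`, filtering `gauge` calls by metric name since `emit_metrics` emits other gauges as well.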
   


