This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c18288fe58c Add some basic metrics related to Deadline Alerts (#55971)
c18288fe58c is described below

commit c18288fe58c154151618aa4a439c327ad9ee2aa8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Sep 23 13:59:06 2025 -0700

    Add some basic metrics related to Deadline Alerts (#55971)
---
 airflow-core/src/airflow/models/deadline.py          | 20 +++++++++++++++++++-
 .../src/airflow/serialization/serialized_objects.py  |  2 ++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index 4b3560fc99b..21e49a36ed1 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -36,7 +36,8 @@ from airflow.models import Trigger
 from airflow.models.base import Base
 from airflow.serialization.serde import deserialize, serialize
 from airflow.settings import json
-from airflow.triggers.deadline import PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger
+from airflow.stats import Stats
+from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
@@ -182,6 +183,10 @@ class Deadline(Base):
             if dagrun.end_date <= deadline.deadline_time:
                 # If the DagRun finished before the Deadline:
                 session.delete(deadline)
+                Stats.incr(
+                    "deadline_alerts.deadline_not_missed",
+                    tags={"dag_id": dagrun.dag_id, "dagrun_id": dagrun.run_id},
+                )
                 deleted_count += 1
                 dagruns_to_refresh.add(dagrun)
         session.flush()
@@ -230,6 +235,10 @@ class Deadline(Base):
 
         self.callback_state = DeadlineCallbackState.QUEUED
         session.add(self)
+        Stats.incr(
+            "deadline_alerts.deadline_missed",
+            tags={"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id},
+        )
 
     def handle_callback_event(self, event: TriggerEvent, session: Session):
         if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in {
@@ -240,6 +249,15 @@ class Deadline(Base):
             self.callback_state = status
             if status != DeadlineCallbackState.RUNNING:
                 self.trigger = None
+                metric_tags = {
+                    "dag_id": self.dagrun.dag_id,
+                    "callback": self._callback,
+                    "result": event.payload.get(PAYLOAD_BODY_KEY),
+                }
+                if status == DeadlineCallbackState.FAILED:
+                    Stats.incr("deadline_alerts.deadline_callback_failure", tags=metric_tags)
+                elif status == DeadlineCallbackState.SUCCESS:
+                    Stats.incr("deadline_alerts.deadline_callback_success", tags=metric_tags)
             session.add(self)
         else:
             logger.error("Unexpected event received: %s", event.payload)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 21ebdeed657..a9a29a6b4d5 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -98,6 +98,7 @@ from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.helpers import serialize_template_field
 from airflow.serialization.json_schema import load_dag_schema
 from airflow.settings import DAGS_FOLDER, json
+from airflow.stats import Stats
 from airflow.task.priority_strategy import (
     PriorityWeightStrategy,
     airflow_priority_weight_strategies,
@@ -3288,6 +3289,7 @@ class SerializedDAG(BaseSerialization):
                                 dagrun_id=orm_dagrun.id,
                             )
                         )
+                        Stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id})
 
         return orm_dagrun
 

Reply via email to