This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c18288fe58c Add some basic metrics related to Deadline Alerts (#55971)
c18288fe58c is described below
commit c18288fe58c154151618aa4a439c327ad9ee2aa8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Sep 23 13:59:06 2025 -0700
Add some basic metrics related to Deadline Alerts (#55971)
---
airflow-core/src/airflow/models/deadline.py | 20 +++++++++++++++++++-
.../src/airflow/serialization/serialized_objects.py | 2 ++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index 4b3560fc99b..21e49a36ed1 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -36,7 +36,8 @@ from airflow.models import Trigger
from airflow.models.base import Base
from airflow.serialization.serde import deserialize, serialize
from airflow.settings import json
-from airflow.triggers.deadline import PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger
+from airflow.stats import Stats
+from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
@@ -182,6 +183,10 @@ class Deadline(Base):
if dagrun.end_date <= deadline.deadline_time:
# If the DagRun finished before the Deadline:
session.delete(deadline)
+ Stats.incr(
+ "deadline_alerts.deadline_not_missed",
+ tags={"dag_id": dagrun.dag_id, "dagrun_id": dagrun.run_id},
+ )
deleted_count += 1
dagruns_to_refresh.add(dagrun)
session.flush()
@@ -230,6 +235,10 @@ class Deadline(Base):
self.callback_state = DeadlineCallbackState.QUEUED
session.add(self)
+ Stats.incr(
+ "deadline_alerts.deadline_missed",
+ tags={"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id},
+ )
def handle_callback_event(self, event: TriggerEvent, session: Session):
if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in {
@@ -240,6 +249,15 @@ class Deadline(Base):
self.callback_state = status
if status != DeadlineCallbackState.RUNNING:
self.trigger = None
+ metric_tags = {
+ "dag_id": self.dagrun.dag_id,
+ "callback": self._callback,
+ "result": event.payload.get(PAYLOAD_BODY_KEY),
+ }
+ if status == DeadlineCallbackState.FAILED:
+ Stats.incr("deadline_alerts.deadline_callback_failure", tags=metric_tags)
+ elif status == DeadlineCallbackState.SUCCESS:
+ Stats.incr("deadline_alerts.deadline_callback_success", tags=metric_tags)
session.add(self)
else:
logger.error("Unexpected event received: %s", event.payload)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 21ebdeed657..a9a29a6b4d5 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -98,6 +98,7 @@ from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.helpers import serialize_template_field
from airflow.serialization.json_schema import load_dag_schema
from airflow.settings import DAGS_FOLDER, json
+from airflow.stats import Stats
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
@@ -3288,6 +3289,7 @@ class SerializedDAG(BaseSerialization):
dagrun_id=orm_dagrun.id,
)
)
+ Stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id})
return orm_dagrun