This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5d36858823e feat: Add Deadline Alerts to OL events (#63352)
5d36858823e is described below
commit 5d36858823e9bb343b30bd0110e33e5afa718c37
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Mar 11 20:30:21 2026 +0100
feat: Add Deadline Alerts to OL events (#63352)
---
.../airflow/providers/openlineage/utils/utils.py | 46 +++++
.../tests/unit/openlineage/utils/test_utils.py | 218 +++++++++++++++++++++
2 files changed, 264 insertions(+)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index bfc9f566fcb..9fb572d628c 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -759,6 +759,7 @@ class DagRunInfo(InfoJsonEncodable):
"dag_bundle_version": lambda dagrun:
DagRunInfo.dag_version_info(dagrun, "bundle_version"),
"dag_version_id": lambda dagrun: DagRunInfo.dag_version_info(dagrun,
"version_id"),
"dag_version_number": lambda dagrun:
DagRunInfo.dag_version_info(dagrun, "version_number"),
+ "deadlines": lambda dagrun: DagRunInfo.deadlines(dagrun),
}
@classmethod
@@ -769,8 +770,53 @@ class DagRunInfo(InfoJsonEncodable):
return None
return (dagrun.end_date - dagrun.start_date).total_seconds()
+ @classmethod
+ def deadlines(cls, dagrun: DagRun) -> dict[str, Any] | None:
+ """
+ Extract deadline state and alert definitions from a DagRun (on
scheduler).
+
+ Returns {"alerts": [...]} (a dict, not a bare list, so _cast_basic_types passes it through), or None when there is nothing to report.
+ """
+ try:
+ # AF2 DagRun and AF3 DagRun SDK model (on worker) do not have this
information
+ deadlines = getattr(dagrun, "deadlines", None)
+ if not deadlines:
+ return None
+ except Exception as err:
+ log.warning("OpenLineage failed to retrieve deadlines. Exception:
%s", err)
+ return None
+
+ result = []
+ for d in deadlines:
+ try:
+ info: dict[str, Any] = {}
+ if deadline_time := getattr(d, "deadline_time", None):
+ info["deadline_time"] = deadline_time.isoformat()
+ if (missed := getattr(d, "missed", None)) is not None:
+ info["missed"] = missed
+ try:
+ # deadline_alert is a lazy-loaded ORM relationship that may
+ # trigger a DB query; keep it isolated so a
detached-session
+ # error doesn't discard the rest of the deadline info.
+ if alert := getattr(d, "deadline_alert", None):
+ info.update(
+ {
+ k: v
+ for k in ("name", "description", "reference",
"interval", "callback_def")
+ if (v := getattr(alert, k, None)) is not None
+ }
+ )
+ except Exception as err:
+ log.warning("OpenLineage could not load deadline_alert
relationship: %s", err)
+ if info:
+ result.append(info)
+ except Exception as err:
+ log.warning("OpenLineage failed to serialize deadline: %s",
err)
+ return {"alerts": result} if result else None
+
@classmethod
def dag_version_info(cls, dagrun: DagRun, key: str) -> str | int | None:
+ """Extract dag version info for given key, sourced from DagRun (on
scheduler)."""
# AF2 DagRun and AF3 DagRun SDK model (on worker) do not have this
information
if not getattr(dagrun, "dag_versions", []):
return None
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 4e76a795bb8..75365cd5a7c 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -198,6 +198,7 @@ def test_get_airflow_dag_run_facet():
version_number="version_number",
)
]
+ dagrun_mock.deadlines = []
result = get_airflow_dag_run_facet(dagrun_mock)
@@ -232,6 +233,7 @@ def test_get_airflow_dag_run_facet():
"start_date": "2024-06-01T01:02:04+00:00",
"end_date": "2024-06-01T01:02:14.034172+00:00",
"duration": 10.034172,
+ "deadlines": None,
"execution_date": "2024-06-01T01:02:04+00:00",
"logical_date": "2024-06-01T01:02:04+00:00",
"run_after": "2024-06-01T01:02:04+00:00",
@@ -2660,6 +2662,220 @@ class TestDagInfoAirflow3:
}
+class TestDagRunInfoDeadlines:
+ """Tests for deadline state and alert definitions in DagRunInfo."""
+
+ def test_dagrun_no_deadlines_attribute(self):
+ dagrun = MagicMock(spec=[])
+ assert DagRunInfo.deadlines(dagrun) is None
+
+ def test_dagrun_empty_deadlines(self):
+ dagrun = MagicMock()
+ dagrun.deadlines = []
+ assert DagRunInfo.deadlines(dagrun) is None
+
+ def test_dagrun_with_deadline_and_alert(self):
+ alert = MagicMock(spec=["name", "description", "reference",
"interval", "callback_def"])
+ alert.name = "SLA Alert"
+ alert.description = "Must finish within 1 hour"
+ alert.reference = {"reference_type": "DagRunLogicalDateDeadline"}
+ alert.interval = 3600.0
+ alert.callback_def = {"path": "my_module.on_deadline_missed",
"kwargs": {}}
+
+ deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ deadline.missed = False
+ deadline.deadline_alert = alert
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [deadline]
+
+ assert DagRunInfo.deadlines(dagrun) == {
+ "alerts": [
+ {
+ "deadline_time": "2025-06-01T12:00:00+00:00",
+ "missed": False,
+ "name": "SLA Alert",
+ "description": "Must finish within 1 hour",
+ "reference": {"reference_type":
"DagRunLogicalDateDeadline"},
+ "interval": 3600.0,
+ "callback_def": {"path": "my_module.on_deadline_missed",
"kwargs": {}},
+ },
+ ],
+ }
+
+ def test_dagrun_with_multiple_deadlines(self):
+ alert1 = MagicMock(spec=["name", "description", "reference",
"interval", "callback_def"])
+ alert1.name = None
+ alert1.description = None
+ alert1.reference = {"reference_type": "DagRunLogicalDateDeadline"}
+ alert1.interval = 3600.0
+ alert1.callback_def = {"path": "mod.cb1", "kwargs": {}}
+
+ alert2 = MagicMock(spec=["name", "description", "reference",
"interval", "callback_def"])
+ alert2.name = "Queued Deadline"
+ alert2.description = None
+ alert2.reference = {"reference_type": "DagRunQueuedAtDeadline"}
+ alert2.interval = 7200.0
+ alert2.callback_def = {"path": "mod.cb2", "kwargs": {"notify": True}}
+
+ d1 = MagicMock(spec=["deadline_time", "missed", "deadline_alert"])
+ d1.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ d1.missed = True
+ d1.deadline_alert = alert1
+
+ d2 = MagicMock(spec=["deadline_time", "missed", "deadline_alert"])
+ d2.deadline_time = datetime.datetime(2025, 6, 1, 14, 0, 0,
tzinfo=datetime.timezone.utc)
+ d2.missed = False
+ d2.deadline_alert = alert2
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [d1, d2]
+
+ assert DagRunInfo.deadlines(dagrun) == {
+ "alerts": [
+ {
+ "deadline_time": "2025-06-01T12:00:00+00:00",
+ "missed": True,
+ "reference": {"reference_type":
"DagRunLogicalDateDeadline"},
+ "interval": 3600.0,
+ "callback_def": {"path": "mod.cb1", "kwargs": {}},
+ },
+ {
+ "deadline_time": "2025-06-01T14:00:00+00:00",
+ "missed": False,
+ "name": "Queued Deadline",
+ "reference": {"reference_type": "DagRunQueuedAtDeadline"},
+ "interval": 7200.0,
+ "callback_def": {"path": "mod.cb2", "kwargs": {"notify":
True}},
+ },
+ ],
+ }
+
+ def test_dagrun_deadline_alert_access_fails(self):
+ """When the alert relationship can't be loaded, execution details
still appear."""
+ deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ deadline.missed = False
+ type(deadline).deadline_alert = PropertyMock(side_effect=Exception("DB
not available"))
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [deadline]
+
+ assert DagRunInfo.deadlines(dagrun) == {
+ "alerts": [
+ {
+ "deadline_time": "2025-06-01T12:00:00+00:00",
+ "missed": False,
+ },
+ ],
+ }
+
+ def test_dagrun_deadline_none_alert_fields_excluded(self):
+ """None-valued alert fields are excluded from the output."""
+ alert = MagicMock(spec=["name", "description", "reference",
"interval", "callback_def"])
+ alert.name = None
+ alert.description = None
+ alert.reference = {"reference_type": "DagRunLogicalDateDeadline"}
+ alert.interval = 3600.0
+ alert.callback_def = None
+
+ deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ deadline.missed = True
+ deadline.deadline_alert = alert
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [deadline]
+
+ result = DagRunInfo.deadlines(dagrun)
+ assert result == {
+ "alerts": [
+ {
+ "deadline_time": "2025-06-01T12:00:00+00:00",
+ "missed": True,
+ "reference": {"reference_type":
"DagRunLogicalDateDeadline"},
+ "interval": 3600.0,
+ },
+ ],
+ }
+ assert "name" not in result["alerts"][0]
+ assert "description" not in result["alerts"][0]
+ assert "callback_def" not in result["alerts"][0]
+
+ def test_dagrun_deadlines_property_raises(self):
+ """When accessing dagrun.deadlines itself raises, return None."""
+ dagrun = MagicMock()
+ type(dagrun).deadlines = PropertyMock(side_effect=Exception("Session
closed"))
+
+ assert DagRunInfo.deadlines(dagrun) is None
+
+ def test_dagrun_deadline_time_isoformat_raises(self):
+ """When deadline_time.isoformat() raises, that deadline is skipped."""
+ bad_time = MagicMock()
+ bad_time.isoformat.side_effect = AttributeError("not a datetime")
+
+ deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ deadline.deadline_time = bad_time
+ deadline.missed = False
+ deadline.deadline_alert = None
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [deadline]
+
+ assert DagRunInfo.deadlines(dagrun) is None
+
+ def test_dagrun_bad_deadline_skipped_others_preserved(self):
+ """A failing deadline is skipped; valid siblings still appear."""
+ bad_deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ bad_time = MagicMock()
+ bad_time.isoformat.side_effect = TypeError("broken")
+ bad_deadline.deadline_time = bad_time
+ bad_deadline.missed = False
+ bad_deadline.deadline_alert = None
+
+ good_deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ good_deadline.deadline_time = datetime.datetime(2025, 6, 1, 14, 0, 0,
tzinfo=datetime.timezone.utc)
+ good_deadline.missed = True
+ good_deadline.deadline_alert = None
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [bad_deadline, good_deadline]
+
+ assert DagRunInfo.deadlines(dagrun) == {
+ "alerts": [
+ {
+ "deadline_time": "2025-06-01T14:00:00+00:00",
+ "missed": True,
+ },
+ ],
+ }
+
+ def test_dagrun_alert_attribute_access_raises(self):
+ """When reading an attribute on the alert object raises, execution
details still appear."""
+ alert = MagicMock(spec=["name", "description", "reference",
"interval", "callback_def"])
+ alert.name = "Good Name"
+ type(alert).reference = PropertyMock(side_effect=Exception("Column
error"))
+
+ deadline = MagicMock(spec=["deadline_time", "missed",
"deadline_alert"])
+ deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ deadline.missed = False
+ deadline.deadline_alert = alert
+
+ dagrun = MagicMock()
+ dagrun.deadlines = [deadline]
+
+ result = DagRunInfo.deadlines(dagrun)
+ assert result == {
+ "alerts": [
+ {
+ "deadline_time": "2025-06-01T12:00:00+00:00",
+ "missed": False,
+ },
+ ],
+ }
+
+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Airflow 3 test")
@patch.object(DagRun, "dag_versions", new_callable=PropertyMock)
def test_dagrun_info_af3(mocked_dag_versions):
@@ -2708,6 +2924,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
"data_interval_end": "2024-06-01T00:00:00+00:00",
"data_interval_start": "2024-06-01T00:00:00+00:00",
"duration": 74.000546,
+ "deadlines": None,
"end_date": "2024-06-01T00:01:14.000546+00:00",
"run_id": "dag_run__run_id",
"run_type": DagRunType.MANUAL,
@@ -2752,6 +2969,7 @@ def test_dagrun_info_af2():
"data_interval_end": "2024-06-01T00:00:00+00:00",
"data_interval_start": "2024-06-01T00:00:00+00:00",
"duration": 74.000546,
+ "deadlines": None,
"end_date": "2024-06-01T00:01:14.000546+00:00",
"run_id": "dag_run__run_id",
"run_type": DagRunType.MANUAL,