This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d36858823e feat: Add Deadline Alerts to OL events (#63352)
5d36858823e is described below

commit 5d36858823e9bb343b30bd0110e33e5afa718c37
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Mar 11 20:30:21 2026 +0100

    feat: Add Deadline Alerts to OL events (#63352)
---
 .../airflow/providers/openlineage/utils/utils.py   |  46 +++++
 .../tests/unit/openlineage/utils/test_utils.py     | 218 +++++++++++++++++++++
 2 files changed, 264 insertions(+)

diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index bfc9f566fcb..9fb572d628c 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -759,6 +759,7 @@ class DagRunInfo(InfoJsonEncodable):
         "dag_bundle_version": lambda dagrun: 
DagRunInfo.dag_version_info(dagrun, "bundle_version"),
         "dag_version_id": lambda dagrun: DagRunInfo.dag_version_info(dagrun, 
"version_id"),
         "dag_version_number": lambda dagrun: 
DagRunInfo.dag_version_info(dagrun, "version_number"),
+        "deadlines": lambda dagrun: DagRunInfo.deadlines(dagrun),
     }
 
     @classmethod
@@ -769,8 +770,53 @@ class DagRunInfo(InfoJsonEncodable):
             return None
         return (dagrun.end_date - dagrun.start_date).total_seconds()
 
+    @classmethod
+    def deadlines(cls, dagrun: DagRun) -> dict[str, Any] | None:
+        """
+        Extract deadline state and alert definitions from a DagRun (on 
scheduler).
+
+        Returns a dict (not a list) so _cast_basic_types passes it through.
+        """
+        try:
+            # AF2 DagRun and AF3 DagRun SDK model (on worker) do not have this 
information
+            deadlines = getattr(dagrun, "deadlines", None)
+            if not deadlines:
+                return None
+        except Exception as err:
+            log.warning("OpenLineage failed to retrieve deadlines. Exception: 
%s", err)
+            return None
+
+        result = []
+        for d in deadlines:
+            try:
+                info: dict[str, Any] = {}
+                if deadline_time := getattr(d, "deadline_time", None):
+                    info["deadline_time"] = deadline_time.isoformat()
+                if (missed := getattr(d, "missed", None)) is not None:
+                    info["missed"] = missed
+                try:
+                    # deadline_alert is a lazy-loaded ORM relationship that may
+                    # trigger a DB query; keep it isolated so a 
detached-session
+                    # error doesn't discard the rest of the deadline info.
+                    if alert := getattr(d, "deadline_alert", None):
+                        info.update(
+                            {
+                                k: v
+                                for k in ("name", "description", "reference", 
"interval", "callback_def")
+                                if (v := getattr(alert, k, None)) is not None
+                            }
+                        )
+                except Exception as err:
+                    log.warning("OpenLineage could not load deadline_alert 
relationship for %s", err)
+                if info:
+                    result.append(info)
+            except Exception as err:
+                log.warning("OpenLineage failed to serialize deadline: %s", 
err)
+        return {"alerts": result} if result else None
+
     @classmethod
     def dag_version_info(cls, dagrun: DagRun, key: str) -> str | int | None:
+        """Extract dag version info for given key, sourced from DagRun (on 
scheduler)."""
         # AF2 DagRun and AF3 DagRun SDK model (on worker) do not have this 
information
         if not getattr(dagrun, "dag_versions", []):
             return None
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 4e76a795bb8..75365cd5a7c 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -198,6 +198,7 @@ def test_get_airflow_dag_run_facet():
             version_number="version_number",
         )
     ]
+    dagrun_mock.deadlines = []
 
     result = get_airflow_dag_run_facet(dagrun_mock)
 
@@ -232,6 +233,7 @@ def test_get_airflow_dag_run_facet():
                 "start_date": "2024-06-01T01:02:04+00:00",
                 "end_date": "2024-06-01T01:02:14.034172+00:00",
                 "duration": 10.034172,
+                "deadlines": None,
                 "execution_date": "2024-06-01T01:02:04+00:00",
                 "logical_date": "2024-06-01T01:02:04+00:00",
                 "run_after": "2024-06-01T01:02:04+00:00",
@@ -2660,6 +2662,220 @@ class TestDagInfoAirflow3:
         }
 
 
+class TestDagRunInfoDeadlines:
+    """Tests for deadline state and alert definitions in DagRunInfo."""
+
+    def test_dagrun_no_deadlines_attribute(self):
+        dagrun = MagicMock(spec=[])
+        assert DagRunInfo.deadlines(dagrun) is None
+
+    def test_dagrun_empty_deadlines(self):
+        dagrun = MagicMock()
+        dagrun.deadlines = []
+        assert DagRunInfo.deadlines(dagrun) is None
+
+    def test_dagrun_with_deadline_and_alert(self):
+        alert = MagicMock(spec=["name", "description", "reference", 
"interval", "callback_def"])
+        alert.name = "SLA Alert"
+        alert.description = "Must finish within 1 hour"
+        alert.reference = {"reference_type": "DagRunLogicalDateDeadline"}
+        alert.interval = 3600.0
+        alert.callback_def = {"path": "my_module.on_deadline_missed", 
"kwargs": {}}
+
+        deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        deadline.missed = False
+        deadline.deadline_alert = alert
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [deadline]
+
+        assert DagRunInfo.deadlines(dagrun) == {
+            "alerts": [
+                {
+                    "deadline_time": "2025-06-01T12:00:00+00:00",
+                    "missed": False,
+                    "name": "SLA Alert",
+                    "description": "Must finish within 1 hour",
+                    "reference": {"reference_type": 
"DagRunLogicalDateDeadline"},
+                    "interval": 3600.0,
+                    "callback_def": {"path": "my_module.on_deadline_missed", 
"kwargs": {}},
+                },
+            ],
+        }
+
+    def test_dagrun_with_multiple_deadlines(self):
+        alert1 = MagicMock(spec=["name", "description", "reference", 
"interval", "callback_def"])
+        alert1.name = None
+        alert1.description = None
+        alert1.reference = {"reference_type": "DagRunLogicalDateDeadline"}
+        alert1.interval = 3600.0
+        alert1.callback_def = {"path": "mod.cb1", "kwargs": {}}
+
+        alert2 = MagicMock(spec=["name", "description", "reference", 
"interval", "callback_def"])
+        alert2.name = "Queued Deadline"
+        alert2.description = None
+        alert2.reference = {"reference_type": "DagRunQueuedAtDeadline"}
+        alert2.interval = 7200.0
+        alert2.callback_def = {"path": "mod.cb2", "kwargs": {"notify": True}}
+
+        d1 = MagicMock(spec=["deadline_time", "missed", "deadline_alert"])
+        d1.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        d1.missed = True
+        d1.deadline_alert = alert1
+
+        d2 = MagicMock(spec=["deadline_time", "missed", "deadline_alert"])
+        d2.deadline_time = datetime.datetime(2025, 6, 1, 14, 0, 0, 
tzinfo=datetime.timezone.utc)
+        d2.missed = False
+        d2.deadline_alert = alert2
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [d1, d2]
+
+        assert DagRunInfo.deadlines(dagrun) == {
+            "alerts": [
+                {
+                    "deadline_time": "2025-06-01T12:00:00+00:00",
+                    "missed": True,
+                    "reference": {"reference_type": 
"DagRunLogicalDateDeadline"},
+                    "interval": 3600.0,
+                    "callback_def": {"path": "mod.cb1", "kwargs": {}},
+                },
+                {
+                    "deadline_time": "2025-06-01T14:00:00+00:00",
+                    "missed": False,
+                    "name": "Queued Deadline",
+                    "reference": {"reference_type": "DagRunQueuedAtDeadline"},
+                    "interval": 7200.0,
+                    "callback_def": {"path": "mod.cb2", "kwargs": {"notify": 
True}},
+                },
+            ],
+        }
+
+    def test_dagrun_deadline_alert_access_fails(self):
+        """When the alert relationship can't be loaded, execution details 
still appear."""
+        deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        deadline.missed = False
+        type(deadline).deadline_alert = PropertyMock(side_effect=Exception("DB 
not available"))
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [deadline]
+
+        assert DagRunInfo.deadlines(dagrun) == {
+            "alerts": [
+                {
+                    "deadline_time": "2025-06-01T12:00:00+00:00",
+                    "missed": False,
+                },
+            ],
+        }
+
+    def test_dagrun_deadline_none_alert_fields_excluded(self):
+        """None-valued alert fields are excluded from the output."""
+        alert = MagicMock(spec=["name", "description", "reference", 
"interval", "callback_def"])
+        alert.name = None
+        alert.description = None
+        alert.reference = {"reference_type": "DagRunLogicalDateDeadline"}
+        alert.interval = 3600.0
+        alert.callback_def = None
+
+        deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        deadline.missed = True
+        deadline.deadline_alert = alert
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [deadline]
+
+        result = DagRunInfo.deadlines(dagrun)
+        assert result == {
+            "alerts": [
+                {
+                    "deadline_time": "2025-06-01T12:00:00+00:00",
+                    "missed": True,
+                    "reference": {"reference_type": 
"DagRunLogicalDateDeadline"},
+                    "interval": 3600.0,
+                },
+            ],
+        }
+        assert "name" not in result["alerts"][0]
+        assert "description" not in result["alerts"][0]
+        assert "callback_def" not in result["alerts"][0]
+
+    def test_dagrun_deadlines_property_raises(self):
+        """When accessing dagrun.deadlines itself raises, return None."""
+        dagrun = MagicMock()
+        type(dagrun).deadlines = PropertyMock(side_effect=Exception("Session 
closed"))
+
+        assert DagRunInfo.deadlines(dagrun) is None
+
+    def test_dagrun_deadline_time_isoformat_raises(self):
+        """When deadline_time.isoformat() raises, that deadline is skipped."""
+        bad_time = MagicMock()
+        bad_time.isoformat.side_effect = AttributeError("not a datetime")
+
+        deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        deadline.deadline_time = bad_time
+        deadline.missed = False
+        deadline.deadline_alert = None
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [deadline]
+
+        assert DagRunInfo.deadlines(dagrun) is None
+
+    def test_dagrun_bad_deadline_skipped_others_preserved(self):
+        """A failing deadline is skipped; valid siblings still appear."""
+        bad_deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        bad_time = MagicMock()
+        bad_time.isoformat.side_effect = TypeError("broken")
+        bad_deadline.deadline_time = bad_time
+        bad_deadline.missed = False
+        bad_deadline.deadline_alert = None
+
+        good_deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        good_deadline.deadline_time = datetime.datetime(2025, 6, 1, 14, 0, 0, 
tzinfo=datetime.timezone.utc)
+        good_deadline.missed = True
+        good_deadline.deadline_alert = None
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [bad_deadline, good_deadline]
+
+        assert DagRunInfo.deadlines(dagrun) == {
+            "alerts": [
+                {
+                    "deadline_time": "2025-06-01T14:00:00+00:00",
+                    "missed": True,
+                },
+            ],
+        }
+
+    def test_dagrun_alert_attribute_access_raises(self):
+        """When reading an attribute on the alert object raises, execution 
details still appear."""
+        alert = MagicMock(spec=["name", "description", "reference", 
"interval", "callback_def"])
+        alert.name = "Good Name"
+        type(alert).reference = PropertyMock(side_effect=Exception("Column 
error"))
+
+        deadline = MagicMock(spec=["deadline_time", "missed", 
"deadline_alert"])
+        deadline.deadline_time = datetime.datetime(2025, 6, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        deadline.missed = False
+        deadline.deadline_alert = alert
+
+        dagrun = MagicMock()
+        dagrun.deadlines = [deadline]
+
+        result = DagRunInfo.deadlines(dagrun)
+        assert result == {
+            "alerts": [
+                {
+                    "deadline_time": "2025-06-01T12:00:00+00:00",
+                    "missed": False,
+                },
+            ],
+        }
+
+
 @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Airflow 3 test")
 @patch.object(DagRun, "dag_versions", new_callable=PropertyMock)
 def test_dagrun_info_af3(mocked_dag_versions):
@@ -2708,6 +2924,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
         "data_interval_end": "2024-06-01T00:00:00+00:00",
         "data_interval_start": "2024-06-01T00:00:00+00:00",
         "duration": 74.000546,
+        "deadlines": None,
         "end_date": "2024-06-01T00:01:14.000546+00:00",
         "run_id": "dag_run__run_id",
         "run_type": DagRunType.MANUAL,
@@ -2752,6 +2969,7 @@ def test_dagrun_info_af2():
         "data_interval_end": "2024-06-01T00:00:00+00:00",
         "data_interval_start": "2024-06-01T00:00:00+00:00",
         "duration": 74.000546,
+        "deadlines": None,
         "end_date": "2024-06-01T00:01:14.000546+00:00",
         "run_id": "dag_run__run_id",
         "run_type": DagRunType.MANUAL,

Reply via email to