YoannAbriel commented on code in PR #62918:
URL: https://github.com/apache/airflow/pull/62918#discussion_r3035201926
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1224,6 +1212,31 @@ def recalculate(self) -> _UnfinishedStates:
self.data_interval_start,
self.data_interval_end,
)
+
+ if dag.deadline:
+ deadline_alerts = [
+ DeadlineAlertModel.get_by_id(alert_id, session) for
alert_id in dag.deadline
+ ]
+
+ has_dagrun_deadlines = any(
+ deadline_alert.reference_class in
SerializedReferenceModels.TYPES.DAGRUN
+ for deadline_alert in deadline_alerts
+ )
+
+ if has_dagrun_deadlines:
+ if self._state == DagRunState.SUCCESS:
+ # Run succeeded before deadline — prune so they don't
fire.
+ Deadline.prune_deadlines(session=session,
conditions={DagRun.id: self.id})
+ elif self._state == DagRunState.FAILED:
+ # Run failed — immediately fire any pending deadline
callbacks.
+ pending_deadlines = session.scalars(
+ select(Deadline)
+ .join(DagRun)
+ .where(DagRun.id == self.id, Deadline.missed ==
false())
Review Comment:
Done — added selectinload for callback and dagrun, and inlined the query.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -1349,6 +1349,34 @@ def
test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker,
mock_prune.assert_not_called()
assert dag_run.state == DagRunState.SUCCESS
+ @mock.patch.object(Deadline, "prune_deadlines")
+ @mock.patch.object(DeadlineAlertModel, "get_by_id")
+ def test_dagrun_failure_does_not_prune_deadlines(
+ self, mock_get_by_id, mock_prune, session, deadline_test_dag
+ ):
+ """On failure, deadlines should NOT be pruned — handle_miss fires
instead."""
Review Comment:
Removed.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1246,6 +1234,26 @@ def recalculate(self) -> _UnfinishedStates:
self.data_interval_start,
self.data_interval_end,
)
+
+ if dag.deadline:
+ if any(
+ DeadlineAlertModel.get_by_id(alert_id,
session).reference_class
+ in SerializedReferenceModels.TYPES.DAGRUN
+ for alert_id in dag.deadline
+ ):
+ if self._state == DagRunState.SUCCESS:
+ # Run succeeded before deadline — prune so they don't
fire.
+ Deadline.prune_deadlines(session=session,
conditions={DagRun.id: self.id})
+ elif self._state == DagRunState.FAILED:
+ # Run failed — immediately fire any pending deadline
callbacks.
+ for deadline in session.scalars(
+ select(Deadline)
+ .join(DagRun)
+ .where(DagRun.id == self.id, ~Deadline.missed)
+ .options(selectinload(Deadline.callback),
selectinload(Deadline.dagrun))
+ ):
+ deadline.handle_miss(session=session)
Review Comment:
Added `test_dagrun_failure_calls_handle_miss` in 5b72205.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]