ferruzzi commented on code in PR #62918:
URL: https://github.com/apache/airflow/pull/62918#discussion_r2955194715
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1224,6 +1212,31 @@ def recalculate(self) -> _UnfinishedStates:
self.data_interval_start,
self.data_interval_end,
)
+
+ if dag.deadline:
+ deadline_alerts = [
+ DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
+ ]
+
+ has_dagrun_deadlines = any(
+ deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
+ for deadline_alert in deadline_alerts
+ )
+
+ if has_dagrun_deadlines:
+ if self._state == DagRunState.SUCCESS:
+ # Run succeeded before deadline — prune so they don't fire.
+ Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})
+ elif self._state == DagRunState.FAILED:
+ # Run failed — immediately fire any pending deadline callbacks.
+ pending_deadlines = session.scalars(
+ select(Deadline)
+ .join(DagRun)
+ .where(DagRun.id == self.id, Deadline.missed == false())
Review Comment:
I'm not great with SQL, so I may be wrong here, but I'm passing on feedback I
got in the past: we should add `selectinload` here to make this more efficient.
Without it, accessing `deadline.callback` and `deadline.dagrun` inside
`handle_miss()` lazy-loads each relationship, which triggers a separate DB
query for each deadline in the loop. With `selectinload`, each relationship is
loaded up front in one batched query for all the pending deadlines, saving a
bunch of DB lookups.
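To make the trade-off concrete, here is a stdlib-only sketch of the two loading patterns. It does not use SQLAlchemy or Airflow's models: the `callback`/`deadline` tables, `CountingConnection`, `load_lazy`, and `load_selectin` are all hypothetical. The lazy version issues one query per deadline (the classic N+1), while the selectin-style version fetches every related row with a single `SELECT ... WHERE id IN (...)`, which is roughly the kind of extra query `selectinload` emits.

```python
import sqlite3


class CountingConnection:
    """Hypothetical wrapper: counts how many statements go through execute()."""

    def __init__(self, conn):
        self.conn = conn
        self.queries = 0

    def execute(self, sql, params=()):
        self.queries += 1
        return self.conn.execute(sql, params)


def load_lazy(db):
    """N+1 pattern: one query for the deadlines, then one per deadline for its callback."""
    deadlines = db.execute("SELECT id, callback_id FROM deadline ORDER BY id").fetchall()
    result = {}
    for deadline_id, callback_id in deadlines:
        (name,) = db.execute("SELECT name FROM callback WHERE id = ?", (callback_id,)).fetchone()
        result[deadline_id] = name
    return result


def load_selectin(db):
    """selectin-style: one query for the deadlines, then one IN (...) query for all callbacks."""
    deadlines = db.execute("SELECT id, callback_id FROM deadline ORDER BY id").fetchall()
    callback_ids = sorted({callback_id for _, callback_id in deadlines})
    if not callback_ids:
        return {}  # an empty IN () list is not valid SQL, so bail out early
    placeholders = ", ".join("?" for _ in callback_ids)
    rows = db.execute(
        f"SELECT id, name FROM callback WHERE id IN ({placeholders})", callback_ids
    ).fetchall()
    names_by_id = dict(rows)
    return {deadline_id: names_by_id[callback_id] for deadline_id, callback_id in deadlines}


# Hypothetical, simplified schema; NOT Airflow's real tables.
raw = sqlite3.connect(":memory:")
raw.executescript(
    """
    CREATE TABLE callback (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE deadline (id INTEGER PRIMARY KEY, callback_id INTEGER REFERENCES callback(id));
    """
)
raw.executemany("INSERT INTO callback VALUES (?, ?)", [(i, f"cb{i}") for i in range(1, 6)])
raw.executemany("INSERT INTO deadline VALUES (?, ?)", [(i, i) for i in range(1, 6)])

lazy_db = CountingConnection(raw)
lazy_result = load_lazy(lazy_db)

batched_db = CountingConnection(raw)
batched_result = load_selectin(batched_db)

print(lazy_db.queries, batched_db.queries)  # 6 2
```

With five deadlines the lazy path runs 1 + 5 queries and the batched path runs 2; the gap grows linearly with the number of pending deadlines.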
Switching to `~Deadline.missed` is purely stylistic; it filters the same rows as your `Deadline.missed == false()`.
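If it helps, here is a quick stdlib `sqlite3` check that the negated form and the compare-to-false form keep the same rows, including when the column is NULL. The table is hypothetical, and it uses plain SQL shapes (`NOT missed` vs `missed = 0`) rather than the SQLAlchemy expressions themselves, whose exact rendering depends on the dialect.

```python
import sqlite3

# Hypothetical table; SQLite has no native boolean, so `missed` holds 0, 1 or NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE deadline (id INTEGER PRIMARY KEY, missed BOOLEAN)")
conn.executemany("INSERT INTO deadline VALUES (?, ?)", [(1, 0), (2, 1), (3, None)])

# Negation form, the SQL shape `~Deadline.missed` is aiming for.
not_missed = [row[0] for row in conn.execute("SELECT id FROM deadline WHERE NOT missed ORDER BY id")]

# Comparison form, the SQL shape `Deadline.missed == false()` is aiming for.
eq_false = [row[0] for row in conn.execute("SELECT id FROM deadline WHERE missed = 0 ORDER BY id")]

# NOT NULL and NULL = 0 both evaluate to NULL (not true), so both filters drop row 3.
print(not_missed, eq_false)  # [1] [1]
```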
```suggestion
pending_deadlines = session.scalars(
select(Deadline)
.join(DagRun)
.where(DagRun.id == self.id, ~Deadline.missed)
.options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
```
Mayyybe it's time to consider moving that DB lookup into `handle_miss`... but
that doesn't have to happen in this PR.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1224,6 +1212,31 @@ def recalculate(self) -> _UnfinishedStates:
self.data_interval_start,
self.data_interval_end,
)
+
+ if dag.deadline:
+ deadline_alerts = [
+ DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
+ ]
+
+ has_dagrun_deadlines = any(
+ deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
+ for deadline_alert in deadline_alerts
+ )
+
+ if has_dagrun_deadlines:
+ if self._state == DagRunState.SUCCESS:
+ # Run succeeded before deadline — prune so they don't fire.
+ Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})
+ elif self._state == DagRunState.FAILED:
+ # Run failed — immediately fire any pending deadline callbacks.
+ pending_deadlines = session.scalars(
+ select(Deadline)
+ .join(DagRun)
+ .where(DagRun.id == self.id, Deadline.missed == false())
Review Comment:
Also, is there any particular reason you stored this in a variable instead
of just putting it inline in the for loop?
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -1349,6 +1349,34 @@ def test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker,
mock_prune.assert_not_called()
assert dag_run.state == DagRunState.SUCCESS
+ @mock.patch.object(Deadline, "prune_deadlines")
+ @mock.patch.object(DeadlineAlertModel, "get_by_id")
+ def test_dagrun_failure_does_not_prune_deadlines(
+ self, mock_get_by_id, mock_prune, session, deadline_test_dag
+ ):
+ """On failure, deadlines should NOT be pruned — handle_miss fires instead."""
Review Comment:
The test name already says exactly what it does, so no docstring is needed.
```suggestion
```
--
This is an automated message from the Apache Git Service.