ferruzzi commented on code in PR #62918:
URL: https://github.com/apache/airflow/pull/62918#discussion_r2955194715


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1224,6 +1212,31 @@ def recalculate(self) -> _UnfinishedStates:
                 self.data_interval_start,
                 self.data_interval_end,
             )
+
+            if dag.deadline:
+                deadline_alerts = [
+                    DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
+                ]
+
+                has_dagrun_deadlines = any(
+                    deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
+                    for deadline_alert in deadline_alerts
+                )
+
+                if has_dagrun_deadlines:
+                    if self._state == DagRunState.SUCCESS:
+                        # Run succeeded before deadline — prune so they don't fire.
+                        Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})
+                    elif self._state == DagRunState.FAILED:
+                        # Run failed — immediately fire any pending deadline callbacks.
+                        pending_deadlines = session.scalars(
+                            select(Deadline)
+                            .join(DagRun)
+                            .where(DagRun.id == self.id, Deadline.missed == false())

Review Comment:
   I'm not great with SQL, so I may be wrong here, but passing on feedback I got
   in the past: we should add a `selectinload` to make this more efficient.
   Without it, accessing `deadline.callback` and `deadline.dagrun` inside
   `handle_miss()` triggers a separate DB query for each deadline in the loop.
   With `selectinload`, those relationships are loaded once up front, saving a
   bunch of DB lookups.
   
   The `~Deadline.missed` change is purely stylistic; it filters the same rows
   as your `Deadline.missed == false()`.
   
   ```suggestion
                           pending_deadlines = session.scalars(
                               select(Deadline)
                               .join(DagRun)
                               .where(DagRun.id == self.id, ~Deadline.missed)
                               .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
   ```
   
   
   
   Maybe it's time to consider moving that DB lookup into `handle_miss`, but
   that doesn't have to happen in this PR.
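
For anyone following along, here is a minimal sketch of the query pattern behind the `selectinload` suggestion. It deliberately uses the standard-library `sqlite3` module instead of SQLAlchemy so it runs anywhere, and the table layout, column names, and helper functions (`make_db`, `count_selects`, `load_lazily`, `load_batched`) are hypothetical stand-ins, not Airflow's actual schema or API. The batched variant only approximates the shape of what `selectinload` emits: one extra `SELECT ... WHERE id IN (...)` per relationship instead of one query per row.

```python
import sqlite3


def make_db():
    """Build a throwaway in-memory DB (hypothetical schema, not Airflow's)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE callback (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE deadline (
            id INTEGER PRIMARY KEY,
            dagrun_id INTEGER,
            missed INTEGER,
            callback_id INTEGER REFERENCES callback(id)
        );
        """
    )
    conn.executemany(
        "INSERT INTO callback VALUES (?, ?)", [(i, f"cb{i}") for i in range(1, 6)]
    )
    conn.executemany(
        "INSERT INTO deadline VALUES (?, ?, 0, ?)", [(i, 1, i) for i in range(1, 6)]
    )
    conn.commit()
    return conn


PENDING_SQL = (
    "SELECT id, callback_id FROM deadline "
    "WHERE dagrun_id = ? AND NOT missed ORDER BY id"
)


def load_lazily(conn, dagrun_id=1):
    """One query for the deadlines, then one more per deadline (the N+1 shape)."""
    pending = conn.execute(PENDING_SQL, (dagrun_id,)).fetchall()
    return [
        conn.execute("SELECT name FROM callback WHERE id = ?", (cb_id,)).fetchone()[0]
        for _, cb_id in pending
    ]


def load_batched(conn, dagrun_id=1):
    """One query for the deadlines, then a single IN query for all callbacks."""
    pending = conn.execute(PENDING_SQL, (dagrun_id,)).fetchall()
    ids = [cb_id for _, cb_id in pending]
    if not ids:
        return []
    placeholders = ",".join("?" * len(ids))
    rows = conn.execute(
        f"SELECT id, name FROM callback WHERE id IN ({placeholders})", ids
    ).fetchall()
    names = dict(rows)
    return [names[cb_id] for _, cb_id in pending]


def count_selects(conn, loader):
    """Run loader(conn) and count the SELECT statements SQLite executed."""
    statements = []
    conn.set_trace_callback(statements.append)
    try:
        result = loader(conn)
    finally:
        conn.set_trace_callback(None)
    selects = [s for s in statements if s.lstrip().upper().startswith("SELECT")]
    return result, len(selects)


if __name__ == "__main__":
    conn = make_db()
    print(count_selects(conn, load_lazily))
    print(count_selects(conn, load_batched))
```

Both loaders return the same callback names; the difference is that the lazy version issues 1 + N statements for N pending deadlines, while the batched version stays at two regardless of N.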



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1224,6 +1212,31 @@ def recalculate(self) -> _UnfinishedStates:
                 self.data_interval_start,
                 self.data_interval_end,
             )
+
+            if dag.deadline:
+                deadline_alerts = [
+                    DeadlineAlertModel.get_by_id(alert_id, session) for alert_id in dag.deadline
+                ]
+
+                has_dagrun_deadlines = any(
+                    deadline_alert.reference_class in SerializedReferenceModels.TYPES.DAGRUN
+                    for deadline_alert in deadline_alerts
+                )
+
+                if has_dagrun_deadlines:
+                    if self._state == DagRunState.SUCCESS:
+                        # Run succeeded before deadline — prune so they don't fire.
+                        Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})
+                    elif self._state == DagRunState.FAILED:
+                        # Run failed — immediately fire any pending deadline callbacks.
+                        pending_deadlines = session.scalars(
+                            select(Deadline)
+                            .join(DagRun)
+                            .where(DagRun.id == self.id, Deadline.missed == false())

Review Comment:
   Also, is there any particular reason you stored this in a variable instead 
of just putting it inline in the for loop?



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -1349,6 +1349,34 @@ def test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker,
         mock_prune.assert_not_called()
         assert dag_run.state == DagRunState.SUCCESS
 
+    @mock.patch.object(Deadline, "prune_deadlines")
+    @mock.patch.object(DeadlineAlertModel, "get_by_id")
+    def test_dagrun_failure_does_not_prune_deadlines(
+        self, mock_get_by_id, mock_prune, session, deadline_test_dag
+    ):
+        """On failure, deadlines should NOT be pruned — handle_miss fires instead."""

Review Comment:
   The name says exactly what the test does; no docstring needed.
   
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
