Asquator commented on code in PR #61274:
URL: https://github.com/apache/airflow/pull/61274#discussion_r2842597816


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1248,22 +1262,35 @@ def recalculate(self) -> _UnfinishedStates:
             self.set_state(DagRunState.SUCCESS)
             self.notify_dagrun_state_changed(msg="success")
 
-            if execute_callbacks and dag.has_on_success_callback:
-                self.handle_dag_callback(dag=cast("SDKDAG", dag), 
success=True, reason="success")
-            elif dag.has_on_success_callback:
-                callback = DagCallbackRequest(
-                    filepath=self.dag_model.relative_fileloc,
-                    dag_id=self.dag_id,
-                    run_id=self.run_id,
-                    bundle_name=self.dag_model.bundle_name,
-                    bundle_version=self.bundle_version,
-                    context_from_server=DagRunContext(
-                        dag_run=self,
-                        last_ti=self.get_last_ti(dag=dag, session=session),
-                    ),
-                    is_failure_callback=False,
-                    msg="success",
+            if dag.has_on_success_callback:
+                last_succeeded_ti: TI | None = (
+                    max(
+                        (ti for ti in tis_for_dagrun_state if ti.state == 
TaskInstanceState.SUCCESS),
+                        key=lambda ti: ti.end_date,
+                        default=None
+                    )
                 )
+                if execute_callbacks:
+                    self.handle_dag_callback(
+                        dag=cast("SDKDAG", dag),
+                        success=True,
+                        relevant_ti=last_succeeded_ti,
+                        reason="success"
+                    )
+                else:
+                    callback = DagCallbackRequest(
+                        filepath=self.dag_model.relative_fileloc,
+                        dag_id=self.dag_id,
+                        run_id=self.run_id,
+                        bundle_name=self.dag_model.bundle_name,
+                        bundle_version=self.bundle_version,
+                        context_from_server=DagRunContext(
+                            dag_run=self,
+                            last_ti=last_succeeded_ti,
+                        ),
+                        is_failure_callback=False,
+                        msg="success",
+                    )
 

Review Comment:
   I have introduced `produce_dag_callback` as a template method.



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1283,26 +1310,31 @@ def recalculate(self) -> _UnfinishedStates:
             self.set_state(DagRunState.FAILED)
             self.notify_dagrun_state_changed(msg="all_tasks_deadlocked")
 
-            if execute_callbacks and dag.has_on_failure_callback:
-                self.handle_dag_callback(
-                    dag=cast("SDKDAG", dag),
-                    success=False,
-                    reason="all_tasks_deadlocked",
-                )
-            elif dag.has_on_failure_callback:
-                callback = DagCallbackRequest(
-                    filepath=self.dag_model.relative_fileloc,
-                    dag_id=self.dag_id,
-                    run_id=self.run_id,
-                    bundle_name=self.dag_model.bundle_name,
-                    bundle_version=self.bundle_version,
-                    context_from_server=DagRunContext(
-                        dag_run=self,
-                        last_ti=self.get_last_ti(dag=dag, session=session),
-                    ),
-                    is_failure_callback=True,
-                    msg="all_tasks_deadlocked",
+            if dag.has_on_failure_callback:
+                last_finished_ti: TI | None = (
+                    max(info.finished_tis, key=lambda ti: ti.end_date, 
default=None)
                 )
+                if execute_callbacks:
+                    self.handle_dag_callback(
+                        dag=cast("SDKDAG", dag),
+                        success=False,
+                        relevant_ti=last_finished_ti,
+                        reason="all_tasks_deadlocked",
+                    )
+                else:
+                    callback = DagCallbackRequest(
+                        filepath=self.dag_model.relative_fileloc,
+                        dag_id=self.dag_id,
+                        run_id=self.run_id,
+                        bundle_name=self.dag_model.bundle_name,
+                        bundle_version=self.bundle_version,
+                        context_from_server=DagRunContext(
+                            dag_run=self,
+                            last_ti=last_finished_ti
+                        ),
+                        is_failure_callback=True,
+                        msg="all_tasks_deadlocked",
+                    )

Review Comment:
   I have introduced `produce_dag_callback` as a template method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to