uranusjr commented on code in PR #51949:
URL: https://github.com/apache/airflow/pull/51949#discussion_r2160705615


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1352,21 +1352,58 @@ def notify_dagrun_state_changed(self, msg: str = ""):
 
     def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason: 
str = "success"):
         """Only needed for `dag.test` where `execute_callbacks=True` is passed 
to `update_state`."""
+        task_instances = self.get_task_instances()
+
+        # Identify the most relevant task instance
+        last_relevant_ti = None
+        if not success:
+            failed_tis = [ti for ti in task_instances if ti.state in 
State.failed_states and ti.end_date]
+            failed_tis.sort(key=lambda x: x.end_date, reverse=True)
+            last_relevant_ti = failed_tis[0] if failed_tis else None
+        else:
+            success_tis = [ti for ti in task_instances if ti.state in 
State.success_states and ti.end_date]
+            success_tis.sort(key=lambda x: x.end_date, reverse=True)
+            last_relevant_ti = success_tis[0] if success_tis else None
+
+        # Enrich DAG-level callback context
         context: Context = {  # type: ignore[assignment]
             "dag": dag,
             "run_id": str(self.run_id),
+            "execution_date": self.logical_date,
+            "start_date": self.start_date,
+            "end_date": self.end_date,
+            "data_interval_start": self.data_interval_start,
+            "data_interval_end": self.data_interval_end,
             "reason": reason,
+            "run_duration": (
+                (self.end_date - self.start_date).total_seconds()
+                if self.start_date and self.end_date
+                else None
+            ),
         }
 
+        # Add task-level metadata if available
+        if last_relevant_ti:
+            context.update(
+                {
+                    "task_instance": last_relevant_ti,
+                    "ti": last_relevant_ti,
+                    "try_number": last_relevant_ti.try_number,
+                    "max_tries": last_relevant_ti.max_tries,
+                    "log_url": last_relevant_ti.log_url,
+                    "mark_success_url": last_relevant_ti.mark_success_url,
+                }
+            )

Review Comment:
   In Jinja you can also write `{{ ti.log_url }}`, so a separate top-level 
`log_url` entry in the context is not needed. I think people can live with 
three extra characters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to