seanghaeli commented on code in PR #66608:
URL: https://github.com/apache/airflow/pull/66608#discussion_r3314242017
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -792,6 +796,40 @@ def _create_workload(
timeout_after=trigger.task_instance.trigger_timeout,
)
+ def _fetch_callback_dag_run_data(self, trigger: Trigger, *, session:
Session) -> dict | None:
+ """
+ Fetch DagRun data for deadline callback triggers.
+
+ When a callback trigger has dag_id/run_id stored in its associated
Callback.data,
+ fetch the DagRun and return serialized dag_run_data so the
TriggerRunner can build
+ a standard Context at runtime (same pattern as start_from_trigger).
+ """
+ from airflow.models.dagrun import DagRun
+
+ # The trigger's callback relationship stores the identifiers we need
+ if not trigger.callback:
+ return None
+
+ callback_data = trigger.callback.data
+ dag_id = callback_data.get("dag_id")
+ run_id = callback_data.get("run_id")
+ if not dag_id or not run_id:
+ return None
+
+ dagrun = session.execute(
+ select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id ==
run_id)
+ ).scalar()
Review Comment:
Added `session.scalar`, and added logic to `Trigger.bulk_fetch` so that the
callback is eagerly loaded on fetch, avoiding the extra select.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]