kaxil commented on code in PR #66608:
URL: https://github.com/apache/airflow/pull/66608#discussion_r3307566706


##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -318,11 +334,68 @@ def _configure_logging(log_path: str) -> 
tuple[FilteringBoundLogger, BinaryIO]:
     return logger, log_file_descriptor
 
 
+def _fetch_and_build_context(
+    comms,
+    dag_id: str,
+    run_id: str,
+    _log,
+) -> dict | None:
+    """
+    Fetch DagRun via SUPERVISOR_COMMS and build a standard context dict.
+
+    Called inside the forked subprocess when DagRun identifiers are available.
+    Returns a context dict with dag_run, run_id, logical_date, ds, ts, etc.
+    Task-specific fields are absent since callbacks are not tied to a task.
+    """
+    try:
+        response = comms.send(GetDagRun(dag_id=dag_id, run_id=run_id))
+        if not isinstance(response, DagRunResult):
+            _log.warning(
+                "Unexpected response when fetching DagRun for callback 
context",
+                response_type=type(response).__name__,
+            )
+            return None
+
+        context: dict = {"dag_run": response}
+
+        if response.logical_date:
+            logical_date = response.logical_date
+            ds = logical_date.strftime("%Y-%m-%d")
+            ts = logical_date.isoformat()
+            context.update(
+                {
+                    "logical_date": logical_date,
+                    "run_id": response.run_id,
+                    "ds": ds,
+                    "ds_nodash": ds.replace("-", ""),
+                    "ts": ts,
+                    "ts_nodash": logical_date.strftime("%Y%m%dT%H%M%S"),
+                    "ts_nodash_with_tz": ts.replace("-", "").replace(":", ""),
+                    "data_interval_start": response.data_interval_start,

Review Comment:
   These aren't coerced to `pendulum.DateTime`, but the triggerer-side 
`_build_context_from_dag_run_data` and 
`RuntimeTaskInstance.get_template_context()` both wrap them with 
`coerce_datetime`. `DagRunResult` inherits AwareDatetime fields which are 
stdlib `datetime`, not pendulum.
   
   So executor-path callbacks get stdlib datetimes while triggerer-path / task 
callbacks get pendulum. User code doing 
`context['data_interval_start'].in_timezone(...)` or 
`context['logical_date'].add(days=1)` works on triggerer/task contexts and 
crashes on the executor one. Same fix here as in the triggerer side: wrap with 
`coerce_datetime` (or import from `airflow.sdk._shared.timezones`).



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -792,6 +796,40 @@ def _create_workload(
             timeout_after=trigger.task_instance.trigger_timeout,
         )
 
+    def _fetch_callback_dag_run_data(self, trigger: Trigger, *, session: 
Session) -> dict | None:
+        """
+        Fetch DagRun data for deadline callback triggers.
+
+        When a callback trigger has dag_id/run_id stored in its associated 
Callback.data,
+        fetch the DagRun and return serialized dag_run_data so the 
TriggerRunner can build
+        a standard Context at runtime (same pattern as start_from_trigger).
+        """
+        from airflow.models.dagrun import DagRun
+
+        # The trigger's callback relationship stores the identifiers we need
+        if not trigger.callback:
+            return None
+
+        callback_data = trigger.callback.data
+        dag_id = callback_data.get("dag_id")
+        run_id = callback_data.get("run_id")
+        if not dag_id or not run_id:
+            return None
+
+        dagrun = session.execute(
+            select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == 
run_id)
+        ).scalar()

Review Comment:
   Two small things here:
   
   1. `session.execute(select(...)).scalar()` can just be 
`session.scalar(select(...))`.
   2. This runs for every trigger that hits the `trigger.task_instance is None` 
branch in `_create_workload`, which includes asset-watcher triggers. For asset 
watchers `trigger.callback` is None so we bail at the guard above, but 
`trigger.callback` is lazy-loaded (not in `Trigger.bulk_fetch`'s eager options) 
-- so every asset-watcher trigger creation now pays one extra SELECT. Worth 
adding `joinedload(Trigger.callback)` to `Trigger.bulk_fetch` to avoid the N+1, 
since you're going to access it for the callback path anyway.



##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -41,6 +41,9 @@ def __init__(self, callback_path: str, callback_kwargs: 
dict[str, Any] | None =
         super().__init__()
         self.callback_path = callback_path
         self.callback_kwargs = callback_kwargs or {}
+        # Context is set by the TriggerRunner (from dag_run_data) before run() 
is called.
+        # This attribute is set externally; defaults to None for triggers 
without context.
+        self.context: dict | None = None

Review Comment:
   Naming a runtime-injected attribute `self.context` on a `BaseTrigger` 
subclass is risky -- `context` is a very generic name and a subclass / future 
refactor could shadow it. More importantly, this attribute is *not* in 
`serialize()`, so anyone reconstructing a `CallbackTrigger` from `(classpath, 
kwargs)` (tests, manual replays, future HA reshuffles that reload from DB 
without going through `TriggerRunner.create_triggers`) ends up with 
`context=None` and the callback silently runs without context.
   
   Consider `_runner_context` or `_injected_context` to signal it's set 
externally, and a docstring note that it's a runtime-only attribute 
deliberately excluded from serialization.



##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -52,8 +55,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         try:
             yield TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.RUNNING})
             callback = import_string(self.callback_path)
-            # TODO: get full context and run template rendering. Right now, a 
simple context is included in `callback_kwargs`
-            context = self.callback_kwargs.pop("context", None)
+
+            # Context is set on this instance by the TriggerRunner (built from 
dag_run_data).
+            # For backward compatibility, fall back to extracting "context" 
from callback_kwargs
+            # (pre-upgrade triggers that were queued before this change).
+            context = self.context or self.callback_kwargs.pop("context", None)

Review Comment:
   `callback_kwargs.pop("context", None)` mutates `self.callback_kwargs`. If 
`run()` is ever invoked twice on the same instance (a retry path, or some 
watcher pattern), the second call sees `self.context = None` AND 
`kwargs["context"]` already popped, so the callback runs without context. Use 
`.get("context", None)` -- the pre-upgrade compat path is meant to be read-only.
   
   Also worth a comment that this fallback can be removed in N+1 (once no 
triggers queued before this PR can possibly still be in-flight).



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1184,6 +1222,42 @@ def create_runtime_ti(
             ),
         )
 
+    @staticmethod
+    def _build_context_from_dag_run_data(dag_run_data: dict) -> Context:
+        """
+        Build a standard Context dict from serialized dag_run_data for 
callback triggers.
+
+        This provides the same DagRun-level context fields (dag_run, run_id, 
logical_date,
+        ds, ts, etc.) that task-bound triggers get via 
RuntimeTaskInstance.get_template_context(),
+        but without task-specific fields since callbacks are not tied to a 
task.
+        """
+        from airflow._shared.timezones.timezone import coerce_datetime
+        from airflow.api_fastapi.execution_api.datamodels.taskinstance import 
DagRun as DRDataModel
+
+        dag_run = DRDataModel(**dag_run_data)
+        context: Context = {"dag_run": dag_run}  # type: ignore[typeddict-item]
+
+        if logical_date := coerce_datetime(dag_run.logical_date):
+            ds = logical_date.strftime("%Y-%m-%d")
+            ts = logical_date.isoformat()
+            context.update(
+                {
+                    "logical_date": logical_date,
+                    "run_id": dag_run.run_id,
+                    "ds": ds,
+                    "ds_nodash": ds.replace("-", ""),
+                    "ts": ts,
+                    "ts_nodash": logical_date.strftime("%Y%m%dT%H%M%S"),
+                    "ts_nodash_with_tz": ts.replace("-", "").replace(":", ""),
+                    "data_interval_start": 
coerce_datetime(dag_run.data_interval_start),
+                    "data_interval_end": 
coerce_datetime(dag_run.data_interval_end),

Review Comment:
   This `_build_context_from_dag_run_data` and `_fetch_and_build_context` in 
`callback_supervisor.py` are ~95% the same logic, and both duplicate the 
DagRun-derived block of `RuntimeTaskInstance.get_template_context()` 
(task_runner.py:317-358). Three copies of the same `{ds, ts, ds_nodash, 
ts_nodash, ts_nodash_with_tz, data_interval_start, data_interval_end}` 
derivation is going to drift -- the `coerce_datetime` divergence I flagged in 
callback_supervisor.py:374 already happened on first write.
   
   Worth extracting one shared helper, e.g. 
`build_dag_run_template_fields(dag_run) -> dict` in `airflow.sdk._shared` (or 
wherever both sides can import from), and calling it from all three places.



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -215,45 +215,22 @@ def prune_deadlines(cls, *, session: Session, conditions: 
dict[Mapped, Any]) ->
 
     def handle_miss(self, session: Session):
         """Handle a missed deadline by queueing the callback."""
+        # DagRun identifiers at top level for routing (triggerer/executor uses 
these to fetch context).
+        self.callback.data["dag_id"] = self.dagrun.dag_id
+        self.callback.data["run_id"] = self.dagrun.run_id

Review Comment:
   Stashing routing identifiers (`dag_id`, `run_id`) at the top level of 
`callback.data` mixes them in with the user-facing payload that gets serialized 
into the trigger row and surfaced in `repr()` / metrics. 
`Callback.get_dag_id()` already reads `data["dag_id"]` so this part is 
consistent, but as more routing fields land (you've already got `dag_run_id` in 
the `ExecutorCallback` branch) it'd be cleaner to namespace them, e.g. 
`callback.data["_routing"] = {"dag_id": ..., "run_id": ..., "dag_run_id": 
...}`. Not blocking, but worth deciding before the schema solidifies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to