Copilot commented on code in PR #64984:
URL: https://github.com/apache/airflow/pull/64984#discussion_r3066491797


##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -32,6 +33,38 @@
 PAYLOAD_BODY_KEY = "body"
 
 
+def _is_notifier_class(callback: Any) -> bool:
+    """
+    Check if the callback is a BaseNotifier subclass (not an instance).
+
+    Uses duck-typing (checks for ``async_notify`` and ``template_fields``)
+    to avoid importing ``airflow.sdk`` in core.
+    """
+    return (
+        inspect.isclass(callback)
+        and hasattr(callback, "async_notify")
+        and hasattr(callback, "template_fields")
+        and hasattr(callback, "__await__")
+    )
+
+
+def _render_callback_kwargs(kwargs: dict[str, Any], context: dict) -> 
dict[str, Any]:
+    """
+    Render Jinja2 templates in callback kwargs using the provided context.
+
+    Uses ``Templater.render_template`` to recursively render all string values
+    in the kwargs dict.  Non-string values (int, float, datetime, …) pass
+    through unchanged.
+    """
+    # Use CallbackTrigger (which inherits Templater via BaseTrigger) to access
+    # render_template without importing airflow.sdk directly in core.
+    from jinja2.sandbox import SandboxedEnvironment
+
+    trigger = CallbackTrigger(callback_path="", callback_kwargs={})
+    jinja_env = SandboxedEnvironment(cache_size=0)
+    return trigger.render_template(kwargs, cast("Any", context), jinja_env)

Review Comment:
   _render_callback_kwargs() builds a raw 
`jinja2.sandbox.SandboxedEnvironment`, which bypasses Airflow’s templating 
environment (custom sandbox behavior, filters like `ds/ts`, extensions, etc.). 
This can make template rendering for plain function callbacks behave 
differently than Notifier rendering (which uses 
`Templater.get_template_env()`). Consider using 
`CallbackTrigger.get_template_env()` (or importing Airflow’s SDK 
`SandboxedEnvironment` from `airflow.sdk.definitions._internal.templater`) 
instead of the raw Jinja environment, and drop the string-based `cast("Any", 
...)` in favor of a proper type (or no cast).



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -216,28 +216,50 @@ def prune_deadlines(cls, *, session: Session, conditions: 
dict[Mapped, Any]) ->
     def handle_miss(self, session: Session):
         """Handle a missed deadline by queueing the callback."""
 
-        def get_simple_context():
+        def _build_deadline_context():
             from airflow.api_fastapi.core_api.datamodels.dag_run import 
DAGRunResponse
             from airflow.models import DagRun
 
-            # TODO: Use the TaskAPI from within Triggerer to fetch full 
context instead of sending this context
-            #  from the scheduler
-
-            # Fetch the DagRun from the database again to avoid errors when 
self.dagrun's relationship fields
-            # are not in the current session.
+            # Fetch the DagRun from the database again to avoid errors when 
self.dagrun's
+            # relationship fields are not in the current session.
             dagrun = session.get(DagRun, self.dagrun_id)
+            logical_date = dagrun.logical_date
 
-            return {
+            context: dict[str, Any] = {
+                # Full DAGRunResponse as a JSON-serializable dict
                 "dag_run": 
DAGRunResponse.model_validate(dagrun).model_dump(mode="json"),
-                "deadline": {"id": self.id, "deadline_time": 
self.deadline_time},
+                # Top-level convenience keys for Jinja templates (match 
standard context naming)
+                "dag_id": dagrun.dag_id,
+                "run_id": dagrun.run_id,
+                "logical_date": logical_date,
+                "data_interval_start": dagrun.data_interval_start,
+                "data_interval_end": dagrun.data_interval_end,
+                "run_type": dagrun.run_type,
+                "conf": dagrun.conf or {},
+                # Deadline-specific information
+                "deadline": {
+                    "id": self.id,
+                    "deadline_time": self.deadline_time,
+                    "alert_name": self.deadline_alert.name if 
self.deadline_alert else None,
+                },

Review Comment:
   `alert_name` is derived via `self.deadline_alert.name`, but 
`Deadline.deadline_alert` is not eager-loaded in the scheduler’s deadline query 
(it currently selectinloads only `callback` and `dagrun`). Since 
`handle_miss()` is called in a loop, this will trigger a per-deadline lazy-load 
query (N+1). Either eager-load `Deadline.deadline_alert` in the scheduler 
query, or avoid relationship access here by fetching the name in bulk/alongside 
the deadline rows.



##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -234,9 +234,29 @@ def test_handle_miss(self, dagrun, session):
         context = callback_kwargs.pop("context")
         assert callback_kwargs == TEST_CALLBACK_KWARGS
 
+        # Verify enriched context — dag_run and deadline info
+        assert context["dag_run"] == 
DAGRunResponse.model_validate(dagrun).model_dump(mode="json")
         assert context["deadline"]["id"] == deadline_orm.id
         assert context["deadline"]["deadline_time"].timestamp() == 
deadline_orm.deadline_time.timestamp()
-        assert context["dag_run"] == 
DAGRunResponse.model_validate(dagrun).model_dump(mode="json")
+        assert context["deadline"]["alert_name"] is None  # no deadline_alert 
in this test

Review Comment:
   This test validates the enriched context shape, but it mocks 
`deadline_orm.callback.queue()`, so it doesn’t exercise the new context through 
`TriggererCallback.queue()` → `Trigger.from_object()` → 
`airflow.sdk.serde.serialize()`. Since the enriched context now includes 
additional types (UUIDs/datetimes/nested dicts), a regression test should 
ensure the callback can be queued successfully and the trigger kwargs can be 
serialized/deserialized without error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to