Copilot commented on code in PR #64984:
URL: https://github.com/apache/airflow/pull/64984#discussion_r3066491797
##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -32,6 +33,38 @@
PAYLOAD_BODY_KEY = "body"
+def _is_notifier_class(callback: Any) -> bool:
+ """
+ Check if the callback is a BaseNotifier subclass (not an instance).
+
+ Uses duck-typing (checks for ``async_notify`` and ``template_fields``)
+ to avoid importing ``airflow.sdk`` in core.
+ """
+ return (
+ inspect.isclass(callback)
+ and hasattr(callback, "async_notify")
+ and hasattr(callback, "template_fields")
+ and hasattr(callback, "__await__")
+ )
+
+
+def _render_callback_kwargs(kwargs: dict[str, Any], context: dict) ->
dict[str, Any]:
+ """
+ Render Jinja2 templates in callback kwargs using the provided context.
+
+ Uses ``Templater.render_template`` to recursively render all string values
+ in the kwargs dict. Non-string values (int, float, datetime, …) pass
+ through unchanged.
+ """
+ # Use CallbackTrigger (which inherits Templater via BaseTrigger) to access
+ # render_template without importing airflow.sdk directly in core.
+ from jinja2.sandbox import SandboxedEnvironment
+
+ trigger = CallbackTrigger(callback_path="", callback_kwargs={})
+ jinja_env = SandboxedEnvironment(cache_size=0)
+ return trigger.render_template(kwargs, cast("Any", context), jinja_env)
Review Comment:
`_render_callback_kwargs()` builds a raw
`jinja2.sandbox.SandboxedEnvironment`, which bypasses Airflow’s templating
environment (custom sandbox behavior, filters such as `ds` and `ts`,
extensions, etc.). As a result, template rendering for plain function
callbacks can behave differently from Notifier rendering (which uses
`Templater.get_template_env()`). Consider using
`CallbackTrigger.get_template_env()` (or importing Airflow’s SDK
`SandboxedEnvironment` from `airflow.sdk.definitions._internal.templater`)
instead of the raw Jinja environment, and drop the string-based `cast("Any",
...)` in favor of a proper type (or no cast).
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -216,28 +216,50 @@ def prune_deadlines(cls, *, session: Session, conditions:
dict[Mapped, Any]) ->
def handle_miss(self, session: Session):
"""Handle a missed deadline by queueing the callback."""
- def get_simple_context():
+ def _build_deadline_context():
from airflow.api_fastapi.core_api.datamodels.dag_run import
DAGRunResponse
from airflow.models import DagRun
- # TODO: Use the TaskAPI from within Triggerer to fetch full
context instead of sending this context
- # from the scheduler
-
- # Fetch the DagRun from the database again to avoid errors when
self.dagrun's relationship fields
- # are not in the current session.
+ # Fetch the DagRun from the database again to avoid errors when
self.dagrun's
+ # relationship fields are not in the current session.
dagrun = session.get(DagRun, self.dagrun_id)
+ logical_date = dagrun.logical_date
- return {
+ context: dict[str, Any] = {
+ # Full DAGRunResponse as a JSON-serializable dict
"dag_run":
DAGRunResponse.model_validate(dagrun).model_dump(mode="json"),
- "deadline": {"id": self.id, "deadline_time":
self.deadline_time},
+ # Top-level convenience keys for Jinja templates (match
standard context naming)
+ "dag_id": dagrun.dag_id,
+ "run_id": dagrun.run_id,
+ "logical_date": logical_date,
+ "data_interval_start": dagrun.data_interval_start,
+ "data_interval_end": dagrun.data_interval_end,
+ "run_type": dagrun.run_type,
+ "conf": dagrun.conf or {},
+ # Deadline-specific information
+ "deadline": {
+ "id": self.id,
+ "deadline_time": self.deadline_time,
+ "alert_name": self.deadline_alert.name if
self.deadline_alert else None,
+ },
Review Comment:
`alert_name` is derived via `self.deadline_alert.name`, but
`Deadline.deadline_alert` is not eager-loaded in the scheduler’s deadline query
(which currently applies `selectinload` only to `callback` and `dagrun`). Since
`handle_miss()` is called in a loop, this will trigger a per-deadline lazy-load
query (N+1). Either eager-load `Deadline.deadline_alert` in the scheduler
query, or avoid relationship access here by fetching the name in bulk/alongside
the deadline rows.
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -234,9 +234,29 @@ def test_handle_miss(self, dagrun, session):
context = callback_kwargs.pop("context")
assert callback_kwargs == TEST_CALLBACK_KWARGS
+ # Verify enriched context — dag_run and deadline info
+ assert context["dag_run"] ==
DAGRunResponse.model_validate(dagrun).model_dump(mode="json")
assert context["deadline"]["id"] == deadline_orm.id
assert context["deadline"]["deadline_time"].timestamp() ==
deadline_orm.deadline_time.timestamp()
- assert context["dag_run"] ==
DAGRunResponse.model_validate(dagrun).model_dump(mode="json")
+ assert context["deadline"]["alert_name"] is None # no deadline_alert
in this test
Review Comment:
This test validates the enriched context shape, but it mocks
`deadline_orm.callback.queue()`, so it doesn’t exercise the new context through
`TriggererCallback.queue()` → `Trigger.from_object()` →
`airflow.sdk.serde.serialize()`. Since the enriched context now includes
additional types (UUIDs/datetimes/nested dicts), a regression test should
ensure the callback can be queued successfully and the trigger kwargs can be
serialized/deserialized without error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]