This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5609361e40d Docs updates for Deadlines for 3.2 (#62494)
5609361e40d is described below
commit 5609361e40d031f97a3f32b2b55f22e1182a7d2b
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Mar 9 18:22:34 2026 -0700
Docs updates for Deadlines for 3.2 (#62494)
* Docs updates for Deadlines for 3.2
---
.../logging-monitoring/callbacks.rst | 10 ++
airflow-core/docs/howto/deadline-alerts.rst | 150 ++++++++++++++++++---
airflow-core/newsfragments/61153.significant.rst | 19 +++
3 files changed, 163 insertions(+), 16 deletions(-)
diff --git
a/airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst
b/airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst
index 25838377e5c..9cbd3b2976b 100644
---
a/airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst
+++
b/airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst
@@ -163,3 +163,13 @@ Here's an example of using a custom notifier:
For a list of community-managed Notifiers, see
:doc:`apache-airflow-providers:core-extensions/notifications`.
For more information on writing a custom Notifier, see the :doc:`Notifiers
<../../howto/notifications>` how-to page.
+
+Deadline Alert Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+In addition to the Dag/task lifecycle callbacks above, Airflow supports
**Deadline Alert** callbacks which
+trigger when a Dag run exceeds a configured time threshold. Deadline Alert
callbacks use
+:class:`~airflow.sdk.AsyncCallback` (runs in the Triggerer) or
:class:`~airflow.sdk.SyncCallback`
+(runs in the executor) and are configured on the Dag via the ``deadline``
parameter.
+
+For full details, see :doc:`/howto/deadline-alerts`.
diff --git a/airflow-core/docs/howto/deadline-alerts.rst
b/airflow-core/docs/howto/deadline-alerts.rst
index 643e17fc185..1ed9750bf4e 100644
--- a/airflow-core/docs/howto/deadline-alerts.rst
+++ b/airflow-core/docs/howto/deadline-alerts.rst
@@ -21,13 +21,15 @@ Deadline Alerts
.. warning::
Deadline Alerts are new in Airflow 3.1 and should be considered
experimental. The feature may be
- subject to changes in 3.2 without warning based on user feedback.
+ subject to changes in future versions without warning based on user feedback.
|experimental|
Deadline Alerts allow you to set time thresholds for your Dag runs and
automatically respond when those
-thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in
reference point, setting
-an interval, and defining a response using either Airflow's Notifiers or a
custom callback function.
+thresholds are exceeded. You configure Deadline Alerts by choosing a reference
point, setting an interval,
+and defining a callback to execute if the deadline is missed. A reference may
be one of the built-in
+DeadlineReference options such as when the dagrun is queued or any custom
method that returns a timestamp.
+The callback can either be one of Airflow's Notifiers or a custom callback
function.
Migrating from SLA
------------------
@@ -57,8 +59,7 @@ Below is an example Dag implementation. If the Dag has not
finished 15 minutes a
.. code-block:: python
from datetime import datetime, timedelta
- from airflow import DAG
- from airflow.sdk import AsyncCallback, DeadlineAlert, DeadlineReference
+ from airflow.sdk import AsyncCallback, DAG, DeadlineAlert,
DeadlineReference
from airflow.providers.slack.notifications.slack_webhook import
SlackWebhookNotifier
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -196,18 +197,19 @@ Using Callbacks
---------------
When a deadline is exceeded, the callback's callable is executed with the
specified kwargs. You can use an
-existing :doc:`Notifier </howto/notifications>` or create a custom callable.
A callback must be an
-:class:`~airflow.sdk.AsyncCallback`, with support coming soon for
:class:`~airflow.sdk.SyncCallback`.
+existing :doc:`Notifier </howto/notifications>` or create a custom callable.
A callback must be either an
+:class:`~airflow.sdk.AsyncCallback`, or a :class:`~airflow.sdk.SyncCallback`
(SyncCallback support added in 3.2).
Using Built-in Notifiers
^^^^^^^^^^^^^^^^^^^^^^^^
-Here's an example using the Slack Notifier if the Dag run has not finished
within 30 minutes of it being queued:
+Here's an example using the Slack Notifier with an **asynchronous callback**
if the Dag run has not finished
+within 30 minutes of it being queued. The callback runs in the Triggerer:
.. code-block:: python
with DAG(
- dag_id="slack_deadline_alert",
+ dag_id="slack_deadline_alert_async",
deadline=DeadlineAlert(
reference=DeadlineReference.DAGRUN_QUEUED_AT,
interval=timedelta(minutes=30),
@@ -221,13 +223,33 @@ Here's an example using the Slack Notifier if the Dag run
has not finished withi
):
EmptyOperator(task_id="example_task")
+Here's the same example using a **synchronous callback**. The callback runs in
the executor:
+
+.. code-block:: python
+
+ with DAG(
+ dag_id="slack_deadline_alert_sync",
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(minutes=30),
+ callback=SyncCallback(
+ SlackWebhookNotifier,
+ kwargs={
+ "text": "🚨 Dag {{ dag_run.dag_id }} missed deadline at {{
deadline.deadline_time }}. DagRun: {{ dag_run }}"
+ },
+ ),
+ ),
+ ):
+ EmptyOperator(task_id="example_task")
+
Creating Custom Callbacks
^^^^^^^^^^^^^^^^^^^^^^^^^
You can create custom callables for more complex handling. If ``kwargs`` are
specified in the ``Callback``,
they are passed to the callback function. **Asynchronous callbacks** must be
defined somewhere in the
-Triggerer's system path.
+Triggerer's system path. **Synchronous callbacks** must be importable on the
worker where they will be executed.
+
.. note::
Regarding Async Custom Deadline callbacks:
@@ -237,6 +259,11 @@ Triggerer's system path.
Nested callables are not currently supported.
* The Triggerer will need to be restarted when a callback is added or
changed in order to reload the file.
+.. note::
+ Regarding Synchronous callbacks:
+
+ * Sync callbacks are sent to the executor and treated just like a Dag task
with top priority.
+
.. note::
**Airflow ``context``:** When a deadline is missed, Airflow automatically
provides a ``context``
kwarg into the callback containing information about the Dag run and the
deadline. To receive it,
@@ -245,9 +272,60 @@ Triggerer's system path.
the callable accepts. The ``context`` keyword is reserved and cannot be
used in the ``kwargs``
parameter of a ``Callback``; attempting to do so will raise a
``ValueError`` at DAG parse time.
+
+A **custom synchronous callback** might look like this:
+
+1. Place this method in your plugins folder (e.g.
``$AIRFLOW_HOME/plugins/deadline_callbacks.py``):
+
+.. code-block:: python
+
+ def custom_sync_callback(**kwargs):
+ """Handle deadline violation with custom logic."""
+ context = kwargs.get("context", {})
+ print(f"Deadline exceeded for Dag {context.get('dag_run',
{}).get('dag_id')}!")
+ print(f"Context: {context}")
+ print(f"Alert type: {kwargs.get('alert_type')}")
+ # Additional custom handling here
+
+2. Place this in a Dag file:
+
+.. code-block:: python
+
+ from datetime import timedelta
+
+ from deadline_callbacks import custom_sync_callback
+
+ from airflow.providers.standard.operators.empty import EmptyOperator
+ from airflow.sdk import DAG, DeadlineAlert, DeadlineReference, SyncCallback
+
+ with DAG(
+ dag_id="custom_sync_deadline_alert",
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(minutes=15),
+ callback=SyncCallback(
+ custom_sync_callback,
+ kwargs={"alert_type": "time_exceeded"},
+ ),
+ ),
+ ):
+ EmptyOperator(task_id="example_task")
+
+.. tip::
+ ``SyncCallback`` accepts an optional ``executor`` parameter to target a
specific executor.
+ If not specified, the default executor is used.
+
+ .. code-block:: python
+
+ SyncCallback(
+ my_callback,
+ kwargs={"msg": "deadline missed"},
+ executor="celery_executor",
+ )
+
A **custom asynchronous callback** might look like this:
-1. Place this method in ``/files/plugins/deadline_callbacks.py``:
+1. Place this method in your plugins folder (e.g.
``$AIRFLOW_HOME/plugins/deadline_callbacks.py``):
.. code-block:: python
@@ -268,9 +346,8 @@ A **custom asynchronous callback** might look like this:
from deadline_callbacks import custom_async_callback
- from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
- from airflow.sdk import AsyncCallback, DeadlineAlert, DeadlineReference
+ from airflow.sdk import AsyncCallback, DAG, DeadlineAlert,
DeadlineReference
with DAG(
dag_id="custom_deadline_alert",
@@ -302,7 +379,7 @@ A deadline's trigger time is calculated by adding the
``interval`` to the dateti
the ``reference``. For ``FIXED_DATETIME`` references, negative intervals can
be particularly
useful to trigger the callback *before* the reference time.
-For example:
+In the following examples, ``notify_team`` is either a SyncCallback or
AsyncCallback defined elsewhere:
.. code-block:: python
@@ -386,8 +463,7 @@ Once registered [see notes below], use your custom
references in Dag definitions
.. code-block:: python
from datetime import timedelta
- from airflow import DAG
- from airflow.sdk import AsyncCallback, DeadlineAlert, DeadlineReference
+ from airflow.sdk import AsyncCallback, DAG, DeadlineAlert,
DeadlineReference
with DAG(
dag_id="custom_reference_example",
@@ -400,6 +476,48 @@ Once registered [see notes below], use your custom
references in Dag definitions
# Your tasks here
...
+Multiple Deadline Alerts
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+A Dag can have multiple Deadline Alerts. Pass a list to the ``deadline``
parameter instead of a single
+``DeadlineAlert``. Each alert in the list is evaluated independently, and each
may use any combination
+of reference points and callback types (sync or async).
+
+.. code-block:: python
+
+ from datetime import timedelta
+ from airflow.sdk import AsyncCallback, DAG, DeadlineAlert,
DeadlineReference, SyncCallback
+ from airflow.providers.slack.notifications.slack_webhook import
SlackWebhookNotifier
+ from airflow.providers.standard.operators.empty import EmptyOperator
+
+ with DAG(
+ dag_id="multiple_deadline_alerts",
+ deadline=[
+ # First alert: warn via Slack (async) if not done 30 min after
queuing
+ DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(minutes=30),
+ callback=AsyncCallback(
+ SlackWebhookNotifier,
+ kwargs={"text": "⚠️ Dag {{ dag_run.dag_id }} is
approaching its deadline."},
+ ),
+ ),
+ # Second alert: escalate via custom sync callback if not done 60
min after queuing
+ DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(minutes=60),
+ callback=SyncCallback(
+ "my_plugins.escalation.escalate_to_oncall",
+ kwargs={"severity": "high"},
+ ),
+ ),
+ ],
+ ):
+ EmptyOperator(task_id="example_task")
+
+This pattern is useful for creating tiered alerting strategies — for example,
a warning notification
+followed by a more urgent escalation if the Dag is still running.
+
**Important Notes:**
* **Timezone Awareness**: Always return timezone-aware datetime objects.
diff --git a/airflow-core/newsfragments/61153.significant.rst
b/airflow-core/newsfragments/61153.significant.rst
new file mode 100644
index 00000000000..51f4727c240
--- /dev/null
+++ b/airflow-core/newsfragments/61153.significant.rst
@@ -0,0 +1,19 @@
+Add synchronous callback support (``SyncCallback``) for Deadline Alerts
+
+Deadline Alerts now support synchronous callbacks via ``SyncCallback`` in
addition to the existing
+asynchronous ``AsyncCallback``. Synchronous callbacks are executed by the
executor (rather than
+the triggerer), and can optionally target a specific executor via the
``executor`` parameter.
+
+A DAG can also define multiple Deadline Alerts by passing a list to the
``deadline`` parameter,
+and each alert can use either callback type.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes