Lee-W commented on code in PR #53727:
URL: https://github.com/apache/airflow/pull/53727#discussion_r2275267341


##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Deadline Alerts
+===============
+
+Deadline Alerts allow you to set time thresholds for your Dag runs and automatically respond when those
+thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in reference point, setting
+an interval, and defining a response using either Airflow's Notifiers or a custom callback function.
+
+Creating a Deadline Alert
+-------------------------
+
+Creating a Deadline Alert requires three mandatory parameters:
+
+* Reference: When to start counting from
+* Interval: How far before or after the reference point to trigger the alert
+* Callback: A Callback object which contains a path to a callable and optional kwargs to pass to it if the deadline is exceeded
+
+Here is how Deadlines are calculated:
+
+::
+
+    [Reference] ------ [Interval] ------> [Deadline]
+        ^                                     ^
+        |                                     |
+     Start time                          Trigger point
+
+Below is an example Dag implementation. If the Dag has not finished 15 minutes after it was queued, send a Slack message:
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+    from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    with DAG(
+        dag_id="deadline_alert_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=15),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "Dag 'slack_deadline_alert' still running after 30 minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|-----------|---------|-----------|--------|
+        Scheduled    Queued    Started    Deadline
+         00:00       00:03      00:05      00:18
+
+Using Built-in References
+-------------------------
+
+Airflow provides several built-in reference points that you can use with DeadlineAlert:
+
+``DeadlineReference.DAGRUN_QUEUED_AT``
+    Measures time from when the Dag run was queued. Useful for monitoring resource constraints.
+
+``DeadlineReference.DAGRUN_LOGICAL_DATE``
+    References when the Dag run was scheduled to start. For example, setting an interval of
+    ``timedelta(minutes=15)`` would trigger the alert if the Dag hasn't completed 15 minutes
+    after it was scheduled to start, regardless of when (or if) it actually began executing.
+    Useful for ensuring scheduled Dags complete before their next scheduled run.
+
+``DeadlineReference.FIXED_DATETIME``
+    Specifies a fixed point in time. Useful when Dags must complete by a specific time.
+
+Here's an example using a fixed datetime:
+
+.. code-block:: python
+
+    tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0))
+
+    with DAG(
+        dag_id="fixed_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
+            interval=timedelta(minutes=-30),  # Alert 30 minutes before the reference.
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "Dag 'slack_deadline_alert' still running after 30 minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+The timeline for this example would look like this:
+
+::
+
+    |------|----------|---------|------------|--------|
+         Queued     Start    Deadline    Reference
+         09:15      09:17     09:30       10:00
+
+.. note::
+    Note that since the interval is a negative value, the deadline is before the reference in this case.
+
+Using Callbacks
+---------------
+
+When a deadline is exceeded, the callback is executed. You can use an existing :doc:`Notifier </howto/notifications>`
+or create a custom callback function.  A callback must be either an :class:`~airflow.sdk.definitions.deadline.AsyncCallback`
+or a :class:`~airflow.sdk.definitions.deadline.SyncCallback`.
+
+Using Built-in Notifiers
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here's an example using the Slack Notifier if the Dag run has not finished within 30 minutes of it being queued:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id="slack_deadline_alert",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_QUEUED_AT,
+            interval=timedelta(minutes=30),
+            callback=AsyncCallback(
+                SlackWebhookNotifier,
+                kwargs={
+                    "slack_conn_id": "slack_default",
+                    "channel": "#alerts",
+                    "text": "Dag 'slack_deadline_alert' still running after 30 minutes.",
+                    "username": "Airflow Alerts",
+                },
+            ),
+        ),
+    ):
+        EmptyOperator(task_id="example_task")
+
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The ``kwargs`` specified in the ``Callback``
+are passed to the callback function, if any are provided.  **Synchronous callbacks** (standard python

Review Comment:
   ```suggestion
   are passed to the callback function, if any are provided. **Synchronous callbacks** (standard python
   ```
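
The two timelines in the hunk quoted above follow from adding the interval to the reference point. A minimal sketch of that arithmetic using only the standard library; `compute_deadline` is a hypothetical helper written for illustration, not part of Airflow, and the calendar date is arbitrary:

```python
from datetime import datetime, timedelta


def compute_deadline(reference: datetime, interval: timedelta) -> datetime:
    """Return reference + interval; a negative interval lands before the reference."""
    return reference + interval


# First quoted timeline: queued at 00:03 with a 15-minute interval.
queued_at = datetime(2025, 1, 1, 0, 3)  # arbitrary date, assumed for illustration
queued_deadline = compute_deadline(queued_at, timedelta(minutes=15))

# Second quoted timeline: fixed reference at 10:00 with a -30-minute interval.
fixed_reference = datetime(2025, 1, 1, 10, 0)
fixed_deadline = compute_deadline(fixed_reference, timedelta(minutes=-30))

print(queued_deadline.strftime("%H:%M"))  # 00:18
print(fixed_deadline.strftime("%H:%M"))  # 09:30
```

Both results agree with the `Deadline` columns in the quoted timelines (00:18 and 09:30).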



##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -0,0 +1,312 @@
+Creating Custom Callbacks
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can create custom callbacks for more complex handling. The ``kwargs`` specified in the ``Callback``
+are passed to the callback function, if any are provided.  **Synchronous callbacks** (standard python
+methods) can be defined in the dag bundle and are run in the Executor.  **Asynchronous callbacks** must

Review Comment:
   ```suggestion
   methods) can be defined in the dag bundle and are run in the Executor. **Asynchronous callbacks** must
   ```



##########
airflow-core/docs/core-concepts/dags.rst:
##########
@@ -829,3 +829,40 @@ if it fails for ``N`` number of times consecutively.
 we can also provide and override these configuration from DAG argument:
 
 - ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Deadline Alerts
+---------------
+
+.. versionadded:: 3.1
+
+Deadline Alerts allow you to set time thresholds for your DAG runs and automatically respond when those

Review Comment:
   I noticed that the other files have been updated. This one might have been missed.



##########
airflow-core/docs/core-concepts/dags.rst:
##########
@@ -829,3 +829,40 @@ if it fails for ``N`` number of times consecutively.
 we can also provide and override these configuration from DAG argument:
 
 - ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Deadline Alerts
+---------------
+
+.. versionadded:: 3.1
+
+Deadline Alerts allow you to set time thresholds for your DAG runs and automatically respond when those

Review Comment:
   Sounds good 🙂 These can all be replaced with "Dag" before we merge the PR.
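
One way to check for leftover all-caps spellings before merging is a small scan over the docs text. This is a rough sketch, not tooling from the PR: `find_uppercase_dag` is a hypothetical helper, and it assumes that anything inside reStructuredText inline literals (such as a class name) should be left untouched. It does not skip code blocks or directives.

```python
import re

# Matches the all-caps spelling as a whole word, including the plural.
UPPERCASE_DAG = re.compile(r"\bDAGs?\b")
# reStructuredText inline literals such as ``DAG`` are code and are left alone.
INLINE_LITERAL = re.compile(r"``.+?``")


def find_uppercase_dag(text: str) -> list[tuple[int, str]]:
    """Return (line_number, line) pairs whose prose still spells "DAG" in capitals."""
    hits = []
    for number, line in enumerate(text.splitlines(), start=1):
        # Drop inline literals first so identifiers do not count as prose.
        prose = INLINE_LITERAL.sub("", line)
        if UPPERCASE_DAG.search(prose):
            hits.append((number, line))
    return hits


sample = (
    "Deadline Alerts allow you to set time thresholds for your DAG runs.\n"
    "Import ``DAG`` from the ``airflow`` package.\n"
    "Deadline Alerts allow you to set time thresholds for your Dag runs.\n"
)
for number, line in find_uppercase_dag(sample):
    print(f"{number}: {line}")
```

On the sample text only the first line is reported; the inline literal and the already-updated "Dag" spelling are skipped.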



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
