hkc-8010 commented on PR #64737:
URL: https://github.com/apache/airflow/pull/64737#issuecomment-4188980992

   I tested this PR in my local Airflow environment with the Celery executor,
running multiple replicas of the scheduler and triggerer, and it works as
expected. Below is the DAG I used for testing. Thanks @shivaam.
   
   ```
   from __future__ import annotations
   
   from datetime import datetime, timedelta
   
   from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
   from airflow.sdk import dag, task
   from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
   
   SLACK_WEBHOOK_CONN_ID = "cre_testing_channel"
   # Sleep longer than the deadline so the DagRun is still running when the alert fires.
   SLEEP_SECONDS_PAST_DEADLINE = 180
   
   
   @dag(
       dag_id="example_deadline_callback_slack",
       description="Deadline alert to Slack if the DagRun is still running 1 minute after being queued",
       schedule=None,
       start_date=datetime(2024, 1, 1),
       catchup=False,
       tags=["example", "deadline", "slack"],
       deadline=DeadlineAlert(
           reference=DeadlineReference.DAGRUN_QUEUED_AT,
           interval=timedelta(minutes=1),
           callback=AsyncCallback(
               SlackWebhookNotifier,
               kwargs={
                   "slack_webhook_conn_id": SLACK_WEBHOOK_CONN_ID,
                   "text": (
                       ":alarm_clock: *Deadline missed*: DAG `{{ dag_run.dag_id }}` had not finished "
                       "within 1 minute of being queued.\n"
                       "*Deadline time:* {{ deadline.deadline_time }}\n"
                       "*Dag run:* {{ dag_run }}"
                   ),
               },
           ),
       ),
   )
   def example_deadline_callback_slack():
       @task
       def run_longer_than_deadline() -> None:
           import logging
           import time
   
           log = logging.getLogger(__name__)
           log.info(
               "Sleeping %s s so the deadline can fire while this run is still active.",
               SLEEP_SECONDS_PAST_DEADLINE,
           )
           time.sleep(SLEEP_SECONDS_PAST_DEADLINE)
   
       run_longer_than_deadline()
   
   
   dag_instance = example_deadline_callback_slack()
   
   if __name__ == "__main__":
       dag_instance.test()
   ```
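
For reference, the timing this test relies on can be sketched with plain `datetime` arithmetic. This is only an illustration, assuming the deadline is computed as the queued time plus the configured interval. `deadline_time` and `is_still_running_at_deadline` are hypothetical helper names, not Airflow APIs, and the 5-second queue latency is an assumed value, not something I measured.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Values mirrored from the DAG above.
DEADLINE_INTERVAL = timedelta(minutes=1)
SLEEP_SECONDS_PAST_DEADLINE = 180


def deadline_time(queued_at: datetime, interval: timedelta) -> datetime:
    """Hypothetical helper: deadline assumed to be queued time plus interval."""
    return queued_at + interval


def is_still_running_at_deadline(
    queued_at: datetime,
    started_at: datetime,
    sleep_seconds: int,
    interval: timedelta,
) -> bool:
    """Return True if the sleeping task finishes after the deadline passes."""
    finished_at = started_at + timedelta(seconds=sleep_seconds)
    return finished_at > deadline_time(queued_at, interval)


# Assumed example timestamps; the 5 s queue latency is illustrative only.
queued_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
started_at = queued_at + timedelta(seconds=5)

print(deadline_time(queued_at, DEADLINE_INTERVAL))
print(
    is_still_running_at_deadline(
        queued_at, started_at, SLEEP_SECONDS_PAST_DEADLINE, DEADLINE_INTERVAL
    )
)
```

With a 180-second sleep and a 1-minute interval, the run is still active when the deadline passes, which is the condition the Slack callback needs in order to fire during the test.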

