amoghrajesh commented on PR #61118:
URL: https://github.com/apache/airflow/pull/61118#issuecomment-3822062006

   Thanks @ferruzzi!
   
   
   Just to gain some more confidence prior to merge, I tried it out functionally.
   
   Used this custom callback:
   
   ```python
   async def custom_async_callback(**kwargs):
       context = kwargs.get("context", {})
       print(
           f"Deadline exceeded for Dag {context.get('dag_run', {}).get('dag_id')}!"
       )
       print(f"Context: {context}")
       print(f"Alert type: {kwargs.get('alert_type')}")
   
   ```
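
To sanity-check the callback outside a scheduler, it can also be awaited directly with a hand-built context. This is only a minimal sketch: it assumes the context is a plain dict with a `dag_run` entry, as the callback's `.get` calls suggest. `fake_context` and `run_callback_locally` are hypothetical names, not part of Airflow, and the real context may carry more keys.

```python
import asyncio
import contextlib
import io


async def custom_async_callback(**kwargs):
    context = kwargs.get("context", {})
    print(f"Deadline exceeded for Dag {context.get('dag_run', {}).get('dag_id')}!")
    print(f"Context: {context}")
    print(f"Alert type: {kwargs.get('alert_type')}")


# Hypothetical stand-in for the context passed at runtime (assumed shape).
fake_context = {"dag_run": {"dag_id": "custom_deadline_alert"}}


def run_callback_locally() -> str:
    """Await the callback once and return everything it printed."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        asyncio.run(
            custom_async_callback(context=fake_context, alert_type="time_exceeded")
        )
    return buffer.getvalue()


captured = run_callback_locally()
print(captured)
```

This only exercises the callback's own logic. It says nothing about when or how the deadline machinery invokes it.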
   
   This DAG:
   ```python
   from datetime import timedelta
   
   from deadline_callback import custom_async_callback
   
   from airflow.providers.standard.operators.bash import BashOperator
   from airflow.sdk import DAG
   from airflow.sdk.definitions.deadline import (
       AsyncCallback,
       DeadlineAlert,
       DeadlineReference,
   )
   
   with DAG(
       dag_id="custom_deadline_alert",
       deadline=DeadlineAlert(
           reference=DeadlineReference.DAGRUN_QUEUED_AT,
           interval=timedelta(seconds=10),
           callback=AsyncCallback(
               custom_async_callback,
               kwargs={"alert_type": "time_exceeded"},
           ),
       ),
   ):
       BashOperator(task_id="example_task", bash_command="sleep 30")
   
   ```
   
   And it works as I expect it to:
   
   <img width="2559" height="1005" alt="image" src="https://github.com/user-attachments/assets/1d02859e-5f52-49c3-94e9-403a8d1c8d64" />
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
