amoghrajesh commented on PR #61118:
URL: https://github.com/apache/airflow/pull/61118#issuecomment-3822062006
Thanks @ferruzzi!
Just to gain some more confidence prior to merge, I tried it out
functionally.
Used this custom callback:
```python
async def custom_async_callback(**kwargs):
    context = kwargs.get("context", {})
    print(
        f"Deadline exceeded for Dag {context.get('dag_run', {}).get('dag_id')}!"
    )
    print(f"Context: {context}")
    print(f"Alert type: {kwargs.get('alert_type')}")
```
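For anyone who wants to sanity-check the callback without a running Airflow instance, here is a rough sketch that calls it directly with `asyncio`. The shape of the `context` dict and the `run_callback_locally` helper are my assumptions for illustration, not what Airflow actually passes or provides:

```python
import asyncio
import contextlib
import io


async def custom_async_callback(**kwargs):
    # Same body as the callback above, repeated so this snippet runs on its own.
    context = kwargs.get("context", {})
    print(
        f"Deadline exceeded for Dag {context.get('dag_run', {}).get('dag_id')}!"
    )
    print(f"Context: {context}")
    print(f"Alert type: {kwargs.get('alert_type')}")


def run_callback_locally(callback, **kwargs) -> str:
    """Hypothetical helper: run an async callback and return what it printed."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        asyncio.run(callback(**kwargs))
    return buffer.getvalue()


# Assumed context shape: a plain dict with a nested "dag_run" dict.
output = run_callback_locally(
    custom_async_callback,
    context={"dag_run": {"dag_id": "custom_deadline_alert"}},
    alert_type="time_exceeded",
)
print(output)
```

This only exercises the callback's own logic; it says nothing about how the triggerer schedules or invokes it.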
This DAG:
```python
from datetime import timedelta

from deadline_callback import custom_async_callback

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
from airflow.sdk.definitions.deadline import (
    AsyncCallback,
    DeadlineAlert,
    DeadlineReference,
)

with DAG(
    dag_id="custom_deadline_alert",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=10),
        callback=AsyncCallback(
            custom_async_callback,
            kwargs={"alert_type": "time_exceeded"},
        ),
    ),
):
    BashOperator(task_id="example_task", bash_command="sleep 30")
```
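The DAG is set up so the deadline is always missed: it falls 10 seconds after the run is queued, while the only task sleeps for 30 seconds. A small sketch of that arithmetic in plain Python, where the `queued_at` timestamp is made up and none of this reflects how Airflow evaluates deadlines internally:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical queue time; any timestamp gives the same result.
queued_at = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

interval = timedelta(seconds=10)       # DeadlineAlert interval from the DAG
task_duration = timedelta(seconds=30)  # bash_command="sleep 30"

deadline = queued_at + interval
# Earliest possible finish, ignoring scheduling and startup overhead.
earliest_finish = queued_at + task_duration

deadline_missed = earliest_finish > deadline
print(f"deadline={deadline.isoformat()} missed={deadline_missed}")
```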
And it works as I expect it to:
<img width="2559" height="1005" alt="image"
src="https://github.com/user-attachments/assets/1d02859e-5f52-49c3-94e9-403a8d1c8d64"
/>