ferruzzi opened a new pull request, #57222:
URL: https://github.com/apache/airflow/pull/57222
Greatly improves the user experience when creating a new DeadlineReference,
and improves the documentation for the process.
Before this change, the process looked like this:
```python
from datetime import datetime

from sqlalchemy.orm import Session

from airflow._shared.timezones import timezone
from airflow.models.deadline import ReferenceModels
from airflow.sdk.definitions.deadline import DeadlineReference


class MyCustomRef(ReferenceModels.BaseDeadlineReference):
    def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
        # some_business_logic() is a placeholder for your own deadline calculation.
        new_deadline = some_business_logic()
        return new_deadline


# Step 1: Register the custom reference with ReferenceModels.
# This makes it discoverable by get_reference_class().
ReferenceModels.MyCustomRef = MyCustomRef

# Step 2: Add to the appropriate TYPES tuple for timing classification.
DeadlineReference.TYPES.DAGRUN_CREATED = DeadlineReference.TYPES.DAGRUN_CREATED + (MyCustomRef,)

# Step 3: Update the combined DAGRUN tuple.
DeadlineReference.TYPES.DAGRUN = (
    DeadlineReference.TYPES.DAGRUN_CREATED + DeadlineReference.TYPES.DAGRUN_QUEUED
)
```
After this change, the user code for the same custom reference looks like
this:
```python
from datetime import datetime

from sqlalchemy.orm import Session

from airflow._shared.timezones import timezone
from airflow.models.deadline import ReferenceModels
from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference


@deadline_reference()
class MyDecoratedCustomRef(ReferenceModels.BaseDeadlineReference):
    def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
        # some_business_logic() is a placeholder for your own deadline calculation.
        new_deadline = some_business_logic()
        return new_deadline
```
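For readers curious how a class decorator can stand in for the three manual
steps, here is a minimal, self-contained sketch. It is not Airflow's
implementation: `Registry`, `register_reference`, and the `created`/`queued`
timing names are hypothetical stand-ins chosen only to illustrate the pattern.

```python
from datetime import datetime, timezone


class Registry:
    """Hypothetical stand-in for the lookup and timing tuples patched by hand."""

    by_name: dict[str, type] = {}
    created: tuple[type, ...] = ()
    queued: tuple[type, ...] = ()

    @classmethod
    def dagrun(cls) -> tuple[type, ...]:
        # Derived on demand, so the combined tuple cannot go stale.
        return cls.created + cls.queued


def register_reference(timing: str = "created"):
    """Class decorator factory: record the class, then return it unchanged."""
    if timing not in ("created", "queued"):
        raise ValueError(f"unknown timing: {timing!r}")

    def decorator(ref_cls: type) -> type:
        # Make the class discoverable by name.
        Registry.by_name[ref_cls.__name__] = ref_cls
        # Append it to the tuple for the requested timing.
        setattr(Registry, timing, getattr(Registry, timing) + (ref_cls,))
        return ref_cls

    return decorator


@register_reference()
class ExampleRef:
    def evaluate(self) -> datetime:
        # Fixed value for illustration only.
        return datetime(2025, 1, 1, tzinfo=timezone.utc)
```

In this sketch the decorator performs the name registration and timing
classification in one place, so user code never touches the registry
directly.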
In use, custom references are indistinguishable from built-in ones:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.sdk import task
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference


@task.bash(task_id="sleep_task")
def sleep_10_secs():
    return "sleep 10"


with DAG(
    dag_id="custom_past_reference",
    tags=["slack", "custom_ref"],
    deadline=DeadlineAlert(
        # The decorated class is accessed like any built-in reference.
        reference=DeadlineReference.MyDecoratedCustomRef,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Custom reference deadline in the past; Alert should trigger!"},
        ),
    ),
):
    sleep_10_secs()
```
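The alert text above implies that a deadline already in the past should fire.
As a rough sketch of that arithmetic, assuming the deadline is the reference's
evaluated time plus `interval` (the helper names `compute_deadline` and
`is_missed` are hypothetical and not part of Airflow):

```python
from datetime import datetime, timedelta, timezone


def compute_deadline(reference_time: datetime, interval: timedelta) -> datetime:
    # Assumption: deadline = reference time + interval.
    return reference_time + interval


def is_missed(deadline: datetime, now: datetime) -> bool:
    # A deadline counts as missed once the current time reaches it.
    return now >= deadline


# A reference one hour in the past with a 1-second interval is already missed.
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
past_reference = now - timedelta(hours=1)
deadline = compute_deadline(past_reference, timedelta(seconds=1))
missed = is_missed(deadline, now)
```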