ferruzzi opened a new pull request, #57222:
URL: https://github.com/apache/airflow/pull/57222

   Greatly improves the user experience when creating a new DeadlineReference 
and the documentation for the process.  
   
   
   Before this change, the process looked like this:
   
   ```python
   from airflow._shared.timezones import timezone
   from airflow.models.deadline import ReferenceModels
   from sqlalchemy.orm import Session
   
   from airflow.sdk.definitions.deadline import DeadlineReference
   
   
   class MyCustomRef(ReferenceModels.BaseDeadlineReference):
       def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
           new_deadline = some_business_logic()
           return new_deadline
   
   
   # Step 1: Register the custom reference with ReferenceModels.
   # This makes it discoverable by get_reference_class()
   ReferenceModels.MyCustomRef = MyCustomRef
   
   # Step 2: Add to the appropriate TYPES tuple for timing classification.
   DeadlineReference.TYPES.DAGRUN_CREATED = 
DeadlineReference.TYPES.DAGRUN_CREATED + (MyCustomRef,)
   
   # Step 3: Update the combined DAGRUN tuple.
   DeadlineReference.TYPES.DAGRUN = DeadlineReference.TYPES.DAGRUN_CREATED + 
DeadlineReference.TYPES.DAGRUN_QUEUED
   
   ```
   
   After these changes, that same custom reference user code will look like 
this:
   
   ```python
   from airflow._shared.timezones import timezone
   from airflow.models.deadline import ReferenceModels
   from sqlalchemy.orm import Session
   
   from airflow.sdk.definitions.deadline import DeadlineReference, 
deadline_reference
   
   @deadline_reference()
   class MyDecoratedCustomRef(ReferenceModels.BaseDeadlineReference):
       def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
           new_deadline = some_business_logic()
           return new_deadline
   ```
   
   In use they are transparent from built-in deadlines:
   
   ```python
   from datetime import timedelta, datetime
   
   from airflow import DAG
   from airflow.providers.slack.notifications.slack_webhook import 
SlackWebhookNotifier
   from airflow.sdk import task
   from airflow.sdk.definitions.deadline import DeadlineAlert, 
DeadlineReference, AsyncCallback
   
   @task.bash(task_id='sleep_task')
   def sleep_10_secs():
       return 'sleep 10'
   
   with DAG(
       dag_id="custom_past_reference",
       tags=["slack", "custom_ref"],
       deadline=DeadlineAlert(
           reference=DeadlineReference.MyDecoratedCustomRef,
           interval=timedelta(seconds=1),
           callback=AsyncCallback(
               callback_callable=SlackWebhookNotifier,
               kwargs={"text": f"Custom reference deadline in the past; Alert 
should trigger!"},
           )
       )
   ):
       sleep_10_secs()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to