ferruzzi commented on code in PR #57222:
URL: https://github.com/apache/airflow/pull/57222#discussion_r2466822391


##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -328,24 +328,95 @@ Custom References
 ^^^^^^^^^^^^^^^^^
 
 While the built-in references should cover most use cases, and more will be released over time, you
-can create custom references by implementing a class that inherits from DeadlineReference.  This may
-be useful if you have calendar integrations or other sources that you want to use as a reference.
+can create custom references.  This may be useful if you have calendar integrations or other sources
+that you want to use as a reference.  You can create custom references by implementing a class that
+inherits from BaseDeadlineReference, giving it an _evaluate_with() method, and registering it.  There are
+two ways to accomplish this.  The recommended way is to use the ``@deadline_reference`` decorator
+but for more complicated implementations, the ``register_custom_reference()`` method is available.
+
+**Recommended: Using the decorator**
 
 .. code-block:: python
 
-    class CustomReference(DeadlineReference):
-        """A deadline reference that uses a custom data source."""
+    from airflow._shared.timezones.timezone import datetime
+    from airflow.models.deadline import ReferenceModels
+    from sqlalchemy.orm import Session
+
+    from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference
+
+
+    # By default, the evaluate_with method will be executed when the dagrun is created.
+    @deadline_reference()
+    class MyCustomDecoratedReference(ReferenceModels.BaseDeadlineReference):
+        """A custom reference evaluated when DAG runs are created."""
+
+        def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+            # Add your business logic here
+            return your_datetime
+
+
+    # You can specify when evaluate_with will be called by providing a DeadlineReference.TYPES value.
+    @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
+    class MyQueuedReference(ReferenceModels.BaseDeadlineReference):
+        """A custom reference evaluated when DAG runs are queued."""
+
+        required_kwargs = {"custom_param"}
+
+        def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+            custom_value = kwargs["custom_param"]
+            # Use custom_value in your calculation
+            return your_datetime
+
+**Alternative: Manual Registration**
+
+For advanced use cases requiring conditional or dynamic registration, you may wish to use the registration method directly.
+In this case, the plugin file will look something like this:
+
+.. code-block:: python
+
+    from sqlalchemy.orm import Session
+
+    from airflow.models.deadline import ReferenceModels
+    from airflow.sdk.definitions.deadline import DeadlineReference
 
-        # Define any required parameters for your reference
-        required_kwargs = {"custom_id"}
 
+    class MyManualReference(ReferenceModels.BaseDeadlineReference):
         def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
-            """
-            Evaluate the reference time using the provided session and kwargs.
-
-            The session parameter can be used for database queries, and kwargs
-            will contain any required parameters defined in required_kwargs.
-            """
-            custom_id = kwargs["custom_id"]
-            # Your custom logic here to determine the reference time
+            # Add your business logic here
             return your_datetime
+
+
+    # Register with specific timing based on configuration
+    timing = (
+        DeadlineReference.TYPES.DAGRUN_QUEUED if use_queued_timing else DeadlineReference.TYPES.DAGRUN_CREATED
+    )
+    DeadlineReference.register_custom_reference(MyManualReference, timing)
+
+**Using Custom References in DAGs**
+
+Once registered, use your custom references in DAG definitions like any other reference:
+
+.. code-block:: python
+
+    from datetime import timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+
+    with DAG(
+        dag_id="custom_reference_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.MyCustomDecoratedReference(),
+            interval=timedelta(hours=2),
+            callback=AsyncCallback(my_callback),
+        ),
+    ):
+        # Your tasks here
+        ...
+
+**Important Notes:**
+
+* **Timezone Awareness**: Always return timezone-aware datetime objects
+* **Plugin Placement**: Place custom references in plugin files (e.g., ``plugins/my_deadline_references.py``)
+* **Scheduler Restart**: Restart the Airflow scheduler after adding or modifying custom references

Review Comment:
   Oh, thanks for pointing that out.  I've been doing manual testing, and it
   turns out that on Breeze all you have to restart is the API Server.  I'm not
   actually sure WHY at the moment; maybe it'll make more sense to someone else.
   
   For my manual testing, I'm using this in a plugins file:
   
   ```python
   from airflow.sdk.timezone import datetime
   from airflow.models.deadline import ReferenceModels
   from sqlalchemy.orm import Session
   
   from airflow.sdk.definitions.deadline import deadline_reference
   
   
   @deadline_reference()
   class MyDecoratedPastRef(ReferenceModels.BaseDeadlineReference):
       def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
           # Returning a datetime in the past will always fire the callback
           return datetime(1234, 5, 6, 7, 8, 9)
   ```
   
   and this DAG:
   
   ```python
   from datetime import timedelta, datetime
   
   from airflow import DAG
   from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
   from airflow.sdk import task
   from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, AsyncCallback
   
   @task.bash(task_id='sleep_task')
   def sleep_10_secs():
       return 'sleep 10'
   
   with DAG(
       dag_id="custom_past_reference",
       tags=["slack", "custom_ref"],
       deadline=DeadlineAlert(
           reference=DeadlineReference.MyDecoratedPastRef,
           interval=timedelta(seconds=1),
           callback=AsyncCallback(
               callback_callable=SlackWebhookNotifier,
               kwargs={"text": "Custom reference deadline for {{ (deadline.deadline_time | ds)[:4] }}; Alert should trigger!"},
           )
       )
   ):
       sleep_10_secs()
   ```
   
   Changing the year on the Reference only requires restarting the API Server
   for the new year to print in the Slack message.  I'm not sure I fully
   understand why at this point, but the Scheduler Restart line in the docs
   should be updated.
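
For anyone following along, here is a rough stdlib-only sketch of why the year-1234 reference above is a convenient test fixture. It assumes a deadline is simply the reference time plus the interval, and that an alert fires once that moment has passed. The helper `deadline_is_missed` is a hypothetical name for illustration, not part of Airflow, and this is not how Airflow itself evaluates deadlines.

```python
from datetime import datetime, timedelta, timezone


def deadline_is_missed(reference: datetime, interval: timedelta, now: datetime) -> bool:
    """Hypothetical helper: True when reference + interval is already behind `now`."""
    return reference + interval < now


# Timezone-aware stand-in for the year-1234 value returned by the test reference.
past_reference = datetime(1234, 5, 6, 7, 8, 9, tzinfo=timezone.utc)
now = datetime.now(timezone.utc)

# With a reference centuries in the past, any small interval is already exceeded.
missed = deadline_is_missed(past_reference, timedelta(seconds=1), now)
print(missed)
```

Because the reference is so far in the past, the check passes no matter when the DAG runs, which makes it easy to tell from the Slack message which version of the reference class was picked up.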


