This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f70917cfb1 Improve UX for adding custom DeadlineReferences (#57222)
2f70917cfb1 is described below

commit 2f70917cfb1e02708d54a38a62c6cb5aa2075180
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Nov 25 13:55:59 2025 -0800

    Improve UX for adding custom DeadlineReferences (#57222)
    
    * Improve UX for adding custom DeadlineReferences
---
 airflow-core/docs/howto/deadline-alerts.rst      |  83 ++++++--
 airflow-core/tests/unit/models/test_deadline.py  | 234 ++++++++++++++++++++++-
 task-sdk/src/airflow/sdk/definitions/deadline.py |  89 ++++++++-
 3 files changed, 383 insertions(+), 23 deletions(-)

diff --git a/airflow-core/docs/howto/deadline-alerts.rst b/airflow-core/docs/howto/deadline-alerts.rst
index 74f10f1b636..11c2ec97602 100644
--- a/airflow-core/docs/howto/deadline-alerts.rst
+++ b/airflow-core/docs/howto/deadline-alerts.rst
@@ -105,13 +105,13 @@ Airflow provides several built-in reference points that you can use with Deadlin
     Specifies a fixed point in time. Useful when Dags must complete by a specific time.
 
 ``DeadlineReference.AVERAGE_RUNTIME``
-    Calculates deadlines based on the average runtime of previous DAG runs. This reference
+    Calculates deadlines based on the average runtime of previous Dag runs. This reference
     analyzes historical execution data to predict when the current run should complete.
     The deadline is set to the current time plus the calculated average runtime plus the interval.
     If insufficient historical data exists, no deadline is created.
 
     Parameters:
-        * ``max_runs`` (int, optional): Maximum number of recent DAG runs to analyze. Defaults to 10.
+        * ``max_runs`` (int, optional): Maximum number of recent Dag runs to analyze. Defaults to 10.
         * ``min_runs`` (int, optional): Minimum number of completed runs required to calculate average. Defaults to same value as ``max_runs``.
 
     Example usage:
@@ -138,7 +138,7 @@ Here's an example using average runtime:
             interval=timedelta(minutes=30),  # Alert if 30 minutes past average runtime
             callback=AsyncCallback(
                 SlackWebhookNotifier,
-                kwargs={"text": "🚨 DAG {{ dag_run.dag_id }} is running longer than expected!"},
+                kwargs={"text": "🚨 Dag {{ dag_run.dag_id }} is running longer than expected!"},
             ),
         ),
     ):
@@ -327,25 +327,72 @@ you to create deadlines that suit a wide variety of operational requirements.
 Custom References
 ^^^^^^^^^^^^^^^^^
 
-While the built-in references should cover most use cases, and more will be released over time, you
-can create custom references by implementing a class that inherits from DeadlineReference.  This may
-be useful if you have calendar integrations or other sources that you want to use as a reference.
+
+The built-in references handle most common scenarios. However, you may need to create custom
+references for specific integrations like calendars or other data sources. To do this, create
+a class that inherits from BaseDeadlineReference, add the ``@deadline_reference`` decorator, and
+implement an ``_evaluate_with()`` method.
+
+
+**Creating a Custom Reference**
 
 .. code-block:: python
 
-    class CustomReference(DeadlineReference):
-        """A deadline reference that uses a custom data source."""
+    from airflow.models.deadline import ReferenceModels
+    from sqlalchemy.orm import Session
+
+    from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference
+    from airflow.sdk.timezone import datetime
+
+
+    # By default, the evaluate_with method will be executed when the dagrun is created.
+    @deadline_reference()
+    class MyCustomDecoratedReference(ReferenceModels.BaseDeadlineReference):
+        """A custom reference evaluated when Dag runs are created."""
+
+        def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+            # Add your business logic here
+            return your_datetime
+
+
+    # You can specify when evaluate_with will be called by providing a DeadlineReference.TYPES value.
+    @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
+    class MyQueuedReference(ReferenceModels.BaseDeadlineReference):
+        """A custom reference evaluated when Dag runs are queued."""
 
-        # Define any required parameters for your reference
-        required_kwargs = {"custom_id"}
+        required_kwargs = {"custom_param"}
 
         def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
-            """
-            Evaluate the reference time using the provided session and kwargs.
-
-            The session parameter can be used for database queries, and kwargs
-            will contain any required parameters defined in required_kwargs.
-            """
-            custom_id = kwargs["custom_id"]
-            # Your custom logic here to determine the reference time
+            custom_value = kwargs["custom_param"]
+            # Use custom_value in your calculation
             return your_datetime
+
+
+**Using a Custom Reference in a Dag**
+
+Once registered [see notes below], use your custom references in Dag definitions like any other reference:
+
+.. code-block:: python
+
+    from datetime import timedelta
+    from airflow import DAG
+    from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
+
+    with DAG(
+        dag_id="custom_reference_example",
+        deadline=DeadlineAlert(
+            reference=DeadlineReference.MyCustomDecoratedReference(),
+            interval=timedelta(hours=2),
+            callback=AsyncCallback(my_callback),
+        ),
+    ):
+        # Your tasks here
+        ...
+
+**Important Notes:**
+
+* **Timezone Awareness**: Always return timezone-aware datetime objects.
+* **Plugin Placement**: One convenient place for custom references is in the plugins directory.
+* **API Server Restart**: Restart the Airflow API Server after adding or modifying custom references.
+* **Required Parameters**: Use ``required_kwargs`` to specify parameters your reference needs.
+* **Database Access**: Use the ``session`` parameter for Airflow database queries if needed.
diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py
index 5ca04dd4849..d2e57592031 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import re
 from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
 from unittest import mock
 
 import pytest
@@ -28,13 +29,17 @@ from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
 from airflow.models import DagRun
 from airflow.models.deadline import Deadline, ReferenceModels, _fetch_from_db
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import timezone
 from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
-from airflow.sdk.definitions.deadline import DeadlineReference
+from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference
 from airflow.utils.state import DagRunState
 
 from tests_common.test_utils import db
 from unit.models import DEFAULT_DATE
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
 DAG_ID = "dag_id_1"
 INVALID_DAG_ID = "invalid_dag_id"
 INVALID_RUN_ID = -1
@@ -57,6 +62,9 @@ TEST_CALLBACK_KWARGS = {"arg1": "value1"}
 TEST_ASYNC_CALLBACK = AsyncCallback(TEST_CALLBACK_PATH, kwargs=TEST_CALLBACK_KWARGS)
 TEST_SYNC_CALLBACK = SyncCallback(TEST_CALLBACK_PATH, kwargs=TEST_CALLBACK_KWARGS)
 
+ORIGINAL_DAGRUN_QUEUED = frozenset(DeadlineReference.TYPES.DAGRUN_QUEUED)
+ORIGINAL_DAGRUN_CREATED = frozenset(DeadlineReference.TYPES.DAGRUN_CREATED)
+
 
 def _clean_db():
     db.clear_db_dags()
@@ -64,6 +72,23 @@ def _clean_db():
     db.clear_db_deadline()
 
 
+def assert_correct_timing(reference, expected_timing):
+    assert reference in DeadlineReference.TYPES.DAGRUN
+    if expected_timing == DeadlineReference.TYPES.DAGRUN_CREATED:
+        assert reference in DeadlineReference.TYPES.DAGRUN_CREATED
+        assert reference not in DeadlineReference.TYPES.DAGRUN_QUEUED
+    elif expected_timing == DeadlineReference.TYPES.DAGRUN_QUEUED:
+        assert reference in DeadlineReference.TYPES.DAGRUN_QUEUED
+        assert reference not in DeadlineReference.TYPES.DAGRUN_CREATED
+
+
+def assert_builtin_types_unchanged(current_queued, current_created):
+    for builtin_type in ORIGINAL_DAGRUN_CREATED:
+        assert builtin_type in current_created
+    for builtin_type in ORIGINAL_DAGRUN_QUEUED:
+        assert builtin_type in current_queued
+
+
 @pytest.fixture
 def dagrun(session, dag_maker):
     with dag_maker(DAG_ID):
@@ -455,7 +480,9 @@ class TestDeadlineReference:
             # Verify only expected kwargs are passed through.
             expected_kwargs = {k: conditions[k] for k in reference.required_kwargs if k in conditions}
             expected_kwargs["session"] = session
+
             mock_evaluate.assert_called_once_with(**expected_kwargs)
+
             assert result == DEFAULT_DATE + self.DEFAULT_INTERVAL
 
     @pytest.mark.parametrize("reference", REFERENCE_TYPES)
@@ -473,6 +500,9 @@ class TestDeadlineReference:
             # Let the lack of an exception here effectively assert that no exception is raised.
             reference.evaluate_with(session=session, **self.DEFAULT_ARGS)
 
+        for required_param in reference.required_kwargs:
+            assert required_param in str(raised_exception.value)
+
     def test_deadline_reference_creation(self):
         """Test that DeadlineReference provides consistent interface and types."""
         fixed_reference = DeadlineReference.FIXED_DATETIME(DEFAULT_DATE)
@@ -494,3 +524,205 @@ class TestDeadlineReference:
         custom_reference = DeadlineReference.AVERAGE_RUNTIME(max_runs=5, min_runs=3)
         assert custom_reference.max_runs == 5
         assert custom_reference.min_runs == 3
+
+
+class TestCustomDeadlineReference:
+    class MyCustomRef(ReferenceModels.BaseDeadlineReference):
+        def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+            return timezone.datetime(DEFAULT_DATE)
+
+    class MyInvalidCustomRef:
+        pass
+
+    class MyCustomRefWithKwargs(ReferenceModels.BaseDeadlineReference):
+        required_kwargs = {"custom_id"}
+
+        def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+            return timezone.datetime(DEFAULT_DATE)
+
+    def setup_method(self):
+        self.original_dagrun_created = DeadlineReference.TYPES.DAGRUN_CREATED
+        self.original_dagrun_queued = DeadlineReference.TYPES.DAGRUN_QUEUED
+        self.original_dagrun = DeadlineReference.TYPES.DAGRUN
+        self.original_attrs = set(dir(ReferenceModels))
+        self.original_deadline_attrs = set(dir(DeadlineReference))
+
+    def teardown_method(self):
+        DeadlineReference.TYPES.DAGRUN_CREATED = self.original_dagrun_created
+        DeadlineReference.TYPES.DAGRUN_QUEUED = self.original_dagrun_queued
+        DeadlineReference.TYPES.DAGRUN = self.original_dagrun
+
+        for attr in set(dir(ReferenceModels)):
+            if attr not in self.original_attrs:
+                delattr(ReferenceModels, attr)
+
+        for attr in set(dir(DeadlineReference)):
+            if attr not in self.original_deadline_attrs:
+                delattr(DeadlineReference, attr)
+
+    @pytest.mark.parametrize(
+        "reference",
+        [
+            pytest.param(MyCustomRef, id="basic_custom_reference"),
+            pytest.param(MyCustomRefWithKwargs, id="custom_reference_with_kwargs"),
+        ],
+    )
+    @pytest.mark.parametrize(
+        "timing",
+        [
+            pytest.param(None, id="default_timing"),
+            pytest.param(DeadlineReference.TYPES.DAGRUN_CREATED, id="dagrun_created"),
+            pytest.param(DeadlineReference.TYPES.DAGRUN_QUEUED, id="dagrun_queued"),
+        ],
+    )
+    def test_register_custom_reference(self, timing, reference):
+        if timing is None:
+            result = DeadlineReference.register_custom_reference(reference)
+            expected_timing = DeadlineReference.TYPES.DAGRUN_CREATED
+        else:
+            result = DeadlineReference.register_custom_reference(reference, timing)
+            expected_timing = timing
+
+        assert result is reference
+        assert getattr(ReferenceModels, reference.__name__) is reference
+        assert getattr(DeadlineReference, reference.__name__).__class__ is reference
+
+        assert_correct_timing(reference, expected_timing)
+        assert_builtin_types_unchanged(
+            DeadlineReference.TYPES.DAGRUN_QUEUED, DeadlineReference.TYPES.DAGRUN_CREATED
+        )
+
+    def test_register_custom_reference_invalid_inheritance(self):
+        with pytest.raises(ValueError, match="must inherit from BaseDeadlineReference"):
+            DeadlineReference.register_custom_reference(self.MyInvalidCustomRef)
+
+    def test_register_custom_reference_invalid_timing(self):
+        invalid_timing = ("not", "a", "valid", "timing")
+
+        with pytest.raises(
+            ValueError,
+            match=re.escape(
+                f"Invalid deadline reference type {invalid_timing}; "
+                f"must be a valid DeadlineReference.TYPES option."
+            ),
+        ):
+            DeadlineReference.register_custom_reference(self.MyCustomRef, invalid_timing)
+
+    def test_custom_reference_discoverable_by_get_reference_class(self):
+        DeadlineReference.register_custom_reference(self.MyCustomRef)
+
+        found_class = ReferenceModels.get_reference_class(self.MyCustomRef.__name__)
+
+        assert found_class is self.MyCustomRef
+
+
+class TestDeadlineReferenceDecorator:
+    def setup_method(self):
+        self.original_dagrun_created = DeadlineReference.TYPES.DAGRUN_CREATED
+        self.original_dagrun_queued = DeadlineReference.TYPES.DAGRUN_QUEUED
+        self.original_dagrun = DeadlineReference.TYPES.DAGRUN
+        self.original_attrs = set(dir(ReferenceModels))
+
+    def teardown_method(self):
+        DeadlineReference.TYPES.DAGRUN_CREATED = self.original_dagrun_created
+        DeadlineReference.TYPES.DAGRUN_QUEUED = self.original_dagrun_queued
+        DeadlineReference.TYPES.DAGRUN = self.original_dagrun
+
+        for attr in set(dir(ReferenceModels)):
+            if attr not in self.original_attrs:
+                delattr(ReferenceModels, attr)
+
+    @staticmethod
+    def create_decorated_custom_ref():
+        @deadline_reference()
+        class DecoratedCustomRef(ReferenceModels.BaseDeadlineReference):
+            def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                return timezone.datetime(DEFAULT_DATE)
+
+        return DecoratedCustomRef
+
+    @staticmethod
+    def create_decorated_custom_ref_with_kwargs():
+        @deadline_reference()
+        class DecoratedCustomRefWithKwargs(ReferenceModels.BaseDeadlineReference):
+            required_kwargs = {"custom_id"}
+
+            def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                return timezone.datetime(DEFAULT_DATE)
+
+        return DecoratedCustomRefWithKwargs
+
+    @staticmethod
+    def create_decorated_custom_ref_queued():
+        @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
+        class DecoratedCustomRefQueued(ReferenceModels.BaseDeadlineReference):
+            def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                return timezone.datetime(DEFAULT_DATE)
+
+        return DecoratedCustomRefQueued
+
+    @pytest.mark.parametrize(
+        ("reference_factory", "expected_timing"),
+        [
+            pytest.param(
+                create_decorated_custom_ref,
+                DeadlineReference.TYPES.DAGRUN_CREATED,
+                id="basic_decorated_custom_ref",
+            ),
+            pytest.param(
+                create_decorated_custom_ref_with_kwargs,
+                DeadlineReference.TYPES.DAGRUN_CREATED,
+                id="decorated_ref_with_kwargs",
+            ),
+            pytest.param(
+                create_decorated_custom_ref_queued,
+                DeadlineReference.TYPES.DAGRUN_QUEUED,
+                id="decorated_ref_queued",
+            ),
+        ],
+    )
+    def test_deadline_reference_decorator(self, reference_factory, expected_timing):
+        reference = reference_factory()
+
+        assert getattr(ReferenceModels, reference.__name__) is reference
+        assert getattr(DeadlineReference, reference.__name__).__class__ is reference
+
+        assert_correct_timing(reference, expected_timing)
+        assert_builtin_types_unchanged(
+            DeadlineReference.TYPES.DAGRUN_QUEUED, DeadlineReference.TYPES.DAGRUN_CREATED
+        )
+
+    def test_deadline_reference_decorator_with_invalid_class(self):
+        """Test that the decorator raises error for invalid classes."""
+        with pytest.raises(ValueError, match="InvalidDecoratedRef must inherit from BaseDeadlineReference"):
+
+            @deadline_reference()
+            class InvalidDecoratedRef:
+                pass
+
+    def test_deadline_reference_decorator_with_invalid_timing(self):
+        invalid_timing = ("not", "a", "valid", "timing")
+
+        with pytest.raises(
+            ValueError,
+            match=re.escape(
+                f"Invalid deadline reference type {invalid_timing}; "
+                f"must be a valid DeadlineReference.TYPES option."
+            ),
+        ):
+
+            @deadline_reference(invalid_timing)
+            class DecoratedCustomRef(ReferenceModels.BaseDeadlineReference):
+                def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                    return timezone.datetime(DEFAULT_DATE)
+
+    @mock.patch.object(DeadlineReference, "register_custom_reference")
+    def test_deadline_reference_decorator_calls_register_method(self, mock_register):
+        timing = DeadlineReference.TYPES.DAGRUN_QUEUED
+
+        @deadline_reference(timing)
+        class DecoratedCustomRef(ReferenceModels.BaseDeadlineReference):
+            def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                return timezone.datetime(DEFAULT_DATE)
+
+        mock_register.assert_called_once_with(DecoratedCustomRef, timing)
diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py
index 4e68b6492de..9efa2409dd4 100644
--- a/task-sdk/src/airflow/sdk/definitions/deadline.py
+++ b/task-sdk/src/airflow/sdk/definitions/deadline.py
@@ -18,15 +18,21 @@ from __future__ import annotations
 
 import logging
 from datetime import datetime, timedelta
-from typing import cast
+from typing import TYPE_CHECKING, cast
 
 from airflow.models.deadline import DeadlineReferenceType, ReferenceModels
 from airflow.sdk.definitions.callback import AsyncCallback, Callback
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.serde import deserialize, serialize
 
+if TYPE_CHECKING:
+    from collections.abc import Callable
+    from typing import TypeAlias
+
 logger = logging.getLogger(__name__)
 
+DeadlineReferenceTypes: TypeAlias = tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+
 
 class DeadlineAlertFields:
     """
@@ -145,17 +151,17 @@ class DeadlineReference:
         """Collection of DeadlineReference types for type checking."""
 
         # Deadlines that should be created when the DagRun is created.
-        DAGRUN_CREATED = (
+        DAGRUN_CREATED: DeadlineReferenceTypes = (
             ReferenceModels.DagRunLogicalDateDeadline,
             ReferenceModels.FixedDatetimeDeadline,
             ReferenceModels.AverageRuntimeDeadline,
         )
 
         # Deadlines that should be created when the DagRun is queued.
-        DAGRUN_QUEUED = (ReferenceModels.DagRunQueuedAtDeadline,)
+        DAGRUN_QUEUED: DeadlineReferenceTypes = (ReferenceModels.DagRunQueuedAtDeadline,)
 
         # All DagRun-related deadline types.
-        DAGRUN = DAGRUN_CREATED + DAGRUN_QUEUED
+        DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
 
     from airflow.models.deadline import ReferenceModels
 
@@ -183,3 +189,78 @@ class DeadlineReference:
         (DeadlineReferenceType,),
         {"_evaluate_with": lambda self, **kwargs: datetime.now()},
     )()
+
+    @classmethod
+    def register_custom_reference(
+        cls,
+        reference_class: type[ReferenceModels.BaseDeadlineReference],
+        deadline_reference_type: DeadlineReferenceTypes | None = None,
+    ) -> type[ReferenceModels.BaseDeadlineReference]:
+        """
+        Register a custom deadline reference class.
+
+        :param reference_class: The custom reference class inheriting from BaseDeadlineReference
+        :param deadline_reference_type: A DeadlineReference.TYPES for when the deadline should be evaluated ("DAGRUN_CREATED",
+            "DAGRUN_QUEUED", etc.); defaults to DeadlineReference.TYPES.DAGRUN_CREATED
+        """
+        from airflow.models.deadline import ReferenceModels
+
+        # Default to DAGRUN_CREATED if no deadline_reference_type specified
+        if deadline_reference_type is None:
+            deadline_reference_type = cls.TYPES.DAGRUN_CREATED
+
+        # Validate the reference class inherits from BaseDeadlineReference
+        if not issubclass(reference_class, ReferenceModels.BaseDeadlineReference):
+            raise ValueError(f"{reference_class.__name__} must inherit from BaseDeadlineReference")
+
+        # Register the new reference with ReferenceModels and DeadlineReference for discoverability
+        setattr(ReferenceModels, reference_class.__name__, reference_class)
+        setattr(cls, reference_class.__name__, reference_class())
+        logger.info("Registered DeadlineReference %s", reference_class.__name__)
+
+        # Add to appropriate deadline_reference_type classification
+        if deadline_reference_type is cls.TYPES.DAGRUN_CREATED:
+            cls.TYPES.DAGRUN_CREATED = cls.TYPES.DAGRUN_CREATED + (reference_class,)
+        elif deadline_reference_type is cls.TYPES.DAGRUN_QUEUED:
+            cls.TYPES.DAGRUN_QUEUED = cls.TYPES.DAGRUN_QUEUED + (reference_class,)
+        else:
+            raise ValueError(
+                f"Invalid deadline reference type {deadline_reference_type}; "
+                "must be a valid DeadlineReference.TYPES option."
+            )
+
+        # Refresh the combined DAGRUN tuple
+        cls.TYPES.DAGRUN = cls.TYPES.DAGRUN_CREATED + cls.TYPES.DAGRUN_QUEUED
+
+        return reference_class
+
+
+def deadline_reference(
+    deadline_reference_type: DeadlineReferenceTypes | None = None,
+) -> Callable[[type[ReferenceModels.BaseDeadlineReference]], type[ReferenceModels.BaseDeadlineReference]]:
+    """
+    Decorate a class to register a custom deadline reference.
+
+    Usage:
+        @deadline_reference()
+        class MyCustomReference(ReferenceModels.BaseDeadlineReference):
+            # By default, evaluate_with will be called when a new dagrun is created.
+            def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                # Put your business logic here
+                return some_datetime
+
+        @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
+        class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
+            # Optionally, you can specify when you want it calculated by providing a DeadlineReference.TYPES
+            def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+                 # Put your business logic here
+                return some_datetime
+    """
+
+    def decorator(
+        reference_class: type[ReferenceModels.BaseDeadlineReference],
+    ) -> type[ReferenceModels.BaseDeadlineReference]:
+        DeadlineReference.register_custom_reference(reference_class, deadline_reference_type)
+        return reference_class
+
+    return decorator
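
The registration flow in this commit (a decorator factory that forwards to a classmethod, which appends the class to a timing tuple selected by identity) can be tried without an Airflow install. The sketch below is a simplified stand-in, not the Airflow implementation: `BaseRef`, `Registry`, `reference`, and the two built-in classes are hypothetical names invented for illustration, and unlike the committed code it validates the timing before changing any state.

```python
from __future__ import annotations

from datetime import datetime, timezone


class BaseRef:
    """Hypothetical stand-in for ReferenceModels.BaseDeadlineReference."""

    def evaluate(self) -> datetime:
        raise NotImplementedError


class BuiltinCreatedRef(BaseRef):
    def evaluate(self) -> datetime:
        return datetime(2025, 1, 1, tzinfo=timezone.utc)


class BuiltinQueuedRef(BaseRef):
    def evaluate(self) -> datetime:
        return datetime(2025, 1, 2, tzinfo=timezone.utc)


class Registry:
    """Hypothetical stand-in for DeadlineReference and its TYPES tuples."""

    CREATED: tuple[type[BaseRef], ...] = (BuiltinCreatedRef,)
    QUEUED: tuple[type[BaseRef], ...] = (BuiltinQueuedRef,)
    ALL: tuple[type[BaseRef], ...] = CREATED + QUEUED

    @classmethod
    def register(cls, ref_class, timing=None):
        # Default to the "created" bucket, mirroring the commit's default.
        if timing is None:
            timing = cls.CREATED
        if not issubclass(ref_class, BaseRef):
            raise ValueError(f"{ref_class.__name__} must inherit from BaseRef")
        # The bucket is selected by identity, so the caller must pass the
        # tuple object currently bound on the registry, not an equal copy.
        if timing is cls.CREATED:
            cls.CREATED = cls.CREATED + (ref_class,)
        elif timing is cls.QUEUED:
            cls.QUEUED = cls.QUEUED + (ref_class,)
        else:
            raise ValueError(f"Invalid timing {timing}")
        # Rebuild the combined tuple after every registration.
        cls.ALL = cls.CREATED + cls.QUEUED
        return ref_class


def reference(timing=None):
    """Decorator factory that registers the decorated class."""

    def decorator(ref_class):
        return Registry.register(ref_class, timing)

    return decorator


@reference()
class MyCreatedRef(BaseRef):
    def evaluate(self) -> datetime:
        return datetime(2025, 6, 1, tzinfo=timezone.utc)


@reference(Registry.QUEUED)
class MyQueuedRef(BaseRef):
    def evaluate(self) -> datetime:
        return datetime(2025, 6, 2, tzinfo=timezone.utc)
```

Because each registration rebinds the tuple to a new object, the timing argument is read at decoration time in this sketch; a tuple saved before an earlier registration would no longer match by identity and would be rejected as invalid.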
