Lee-W commented on code in PR #57222:
URL: https://github.com/apache/airflow/pull/57222#discussion_r2488357686
##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -327,25 +327,72 @@ you to create deadlines that suit a wide variety of
operational requirements.
Custom References
^^^^^^^^^^^^^^^^^
-While the built-in references should cover most use cases, and more will be
released over time, you
-can create custom references by implementing a class that inherits from
DeadlineReference. This may
-be useful if you have calendar integrations or other sources that you want to
use as a reference.
+
+The built-in references handle most common scenarios. However, you may need to
create custom
+references for specific integrations like calendars or other data sources. To
do this, create
+a class that inherits from BaseDeadlineReference, add the @deadline_reference
decorator, and
+implement an _evaluate_with() method.
Review Comment:
```suggestion
implement an ``_evaluate_with()`` method.
```
##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -327,25 +327,72 @@ you to create deadlines that suit a wide variety of
operational requirements.
Custom References
^^^^^^^^^^^^^^^^^
-While the built-in references should cover most use cases, and more will be
released over time, you
-can create custom references by implementing a class that inherits from
DeadlineReference. This may
-be useful if you have calendar integrations or other sources that you want to
use as a reference.
+
+The built-in references handle most common scenarios. However, you may need to
create custom
+references for specific integrations like calendars or other data sources. To
do this, create
+a class that inherits from BaseDeadlineReference, add the @deadline_reference
decorator, and
+implement an _evaluate_with() method.
+
+
+**Creating a Custom Reference**
.. code-block:: python
- class CustomReference(DeadlineReference):
- """A deadline reference that uses a custom data source."""
+ from airflow.models.deadline import ReferenceModels
+ from sqlalchemy.orm import Session
+
+ from airflow.sdk.definitions.deadline import DeadlineReference,
deadline_reference
+ from airflow.sdk.timezone import datetime
+
+
+ # By default, the evaluate_with method will be executed when the dagrun is
created.
+ @deadline_reference()
+ class MyCustomDecoratedReference(ReferenceModels.BaseDeadlineReference):
+ """A custom reference evaluated when Dag runs are created."""
+
+ def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
+ # Add your business logic here
+ return your_datetime
+
+
+ # You can specify when evaluate_with will be called by providing a
DeadlineReference.TYPES value.
+ @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
+ class MyQueuedReference(ReferenceModels.BaseDeadlineReference):
+ """A custom reference evaluated when Dag runs are queued."""
- # Define any required parameters for your reference
- required_kwargs = {"custom_id"}
+ required_kwargs = {"custom_param"}
def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
- """
- Evaluate the reference time using the provided session and kwargs.
-
- The session parameter can be used for database queries, and kwargs
- will contain any required parameters defined in required_kwargs.
- """
- custom_id = kwargs["custom_id"]
- # Your custom logic here to determine the reference time
+ custom_value = kwargs["custom_param"]
+ # Use custom_value in your calculation
return your_datetime
+
+
+**Using a Custom Reference in a Dag**
+
+Once registered [see notes below], use your custom references in Dag
definitions like any other reference:
+
+.. code-block:: python
+
+ from datetime import timedelta
+ from airflow import DAG
+ from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert,
DeadlineReference
Review Comment:
```suggestion
from airflow import DAG
from airflow.sdk.definitions.deadline import AsyncCallback,
DeadlineAlert, DeadlineReference
from datetime import timedelta
```
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -321,3 +326,71 @@ def FIXED_DATETIME(cls, datetime: datetime) ->
DeadlineReferenceType:
(DeadlineReferenceType,),
{"_evaluate_with": lambda self, **kwargs: datetime.now()},
)()
+
+ @classmethod
+ def register_custom_reference(
+ cls,
+ reference_class: type[ReferenceModels.BaseDeadlineReference],
+ timing: DeadlineReferenceTuple | None = None,
+ ):
+ """
+ Register a custom deadline reference class.
+
+ :param reference_class: The custom reference class inheriting from
BaseDeadlineReference
+ :param timing: A DeadlineReference.TYPES for when the deadline should
be evaluated ("DAGRUN_CREATED",
+ "DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
+ """
+ from airflow.models.deadline import ReferenceModels
+
+ # Default to DAGRUN_CREATED if no timing specified
+ if timing is None:
+ timing = cls.TYPES.DAGRUN_CREATED
+
+ # Validate the reference class inherits from BaseDeadlineReference
+ if not issubclass(reference_class,
ReferenceModels.BaseDeadlineReference):
+ raise ValueError(f"{reference_class.__name__} must inherit from
BaseDeadlineReference")
+
+ # Register the new reference with ReferenceModels and
DeadlineReference for discoverability
+ setattr(ReferenceModels, reference_class.__name__, reference_class)
+ setattr(cls, reference_class.__name__, reference_class())
+ logger.info("Registered DeadlineReference %s",
reference_class.__name__)
+
+ # Add to appropriate timing classification
+ if timing is cls.TYPES.DAGRUN_CREATED:
+ cls.TYPES.DAGRUN_CREATED = cls.TYPES.DAGRUN_CREATED +
(reference_class,)
+ elif timing is cls.TYPES.DAGRUN_QUEUED:
+ cls.TYPES.DAGRUN_QUEUED = cls.TYPES.DAGRUN_QUEUED +
(reference_class,)
+ else:
+ raise ValueError("Invalid timing value; must be a valid
DeadlineReference.TYPES option.")
+
+ # Refresh the combined DAGRUN tuple
+ cls.TYPES.DAGRUN = cls.TYPES.DAGRUN_CREATED + cls.TYPES.DAGRUN_QUEUED
+
+ return reference_class
+
+
+def deadline_reference(timing: DeadlineReferenceTuple | None = None):
Review Comment:
```suggestion
def deadline_reference(timing: DeadlineReferenceTuple | None = None) ->
Callable[[type[ReferenceModels.BaseDeadlineReference]],
type[ReferenceModels.BaseDeadlineReference]]:
```
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -321,3 +326,71 @@ def FIXED_DATETIME(cls, datetime: datetime) ->
DeadlineReferenceType:
(DeadlineReferenceType,),
{"_evaluate_with": lambda self, **kwargs: datetime.now()},
)()
+
+ @classmethod
+ def register_custom_reference(
+ cls,
+ reference_class: type[ReferenceModels.BaseDeadlineReference],
+ timing: DeadlineReferenceTuple | None = None,
+ ):
+ """
+ Register a custom deadline reference class.
+
+ :param reference_class: The custom reference class inheriting from
BaseDeadlineReference
+ :param timing: A DeadlineReference.TYPES for when the deadline should
be evaluated ("DAGRUN_CREATED",
+ "DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
+ """
+ from airflow.models.deadline import ReferenceModels
+
+ # Default to DAGRUN_CREATED if no timing specified
+ if timing is None:
+ timing = cls.TYPES.DAGRUN_CREATED
+
+ # Validate the reference class inherits from BaseDeadlineReference
+ if not issubclass(reference_class,
ReferenceModels.BaseDeadlineReference):
+ raise ValueError(f"{reference_class.__name__} must inherit from
BaseDeadlineReference")
+
+ # Register the new reference with ReferenceModels and
DeadlineReference for discoverability
+ setattr(ReferenceModels, reference_class.__name__, reference_class)
+ setattr(cls, reference_class.__name__, reference_class())
+ logger.info("Registered DeadlineReference %s",
reference_class.__name__)
+
+ # Add to appropriate timing classification
+ if timing is cls.TYPES.DAGRUN_CREATED:
+ cls.TYPES.DAGRUN_CREATED = cls.TYPES.DAGRUN_CREATED +
(reference_class,)
+ elif timing is cls.TYPES.DAGRUN_QUEUED:
+ cls.TYPES.DAGRUN_QUEUED = cls.TYPES.DAGRUN_QUEUED +
(reference_class,)
+ else:
+ raise ValueError("Invalid timing value; must be a valid
DeadlineReference.TYPES option.")
+
+ # Refresh the combined DAGRUN tuple
+ cls.TYPES.DAGRUN = cls.TYPES.DAGRUN_CREATED + cls.TYPES.DAGRUN_QUEUED
+
+ return reference_class
+
+
+def deadline_reference(timing: DeadlineReferenceTuple | None = None):
+ """
+ Decorate a class to register a custom deadline reference.
+
+ Usage:
+ @deadline_reference()
+ class MyCustomReference(ReferenceModels.BaseDeadlineReference):
+ # By default, evaluate_with will be called when a new dagrun is
created.
+ def _evaluate_with(self, *, session: Session, **kwargs) ->
datetime:
+ # Put your business logic here
+ return some_datetime
+
+ @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
+ class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
+ # Optionally, you can specify when you want it calculated by
providing a DeadlineReference.TYPES
+ def _evaluate_with(self, *, session: Session, **kwargs) ->
datetime:
+ # Put your business logic here
+ return some_datetime
+ """
+
+ def decorator(reference_class):
Review Comment:
```suggestion
def decorator(reference_class:
type[ReferenceModels.BaseDeadlineReference]) -> type[ReferenceModels.BaseDeadlineReference]:
```
##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -327,25 +327,72 @@ you to create deadlines that suit a wide variety of
operational requirements.
Custom References
^^^^^^^^^^^^^^^^^
-While the built-in references should cover most use cases, and more will be
released over time, you
-can create custom references by implementing a class that inherits from
DeadlineReference. This may
-be useful if you have calendar integrations or other sources that you want to
use as a reference.
+
+The built-in references handle most common scenarios. However, you may need to
create custom
+references for specific integrations like calendars or other data sources. To
do this, create
+a class that inherits from BaseDeadlineReference, add the @deadline_reference
decorator, and
Review Comment:
```suggestion
a class that inherits from BaseDeadlineReference, add the
``@deadline_reference`` decorator, and
```
##########
airflow-core/docs/howto/deadline-alerts.rst:
##########
@@ -327,25 +327,72 @@ you to create deadlines that suit a wide variety of
operational requirements.
Custom References
^^^^^^^^^^^^^^^^^
-While the built-in references should cover most use cases, and more will be
released over time, you
-can create custom references by implementing a class that inherits from
DeadlineReference. This may
-be useful if you have calendar integrations or other sources that you want to
use as a reference.
+
+The built-in references handle most common scenarios. However, you may need to
create custom
+references for specific integrations like calendars or other data sources. To
do this, create
+a class that inherits from BaseDeadlineReference, add the @deadline_reference
decorator, and
+implement an _evaluate_with() method.
+
+
+**Creating a Custom Reference**
.. code-block:: python
- class CustomReference(DeadlineReference):
- """A deadline reference that uses a custom data source."""
+ from airflow.models.deadline import ReferenceModels
+ from sqlalchemy.orm import Session
+
+ from airflow.sdk.definitions.deadline import DeadlineReference,
deadline_reference
+ from airflow.sdk.timezone import datetime
Review Comment:
```suggestion
from airflow.models.deadline import ReferenceModels
from airflow.sdk.definitions.deadline import DeadlineReference,
deadline_reference
from airflow.sdk.timezone import datetime
from sqlalchemy.orm import Session
```
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -321,3 +326,71 @@ def FIXED_DATETIME(cls, datetime: datetime) ->
DeadlineReferenceType:
(DeadlineReferenceType,),
{"_evaluate_with": lambda self, **kwargs: datetime.now()},
)()
+
+ @classmethod
+ def register_custom_reference(
+ cls,
+ reference_class: type[ReferenceModels.BaseDeadlineReference],
+ timing: DeadlineReferenceTuple | None = None,
Review Comment:
Should we just name it `deadline_references_types`? When I see a parameter
called `timing`, I might want to pass a `datetime` object.
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -321,3 +326,71 @@ def FIXED_DATETIME(cls, datetime: datetime) ->
DeadlineReferenceType:
(DeadlineReferenceType,),
{"_evaluate_with": lambda self, **kwargs: datetime.now()},
)()
+
+ @classmethod
+ def register_custom_reference(
+ cls,
+ reference_class: type[ReferenceModels.BaseDeadlineReference],
+ timing: DeadlineReferenceTuple | None = None,
+ ):
Review Comment:
```suggestion
) -> type[ReferenceModels.BaseDeadlineReference]:
```
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -321,3 +326,71 @@ def FIXED_DATETIME(cls, datetime: datetime) ->
DeadlineReferenceType:
(DeadlineReferenceType,),
{"_evaluate_with": lambda self, **kwargs: datetime.now()},
)()
+
+ @classmethod
+ def register_custom_reference(
+ cls,
+ reference_class: type[ReferenceModels.BaseDeadlineReference],
+ timing: DeadlineReferenceTuple | None = None,
+ ):
+ """
+ Register a custom deadline reference class.
+
+ :param reference_class: The custom reference class inheriting from
BaseDeadlineReference
+ :param timing: A DeadlineReference.TYPES for when the deadline should
be evaluated ("DAGRUN_CREATED",
+ "DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
+ """
+ from airflow.models.deadline import ReferenceModels
+
+ # Default to DAGRUN_CREATED if no timing specified
+ if timing is None:
+ timing = cls.TYPES.DAGRUN_CREATED
+
+ # Validate the reference class inherits from BaseDeadlineReference
+ if not issubclass(reference_class,
ReferenceModels.BaseDeadlineReference):
+ raise ValueError(f"{reference_class.__name__} must inherit from
BaseDeadlineReference")
+
+ # Register the new reference with ReferenceModels and
DeadlineReference for discoverability
+ setattr(ReferenceModels, reference_class.__name__, reference_class)
+ setattr(cls, reference_class.__name__, reference_class())
+ logger.info("Registered DeadlineReference %s",
reference_class.__name__)
+
+ # Add to appropriate timing classification
+ if timing is cls.TYPES.DAGRUN_CREATED:
+ cls.TYPES.DAGRUN_CREATED = cls.TYPES.DAGRUN_CREATED +
(reference_class,)
+ elif timing is cls.TYPES.DAGRUN_QUEUED:
+ cls.TYPES.DAGRUN_QUEUED = cls.TYPES.DAGRUN_QUEUED +
(reference_class,)
+ else:
+ raise ValueError("Invalid timing value; must be a valid
DeadlineReference.TYPES option.")
Review Comment:
```suggestion
raise ValueError(f"Invalid timing value: {timing}; must be a
valid DeadlineReference.TYPES option.")
```
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -21,15 +21,20 @@
from abc import ABC
from collections.abc import Callable
from datetime import datetime, timedelta
-from typing import Any, cast
+from typing import TYPE_CHECKING, Any, cast
from airflow.models.deadline import DeadlineReferenceType, ReferenceModels
from airflow.sdk.module_loading import import_string, is_valid_dotpath
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.serde import deserialize, serialize
+if TYPE_CHECKING:
+ from typing import TypeAlias
+
logger = logging.getLogger(__name__)
+DeadlineReferenceTuple: TypeAlias =
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
Review Comment:
```suggestion
DeadlineReferenceTypes: TypeAlias =
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
```
not sure whether it's better this way 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]