amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781444229
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -29,7 +30,128 @@
logger = logging.getLogger(__name__)
-DeadlineReferenceTypes: TypeAlias =
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+ """
+ Base class for all Deadline Reference implementations.
+
+ This is a lightweight SDK class for DAG authoring. It only handles
serialization.
+ The actual evaluation logic (_evaluate_with) is in Core's
SerializedReferenceModels.
+
+ For custom deadline references, users should inherit from this class and
implement
+ _evaluate_with() with deferred Core imports (imports inside the method
body).
+ """
+
+ # way to detect builtin types. custom types inherit False while builtins
set this to True
+ __is_builtin__: bool = False
+
+ @property
+ def reference_name(self) -> str:
+ """Return the class name as the reference identifier."""
+ return self.__class__.__name__
+
+ def serialize_reference(self) -> dict[str, Any]:
+ """
+ Serialize this reference type into a dictionary representation.
+
+ Override this method in subclasses if additional data is needed for
serialization.
+ """
+ return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+ @classmethod
+ def deserialize_reference(cls, reference_data: dict[str, Any]) ->
BaseDeadlineReference:
+ """
+ Deserialize a reference type from its dictionary representation.
+
+ :param reference_data: Dictionary containing serialized reference data.
+ """
+ return cls()
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, BaseDeadlineReference):
+ return NotImplemented
+ return self.serialize_reference() == other.serialize_reference()
+
+ def __hash__(self) -> int:
+ return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+ """A deadline that returns a DagRun's logical date."""
+
+ __is_builtin__ = True
+
+ def serialize_reference(self) -> dict[str, Any]:
+ return {REFERENCE_TYPE_FIELD: self.reference_name}
Review Comment:
Interesting perspective about serializer injecting it, we should be able to
yes since serializer anyways owns that the serialization flow
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]