amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870619189


##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -239,6 +250,52 @@ def serialize_reference(self) -> dict:
         def deserialize_reference(cls, reference_data: dict):
             return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+    class SerializedCustomReference(SerializedBaseDeadlineReference):
+        """Wrapper for custom deadline references."""
+
+        def __init__(self, inner_ref):
+            self.inner_ref = inner_ref
+
+        @property
+        def reference_name(self) -> str:
+            return self.inner_ref.reference_name
+
+        def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+            filtered_kwargs = {k: v for k, v in kwargs.items() if k in 
self.required_kwargs}

Review Comment:
   Good catch, handled in [comments from 
kaxil](https://github.com/apache/airflow/pull/61461/commits/f78d6235c6400934175912142e29eafaf59421a9)



##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -118,69 +223,64 @@ class TYPES:
 
         # Deadlines that should be created when the DagRun is created.
         DAGRUN_CREATED: DeadlineReferenceTypes = (
-            ReferenceModels.DagRunLogicalDateDeadline,
-            ReferenceModels.FixedDatetimeDeadline,
-            ReferenceModels.AverageRuntimeDeadline,
+            DagRunLogicalDateDeadline,
+            FixedDatetimeDeadline,
+            AverageRuntimeDeadline,
         )
 
         # Deadlines that should be created when the DagRun is queued.
-        DAGRUN_QUEUED: DeadlineReferenceTypes = 
(ReferenceModels.DagRunQueuedAtDeadline,)
+        DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
 
         # All DagRun-related deadline types.
         DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
 
-    from airflow.models.deadline import ReferenceModels
-
-    DAGRUN_LOGICAL_DATE: DeadlineReferenceType = 
ReferenceModels.DagRunLogicalDateDeadline()
-    DAGRUN_QUEUED_AT: DeadlineReferenceType = 
ReferenceModels.DagRunQueuedAtDeadline()
+    DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+    DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
 
     @classmethod
     def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) 
-> DeadlineReferenceType:
         if max_runs == 0:
-            max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+            max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
         if min_runs is None:
             min_runs = max_runs
-        return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+        return AverageRuntimeDeadline(max_runs, min_runs)
 
     @classmethod
-    def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
-        return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+    def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+        return FixedDatetimeDeadline(dt)
 
     # TODO: Remove this once other deadline types exist.
     #   This is a temporary reference type used only in tests to verify that
     #   dag.has_dagrun_deadline() returns false if the dag has a non-dagrun 
deadline type.
     #   It should be replaced with a real non-dagrun deadline type when one is 
available.
     _TEMPORARY_TEST_REFERENCE = type(
         "TemporaryTestDeadlineForTypeChecking",
-        (DeadlineReferenceType,),
-        {"_evaluate_with": lambda self, **kwargs: datetime.now()},
+        (BaseDeadlineReference,),
+        {"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD: 
"TemporaryTestDeadlineForTypeChecking"}},
     )()
 
     @classmethod
     def register_custom_reference(
         cls,
-        reference_class: type[ReferenceModels.BaseDeadlineReference],
+        reference_class: type[BaseDeadlineReference],
         deadline_reference_type: DeadlineReferenceTypes | None = None,
-    ) -> type[ReferenceModels.BaseDeadlineReference]:
+    ) -> type[BaseDeadlineReference]:
         """
         Register a custom deadline reference class.
 
         :param reference_class: The custom reference class inheriting from 
BaseDeadlineReference
         :param deadline_reference_type: A DeadlineReference.TYPES for when the 
deadline should be evaluated ("DAGRUN_CREATED",
             "DAGRUN_QUEUED", etc.); defaults to 
DeadlineReference.TYPES.DAGRUN_CREATED
         """
-        from airflow.models.deadline import ReferenceModels
-
         # Default to DAGRUN_CREATED if no deadline_reference_type specified
         if deadline_reference_type is None:
             deadline_reference_type = cls.TYPES.DAGRUN_CREATED
 
         # Validate the reference class inherits from BaseDeadlineReference
-        if not issubclass(reference_class, 
ReferenceModels.BaseDeadlineReference):
+        if not issubclass(reference_class, BaseDeadlineReference):

Review Comment:
   Handled in [comments from 
kaxil](https://github.com/apache/airflow/pull/61461/commits/8938e471fec295bb04468c66150923730dd2d40b)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to