This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 36add023a70905dc49a7dcc648a3b80c1fcf491c
Author: Sean Ghaeli <58916776+seangha...@users.noreply.github.com>
AuthorDate: Tue Sep 9 10:36:02 2025 -0700

    Allow attachment of multiple deadlines to a DAG (#55086)
---
 airflow-core/src/airflow/models/dag.py             | 17 ++++++-
 airflow-core/src/airflow/models/dagrun.py          | 10 ++--
 airflow-core/src/airflow/serialization/schema.json |  4 ++
 .../airflow/serialization/serialized_objects.py    | 43 +++++++++++------
 airflow-core/tests/unit/models/test_dag.py         | 55 ++++++++++++++++++++++
 task-sdk/src/airflow/sdk/definitions/dag.py        | 21 +++++++--
 6 files changed, 127 insertions(+), 23 deletions(-)

diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 316eaf36173..4f6f22b2f80 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -452,12 +452,25 @@ class DagModel(Base):
     @property
     def deadline(self):
         """Get the deserialized deadline alert."""
-        return DeadlineAlert.deserialize_deadline_alert(self._deadline) if 
self._deadline else None
+        if self._deadline is None:
+            return None
+        if isinstance(self._deadline, list):
+            return [DeadlineAlert.deserialize_deadline_alert(item) for item in 
self._deadline]
+        return DeadlineAlert.deserialize_deadline_alert(self._deadline)
 
     @deadline.setter
     def deadline(self, value):
         """Set and serialize the deadline alert."""
-        self._deadline = value if isinstance(value, dict) else 
value.serialize_deadline_alert()
+        if value is None:
+            self._deadline = None
+        elif isinstance(value, list):
+            self._deadline = [
+                item if isinstance(item, dict) else 
item.serialize_deadline_alert() for item in value
+            ]
+        elif isinstance(value, dict):
+            self._deadline = value
+        else:
+            self._deadline = value.serialize_deadline_alert()
 
     @property
     def timezone(self):
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 26d1d42e524..516ef64b30c 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1222,9 +1222,13 @@ class DagRun(Base, LoggingMixin):
                     msg="success",
                 )
 
-            if (deadline := dag.deadline) and isinstance(deadline.reference, 
DeadlineReference.TYPES.DAGRUN):
-                # The dagrun has succeeded.  If there wre any Deadlines for it 
which were not breached, they are no longer needed.
-                Deadline.prune_deadlines(session=session, 
conditions={DagRun.run_id: self.run_id})
+            if dag.deadline:
+                # The dagrun has succeeded.  If there were any Deadlines for it which were not breached, they are no longer needed.
+                if any(
+                    isinstance(d.reference, DeadlineReference.TYPES.DAGRUN)
+                    for d in cast("list", dag.deadline)
+                ):
+                    Deadline.prune_deadlines(session=session, 
conditions={DagRun.run_id: self.run_id})
 
         # if *all tasks* are deadlocked, the run failed
         elif unfinished.should_schedule and not are_runnable_tasks:
diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json
index 0ce253e2262..c4740f346f3 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -190,6 +190,10 @@
         "deadline": {
             "anyOf": [
                 { "$ref": "#/definitions/dict" },
+                {
+                    "type": "array",
+                    "items": { "$ref": "#/definitions/dict" }
+                },
                 { "type": "null" }
             ]
         },
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 3e75993b2e2..280b5907a89 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2385,7 +2385,11 @@ class SerializedDAG(DAG, BaseSerialization):
             serialized_dag["dag_dependencies"] = [x.__dict__ for x in 
sorted(dag_deps)]
             serialized_dag["task_group"] = 
TaskGroupSerialization.serialize_task_group(dag.task_group)
 
-            serialized_dag["deadline"] = 
dag.deadline.serialize_deadline_alert() if dag.deadline else None
+            serialized_dag["deadline"] = (
+                [deadline.serialize_deadline_alert() for deadline in 
dag.deadline]
+                if isinstance(dag.deadline, list)
+                else None
+            )
 
             # Edge info in the JSON exactly matches our internal structure
             serialized_dag["edge_info"] = dag.edge_info
@@ -2501,7 +2505,14 @@ class SerializedDAG(DAG, BaseSerialization):
             dag.has_on_failure_callback = True
 
         if "deadline" in encoded_dag and encoded_dag["deadline"] is not None:
-            dag.deadline = 
DeadlineAlert.deserialize_deadline_alert(encoded_dag["deadline"])
+            dag.deadline = (
+                [
+                    DeadlineAlert.deserialize_deadline_alert(deadline_data)
+                    for deadline_data in encoded_dag["deadline"]
+                ]
+                if encoded_dag["deadline"]
+                else None
+            )
 
         keys_to_set_none = dag.get_serialized_fields() - encoded_dag.keys() - 
cls._CONSTRUCTOR_PARAMS.keys()
         for k in keys_to_set_none:
@@ -3096,19 +3107,21 @@ class SerializedDAG(DAG, BaseSerialization):
             session=session,
         )
 
-        if self.deadline and isinstance(self.deadline.reference, 
DeadlineReference.TYPES.DAGRUN):
-            session.add(
-                Deadline(
-                    deadline_time=self.deadline.reference.evaluate_with(
-                        session=session,
-                        interval=self.deadline.interval,
-                        dag_id=self.dag_id,
-                        run_id=run_id,
-                    ),
-                    callback=self.deadline.callback,
-                    dagrun_id=orm_dagrun.id,
-                )
-            )
+        if self.deadline:
+            for deadline in cast("list", self.deadline):
+                if isinstance(deadline.reference, 
DeadlineReference.TYPES.DAGRUN):
+                    session.add(
+                        Deadline(
+                            deadline_time=deadline.reference.evaluate_with(
+                                session=session,
+                                interval=deadline.interval,
+                                dag_id=self.dag_id,
+                                run_id=run_id,
+                            ),
+                            callback=deadline.callback,
+                            dagrun_id=orm_dagrun.id,
+                        )
+                    )
 
         return orm_dagrun
 
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 5ee4b9b2321..1c9b294bddb 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1811,6 +1811,61 @@ my_postgres_conn:
         assert len(dr.deadlines) == 1
         assert dr.deadlines[0].deadline_time == getattr(dr, reference_column, 
DEFAULT_DATE) + interval
 
+    def test_dag_with_multiple_deadlines(self, dag_maker, session):
+        """Test that a DAG with multiple deadlines stores all deadlines in the 
database."""
+        deadlines = [
+            DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_QUEUED_AT,
+                interval=datetime.timedelta(minutes=5),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+            DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_QUEUED_AT,
+                interval=datetime.timedelta(minutes=10),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+            DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+                interval=datetime.timedelta(hours=1),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+        ]
+
+        with dag_maker(
+            dag_id="test_multiple_deadlines",
+            schedule=datetime.timedelta(days=1),
+            deadline=deadlines,
+        ) as dag:
+            ...
+
+        scheduler_dag = sync_dag_to_db(dag)
+        dr = scheduler_dag.create_dagrun(
+            run_id="test_multiple_deadlines",
+            run_type=DagRunType.SCHEDULED,
+            state=State.QUEUED,
+            logical_date=TEST_DATE,
+            run_after=TEST_DATE,
+            triggered_by=DagRunTriggeredByType.TEST,
+        )
+        session.flush()
+        dr = session.merge(dr)
+
+        # Check that all 3 deadlines were created
+        assert len(dr.deadlines) == 3
+
+        # Verify each deadline has correct properties
+        deadline_times = [d.deadline_time for d in dr.deadlines]
+        expected_times = [
+            dr.queued_at + datetime.timedelta(minutes=5),
+            dr.queued_at + datetime.timedelta(minutes=10),
+            dr.logical_date + datetime.timedelta(hours=1),
+        ]
+
+        # Sort both lists to compare regardless of order
+        deadline_times.sort()
+        expected_times.sort()
+        assert deadline_times == expected_times
+
 
 class TestDagModel:
     def _clean(self):
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index bab5a6db01b..ed14b7dd587 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -184,6 +184,15 @@ def _convert_access_control(access_control):
     return updated_access_control
 
 
+def _convert_deadline(deadline: list[DeadlineAlert] | DeadlineAlert | None) -> 
list[DeadlineAlert] | None:
+    """Convert deadline parameter to a list of DeadlineAlert objects."""
+    if deadline is None:
+        return None
+    if isinstance(deadline, DeadlineAlert):
+        return [deadline]
+    return list(deadline)
+
+
 def _convert_doc_md(doc_md: str | None) -> str | None:
     if doc_md is None:
         return doc_md
@@ -437,9 +446,15 @@ class DAG:
         default=None,
         
validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)),
     )
-    deadline: DeadlineAlert | None = attrs.field(
+    deadline: list[DeadlineAlert] | DeadlineAlert | None = attrs.field(
         default=None,
-        
validator=attrs.validators.optional(attrs.validators.instance_of(DeadlineAlert)),
+        converter=_convert_deadline,
+        validator=attrs.validators.optional(
+            attrs.validators.deep_iterable(
+                member_validator=attrs.validators.instance_of(DeadlineAlert),
+                iterable_validator=attrs.validators.instance_of(list),
+            )
+        ),
     )
 
     catchup: bool = attrs.field(
@@ -1415,7 +1430,7 @@ if TYPE_CHECKING:
         catchup: bool = ...,
         on_success_callback: None | DagStateChangeCallback | 
list[DagStateChangeCallback] = None,
         on_failure_callback: None | DagStateChangeCallback | 
list[DagStateChangeCallback] = None,
-        deadline: DeadlineAlert | None = None,
+        deadline: list[DeadlineAlert] | DeadlineAlert | None = None,
         doc_md: str | None = None,
         params: ParamsDict | dict[str, Any] | None = None,
         access_control: dict[str, dict[str, Collection[str]]] | dict[str, 
Collection[str]] | None = None,

Reply via email to