This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 36add023a70905dc49a7dcc648a3b80c1fcf491c Author: Sean Ghaeli <58916776+seangha...@users.noreply.github.com> AuthorDate: Tue Sep 9 10:36:02 2025 -0700 Allow attachment of multiple deadlines to a DAG (#55086) --- airflow-core/src/airflow/models/dag.py | 17 ++++++- airflow-core/src/airflow/models/dagrun.py | 10 ++-- airflow-core/src/airflow/serialization/schema.json | 4 ++ .../airflow/serialization/serialized_objects.py | 43 +++++++++++------ airflow-core/tests/unit/models/test_dag.py | 55 ++++++++++++++++++++++ task-sdk/src/airflow/sdk/definitions/dag.py | 21 +++++++-- 6 files changed, 127 insertions(+), 23 deletions(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 316eaf36173..4f6f22b2f80 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -452,12 +452,25 @@ class DagModel(Base): @property def deadline(self): """Get the deserialized deadline alert.""" - return DeadlineAlert.deserialize_deadline_alert(self._deadline) if self._deadline else None + if self._deadline is None: + return None + if isinstance(self._deadline, list): + return [DeadlineAlert.deserialize_deadline_alert(item) for item in self._deadline] + return DeadlineAlert.deserialize_deadline_alert(self._deadline) @deadline.setter def deadline(self, value): """Set and serialize the deadline alert.""" - self._deadline = value if isinstance(value, dict) else value.serialize_deadline_alert() + if value is None: + self._deadline = None + elif isinstance(value, list): + self._deadline = [ + item if isinstance(item, dict) else item.serialize_deadline_alert() for item in value + ] + elif isinstance(value, dict): + self._deadline = value + else: + self._deadline = value.serialize_deadline_alert() @property def timezone(self): diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 26d1d42e524..516ef64b30c 100644 --- a/airflow-core/src/airflow/models/dagrun.py 
+++ b/airflow-core/src/airflow/models/dagrun.py @@ -1222,9 +1222,13 @@ class DagRun(Base, LoggingMixin): msg="success", ) - if (deadline := dag.deadline) and isinstance(deadline.reference, DeadlineReference.TYPES.DAGRUN): - # The dagrun has succeeded. If there wre any Deadlines for it which were not breached, they are no longer needed. - Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id}) + if dag.deadline: + # The dagrun has succeeded. If there were any Deadlines for it which were not breached, they are no longer needed. + if any( + isinstance(d.reference, DeadlineReference.TYPES.DAGRUN) + for d in cast("list", dag.deadline) + ): + Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id}) # if *all tasks* are deadlocked, the run failed elif unfinished.should_schedule and not are_runnable_tasks: diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json index 0ce253e2262..c4740f346f3 100644 --- a/airflow-core/src/airflow/serialization/schema.json +++ b/airflow-core/src/airflow/serialization/schema.json @@ -190,6 +190,10 @@ "deadline": { "anyOf": [ { "$ref": "#/definitions/dict" }, + { + "type": "array", + "items": { "$ref": "#/definitions/dict" } + }, { "type": "null" } ] }, diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 3e75993b2e2..280b5907a89 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -2385,7 +2385,11 @@ class SerializedDAG(DAG, BaseSerialization): serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)] serialized_dag["task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group) - serialized_dag["deadline"] = dag.deadline.serialize_deadline_alert() if dag.deadline else None + serialized_dag["deadline"] = ( + 
[deadline.serialize_deadline_alert() for deadline in dag.deadline] + if isinstance(dag.deadline, list) + else None + ) # Edge info in the JSON exactly matches our internal structure serialized_dag["edge_info"] = dag.edge_info @@ -2501,7 +2505,14 @@ class SerializedDAG(DAG, BaseSerialization): dag.has_on_failure_callback = True if "deadline" in encoded_dag and encoded_dag["deadline"] is not None: - dag.deadline = DeadlineAlert.deserialize_deadline_alert(encoded_dag["deadline"]) + dag.deadline = ( + [ + DeadlineAlert.deserialize_deadline_alert(deadline_data) + for deadline_data in encoded_dag["deadline"] + ] + if encoded_dag["deadline"] + else None + ) keys_to_set_none = dag.get_serialized_fields() - encoded_dag.keys() - cls._CONSTRUCTOR_PARAMS.keys() for k in keys_to_set_none: @@ -3096,19 +3107,21 @@ class SerializedDAG(DAG, BaseSerialization): session=session, ) - if self.deadline and isinstance(self.deadline.reference, DeadlineReference.TYPES.DAGRUN): - session.add( - Deadline( - deadline_time=self.deadline.reference.evaluate_with( - session=session, - interval=self.deadline.interval, - dag_id=self.dag_id, - run_id=run_id, - ), - callback=self.deadline.callback, - dagrun_id=orm_dagrun.id, - ) - ) + if self.deadline: + for deadline in cast("list", self.deadline): + if isinstance(deadline.reference, DeadlineReference.TYPES.DAGRUN): + session.add( + Deadline( + deadline_time=deadline.reference.evaluate_with( + session=session, + interval=deadline.interval, + dag_id=self.dag_id, + run_id=run_id, + ), + callback=deadline.callback, + dagrun_id=orm_dagrun.id, + ) + ) return orm_dagrun diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 5ee4b9b2321..1c9b294bddb 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -1811,6 +1811,61 @@ my_postgres_conn: assert len(dr.deadlines) == 1 assert dr.deadlines[0].deadline_time == getattr(dr, reference_column, DEFAULT_DATE) + 
interval + def test_dag_with_multiple_deadlines(self, dag_maker, session): + """Test that a DAG with multiple deadlines stores all deadlines in the database.""" + deadlines = [ + DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=datetime.timedelta(minutes=5), + callback=AsyncCallback(empty_callback_for_deadline), + ), + DeadlineAlert( + reference=DeadlineReference.DAGRUN_QUEUED_AT, + interval=datetime.timedelta(minutes=10), + callback=AsyncCallback(empty_callback_for_deadline), + ), + DeadlineAlert( + reference=DeadlineReference.DAGRUN_LOGICAL_DATE, + interval=datetime.timedelta(hours=1), + callback=AsyncCallback(empty_callback_for_deadline), + ), + ] + + with dag_maker( + dag_id="test_multiple_deadlines", + schedule=datetime.timedelta(days=1), + deadline=deadlines, + ) as dag: + ... + + scheduler_dag = sync_dag_to_db(dag) + dr = scheduler_dag.create_dagrun( + run_id="test_multiple_deadlines", + run_type=DagRunType.SCHEDULED, + state=State.QUEUED, + logical_date=TEST_DATE, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.flush() + dr = session.merge(dr) + + # Check that all 3 deadlines were created + assert len(dr.deadlines) == 3 + + # Verify each deadline has correct properties + deadline_times = [d.deadline_time for d in dr.deadlines] + expected_times = [ + dr.queued_at + datetime.timedelta(minutes=5), + dr.queued_at + datetime.timedelta(minutes=10), + dr.logical_date + datetime.timedelta(hours=1), + ] + + # Sort both lists to compare regardless of order + deadline_times.sort() + expected_times.sort() + assert deadline_times == expected_times + class TestDagModel: def _clean(self): diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index bab5a6db01b..ed14b7dd587 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -184,6 +184,15 @@ def _convert_access_control(access_control): return 
updated_access_control +def _convert_deadline(deadline: list[DeadlineAlert] | DeadlineAlert | None) -> list[DeadlineAlert] | None: + """Convert deadline parameter to a list of DeadlineAlert objects.""" + if deadline is None: + return None + if isinstance(deadline, DeadlineAlert): + return [deadline] + return list(deadline) + + def _convert_doc_md(doc_md: str | None) -> str | None: if doc_md is None: return doc_md @@ -437,9 +446,15 @@ class DAG: default=None, validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)), ) - deadline: DeadlineAlert | None = attrs.field( + deadline: list[DeadlineAlert] | DeadlineAlert | None = attrs.field( default=None, - validator=attrs.validators.optional(attrs.validators.instance_of(DeadlineAlert)), + converter=_convert_deadline, + validator=attrs.validators.optional( + attrs.validators.deep_iterable( + member_validator=attrs.validators.instance_of(DeadlineAlert), + iterable_validator=attrs.validators.instance_of(list), + ) + ), ) catchup: bool = attrs.field( @@ -1415,7 +1430,7 @@ if TYPE_CHECKING: catchup: bool = ..., on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None, on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None, - deadline: DeadlineAlert | None = None, + deadline: list[DeadlineAlert] | DeadlineAlert | None = None, doc_md: str | None = None, params: ParamsDict | dict[str, Any] | None = None, access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,