This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d7769a29951 When a dagrun is cleared, recalculate related deadlines if applicable. (#61372)
d7769a29951 is described below
commit d7769a299514c4609f9331a7f68a0a65e3ed5b40
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Feb 9 10:25:11 2026 -0800
When a dagrun is cleared, recalculate related deadlines if applicable.
(#61372)
* When a dagrun is cleared, recalculate related deadlines if applicable.
If the run has a deadline related to the queued_at time, then those should
be recalculated when that timestamp is changed.
---
airflow-core/src/airflow/models/taskinstance.py | 48 +++++++++++
.../tests/unit/models/test_taskinstance.py | 92 ++++++++++++++++++++++
2 files changed, 140 insertions(+)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 838506a1bbe..a0676cfc0ac 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -74,6 +74,8 @@ from airflow.listeners.listener import get_listener_manager
from airflow.models.asset import AssetModel
from airflow.models.base import Base, StringID, TaskInstanceDependencies
from airflow.models.dag_version import DagVersion
+from airflow.models.deadline import Deadline, ReferenceModels
+from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
# Import HITLDetail at runtime so SQLAlchemy can resolve the relationship
from airflow.models.hitl import HITLDetail # noqa: F401
@@ -181,6 +183,50 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, task_teardown_map=None
log.info("Not skipping teardown task '%s'", ti.task_id)
+def _recalculate_dagrun_queued_at_deadlines(
+ dagrun: DagRun, new_queued_at: datetime, session: Session
+) -> None:
+ """
+ Recalculate deadline times for deadlines that reference dagrun.queued_at.
+
+ :param dagrun: The DagRun whose deadlines should be recalculated
+ :param new_queued_at: The new queued_at timestamp to use for calculation
+ :param session: Database session
+
+ :meta private:
+ """
+ results = session.execute(
+ select(Deadline, DeadlineAlertModel)
+        .join(DeadlineAlertModel, Deadline.deadline_alert_id == DeadlineAlertModel.id)
+ .where(
+ Deadline.dagrun_id == dagrun.id,
+ Deadline.missed == false(),
+            DeadlineAlertModel.reference[ReferenceModels.REFERENCE_TYPE_FIELD].as_string()
+ == ReferenceModels.DagRunQueuedAtDeadline.__name__,
+ )
+ ).all()
+
+ if not results:
+ return
+
+ for deadline, deadline_alert in results:
+        # We can't use evaluate_with() since the new queued_at is not written to the DB yet.
+ deadline_interval = timedelta(seconds=deadline_alert.interval)
+ new_deadline_time = new_queued_at + deadline_interval
+
+ log.debug(
+ "Recalculating deadline %s for DagRun %s.%s: old=%s, new=%s",
+ deadline.id,
+ dagrun.dag_id,
+ dagrun.run_id,
+ deadline.deadline_time,
+ new_deadline_time,
+ )
+ deadline.deadline_time = new_deadline_time
+ # Do not flush/commit here in order to keep the scheduler loop atomic.
+ # These changes are committed by the calling function.
+
+
def clear_task_instances(
tis: list[TaskInstance],
session: Session,
@@ -274,6 +320,8 @@ def clear_task_instances(
dr.clear_number += 1
dr.queued_at = timezone.utcnow()
+ _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)
+
if dr.state in State.finished_dr_states:
dr.state = dag_run_state
dr.start_date = timezone.utcnow()
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 1b6b3a01d1b..20faa789466 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -50,6 +50,8 @@ from airflow.models.asset import (
)
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
+from airflow.models.deadline import Deadline
+from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
from airflow.models.pool import Pool
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.serialized_dag import SerializedDagModel
@@ -57,6 +59,7 @@ from airflow.models.taskinstance import (
TaskInstance,
TaskInstance as TI,
TaskInstanceNote,
+ clear_task_instances,
find_relevant_relatives,
)
from airflow.models.taskinstancehistory import TaskInstanceHistory
@@ -80,6 +83,8 @@ from airflow.sdk import (
task_group,
)
from airflow.sdk.api.datamodels._generated import AssetEventResponse, AssetResponse
+from airflow.sdk.definitions.callback import AsyncCallback
+from airflow.sdk.definitions.deadline import DeadlineReference
from airflow.sdk.definitions.param import process_params
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.sdk.execution_time.comms import AssetEventsResult
@@ -3090,3 +3095,90 @@ def test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
assert pakl.asset_partition_dag_run_id == apdr.id
assert pakl.source_partition_key == "abc123"
assert pakl.target_dag_id == "asset_event_listener"
+
+
+async def empty_callback_for_deadline():
+ """Used in deadline tests to confirm that Deadlines and DeadlineAlerts
function correctly."""
+ pass
+
+
+def test_clear_task_instances_recalculates_dagrun_queued_deadlines(dag_maker, session):
+ """Test that clearing tasks recalculates all (and only) DAGRUN_QUEUED_AT
deadlines."""
+ with dag_maker(
+ dag_id="test_recalculate_deadlines",
+ schedule=datetime.timedelta(days=1),
+ ) as dag:
+ EmptyOperator(task_id="task_1")
+
+ dag_run = dag_maker.create_dagrun()
+ # Set the task to SUCCESS state
+ ti = dag_run.get_task_instance("task_1", session=session)
+ ti.set_state(TaskInstanceState.SUCCESS, session=session)
+
+ original_queued_at = timezone.utcnow() - datetime.timedelta(hours=2)
+ dag_run.queued_at = original_queued_at
+ session.flush()
+
+ serialized_dag_id = session.scalar(
+        select(SerializedDagModel.id).where(SerializedDagModel.dag_id == dag.dag_id)
+ )
+
+ deadline_configs = [
+ (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=1)),
+ (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=2)),
+ (DeadlineReference.FIXED_DATETIME, datetime.timedelta(hours=1)),
+ ]
+
+ for deadline_type, interval in deadline_configs:
+ if deadline_type == DeadlineReference.DAGRUN_QUEUED_AT:
+            reference = DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference()
+ deadline_time = dag_run.queued_at + interval
+ else: # FIXED_DATETIME
+ future_date = timezone.utcnow() + datetime.timedelta(days=7)
+            reference = DeadlineReference.FIXED_DATETIME(future_date).serialize_reference()
+ deadline_time = future_date + interval
+
+ deadline_alert = DeadlineAlertModel(
+ serialized_dag_id=serialized_dag_id,
+ reference=reference,
+ interval=interval.total_seconds(),
+ callback_def={"path": f"{__name__}.empty_callback_for_deadline",
"kwargs": {}},
+ )
+ session.add(deadline_alert)
+ session.flush()
+
+ deadline = Deadline(
+ dagrun_id=dag_run.id,
+ deadline_alert_id=deadline_alert.id,
+ deadline_time=deadline_time,
+ callback=AsyncCallback(empty_callback_for_deadline),
+ dag_id=dag_run.dag_id,
+ )
+ session.add(deadline)
+
+ session.flush()
+
+    deadlines_before = session.scalars(select(Deadline).where(Deadline.dagrun_id == dag_run.id)).all()
+ deadline_times_by_alert = {
+        deadline.deadline_alert_id: deadline.deadline_time for deadline in deadlines_before
+ }
+
+    tis = session.scalars(select(TI).where(TI.dag_id == dag.dag_id, TI.run_id == dag_run.run_id)).all()
+ clear_task_instances(tis, session)
+
+ dag_run = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
+ assert dag_run.queued_at > original_queued_at
+
+    deadlines_after = session.scalars(select(Deadline).where(Deadline.dagrun_id == dag_run.id)).all()
+ assert len(deadlines_after) == 3
+
+    # Verify exactly 2 DAGRUN_QUEUED_AT deadlines were recalculated, FIXED_DATETIME was not
+ recalculated_count = 0
+ for deadline in deadlines_after:
+        if deadline.deadline_time != deadline_times_by_alert[deadline.deadline_alert_id]:
+ recalculated_count += 1
+            deadline_alert = session.get(DeadlineAlertModel, deadline.deadline_alert_id)
+            expected_time = dag_run.queued_at + datetime.timedelta(seconds=deadline_alert.interval)
+ assert deadline.deadline_time == expected_time
+
+ assert recalculated_count == 2