This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 64642c224b7 [v3-1-test] Select deadlines by id so that deadlines
associated with other dagruns of same run_id are not pruned. (#58574) (#58910)
64642c224b7 is described below
commit 64642c224b7c277d07d22c0b603fad31024349d2
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 1 22:19:28 2025 +0100
[v3-1-test] Select deadlines by id so that deadlines associated with other
dagruns of same run_id are not pruned. (#58574) (#58910)
* Select deadlines by id so that deadlines associated with other dagruns of
same run_id are not pruned.
* Track updates to dagrun attribute by session.add .
(cherry picked from commit c9d0fab18d5e46fedfec9d05dddfee8375ae2bf5)
Co-authored-by: Karthikeyan Singaravelan <[email protected]>
---
airflow-core/src/airflow/models/dagrun.py | 2 +-
airflow-core/tests/unit/models/test_dagrun.py | 57 ++++++++++++++++++++++++++-
2 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 562fdc5e9fc..39fad84be6a 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1228,7 +1228,7 @@ class DagRun(Base, LoggingMixin):
isinstance(d.reference, DeadlineReference.TYPES.DAGRUN)
for d in cast("list", dag.deadline)
):
- Deadline.prune_deadlines(session=session,
conditions={DagRun.run_id: self.run_id})
+ Deadline.prune_deadlines(session=session,
conditions={DagRun.id: self.id})
# if *all tasks* are deadlocked, the run failed
elif unfinished.should_schedule and not are_runnable_tasks:
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 74cf038dcec..a3192bc4098 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -27,7 +27,7 @@ from unittest.mock import call
import pendulum
import pytest
-from sqlalchemy import select
+from sqlalchemy import exists, select
from sqlalchemy.orm import joinedload
from airflow import settings
@@ -36,6 +36,7 @@ from airflow.callbacks.callback_requests import
DagCallbackRequest, DagRunContex
from airflow.models.dag import DagModel, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun, DagRunNote
+from airflow.models.deadline import Deadline
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote,
clear_task_instances
from airflow.models.taskmap import TaskMap
@@ -1292,6 +1293,60 @@ class TestDagRun:
# Callbacks are not added until handle_callback = False is passed to
dag_run.update_state()
assert callback is None
+ def test_dagrun_success_deadline_prune(self, dag_maker, session):
+ """Ensure only the deadline associated with dagrun marked as success
is deleted."""
+ now = timezone.utcnow()
+ future_date = datetime.datetime.now() + datetime.timedelta(days=365)
+ initial_task_states = {
+ "test_state_succeeded1": TaskInstanceState.SUCCESS,
+ }
+
+ with dag_maker(
+ dag_id="dag_1",
+ schedule=datetime.timedelta(days=1),
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.FIXED_DATETIME(future_date),
+ interval=datetime.timedelta(hours=1),
+ callback=AsyncCallback(empty_callback_for_deadline),
+ ),
+ session=session,
+ ) as dag1:
+ EmptyOperator(task_id="test_state_succeeded1")
+
+ dag_run1 = self.create_dag_run(
+ dag=dag1, session=session, logical_date=now,
task_states=initial_task_states
+ )
+
+ with dag_maker(
+ dag_id="dag_2",
+ schedule=datetime.timedelta(days=1),
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.FIXED_DATETIME(future_date),
+ interval=datetime.timedelta(hours=1),
+ callback=AsyncCallback(empty_callback_for_deadline),
+ ),
+ session=session,
+ ) as dag2:
+ EmptyOperator(task_id="test_state_succeeded1")
+
+ dag_run2 = self.create_dag_run(
+ dag=dag2, session=session, logical_date=now,
task_states=initial_task_states
+ )
+
+ dag_run1_deadline = exists().where(Deadline.dagrun_id == dag_run1.id)
+ dag_run2_deadline = exists().where(Deadline.dagrun_id == dag_run2.id)
+
+ assert session.query(dag_run1_deadline).scalar()
+ assert session.query(dag_run2_deadline).scalar()
+
+ session.add(dag_run1)
+ dag_run1.update_state()
+
+ assert not session.query(dag_run1_deadline).scalar()
+ assert session.query(dag_run2_deadline).scalar()
+ assert dag_run1.state == DagRunState.SUCCESS
+ assert dag_run2.state == DagRunState.RUNNING
+
@pytest.mark.parametrize(
("run_type", "expected_tis"),