This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 64642c224b7 [v3-1-test] Select deadlines by id so that deadlines 
associated with other dagruns of same run_id are not pruned. (#58574) (#58910)
64642c224b7 is described below

commit 64642c224b7c277d07d22c0b603fad31024349d2
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 1 22:19:28 2025 +0100

    [v3-1-test] Select deadlines by id so that deadlines associated with other 
dagruns of same run_id are not pruned. (#58574) (#58910)
    
    * Select deadlines by id so that deadlines associated with other dagruns of 
same run_id are not pruned.
    
    * Track updates to dagrun attribute by session.add .
    (cherry picked from commit c9d0fab18d5e46fedfec9d05dddfee8375ae2bf5)
    
    Co-authored-by: Karthikeyan Singaravelan <[email protected]>
---
 airflow-core/src/airflow/models/dagrun.py     |  2 +-
 airflow-core/tests/unit/models/test_dagrun.py | 57 ++++++++++++++++++++++++++-
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 562fdc5e9fc..39fad84be6a 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1228,7 +1228,7 @@ class DagRun(Base, LoggingMixin):
                     isinstance(d.reference, DeadlineReference.TYPES.DAGRUN)
                     for d in cast("list", dag.deadline)
                 ):
-                    Deadline.prune_deadlines(session=session, 
conditions={DagRun.run_id: self.run_id})
+                    Deadline.prune_deadlines(session=session, 
conditions={DagRun.id: self.id})
 
         # if *all tasks* are deadlocked, the run failed
         elif unfinished.should_schedule and not are_runnable_tasks:
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 74cf038dcec..a3192bc4098 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -27,7 +27,7 @@ from unittest.mock import call
 
 import pendulum
 import pytest
-from sqlalchemy import select
+from sqlalchemy import exists, select
 from sqlalchemy.orm import joinedload
 
 from airflow import settings
@@ -36,6 +36,7 @@ from airflow.callbacks.callback_requests import 
DagCallbackRequest, DagRunContex
 from airflow.models.dag import DagModel, infer_automated_data_interval
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagrun import DagRun, DagRunNote
+from airflow.models.deadline import Deadline
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, 
clear_task_instances
 from airflow.models.taskmap import TaskMap
@@ -1292,6 +1293,60 @@ class TestDagRun:
         # Callbacks are not added until handle_callback = False is passed to 
dag_run.update_state()
         assert callback is None
 
+    def test_dagrun_success_deadline_prune(self, dag_maker, session):
+        """Ensure only the deadline associated with dagrun marked as success 
is deleted."""
+        now = timezone.utcnow()
+        future_date = datetime.datetime.now() + datetime.timedelta(days=365)
+        initial_task_states = {
+            "test_state_succeeded1": TaskInstanceState.SUCCESS,
+        }
+
+        with dag_maker(
+            dag_id="dag_1",
+            schedule=datetime.timedelta(days=1),
+            deadline=DeadlineAlert(
+                reference=DeadlineReference.FIXED_DATETIME(future_date),
+                interval=datetime.timedelta(hours=1),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+            session=session,
+        ) as dag1:
+            EmptyOperator(task_id="test_state_succeeded1")
+
+        dag_run1 = self.create_dag_run(
+            dag=dag1, session=session, logical_date=now, 
task_states=initial_task_states
+        )
+
+        with dag_maker(
+            dag_id="dag_2",
+            schedule=datetime.timedelta(days=1),
+            deadline=DeadlineAlert(
+                reference=DeadlineReference.FIXED_DATETIME(future_date),
+                interval=datetime.timedelta(hours=1),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+            session=session,
+        ) as dag2:
+            EmptyOperator(task_id="test_state_succeeded1")
+
+        dag_run2 = self.create_dag_run(
+            dag=dag2, session=session, logical_date=now, 
task_states=initial_task_states
+        )
+
+        dag_run1_deadline = exists().where(Deadline.dagrun_id == dag_run1.id)
+        dag_run2_deadline = exists().where(Deadline.dagrun_id == dag_run2.id)
+
+        assert session.query(dag_run1_deadline).scalar()
+        assert session.query(dag_run2_deadline).scalar()
+
+        session.add(dag_run1)
+        dag_run1.update_state()
+
+        assert not session.query(dag_run1_deadline).scalar()
+        assert session.query(dag_run2_deadline).scalar()
+        assert dag_run1.state == DagRunState.SUCCESS
+        assert dag_run2.state == DagRunState.RUNNING
+
 
 @pytest.mark.parametrize(
     ("run_type", "expected_tis"),

Reply via email to