This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e5bba54123 Update dagrun_queued_at any time a run is restarted. 
(#60848)
4e5bba54123 is described below

commit 4e5bba54123037052da93f9bef32225f9a1d2391
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Jan 29 14:00:25 2026 -0800

    Update dagrun_queued_at any time a run is restarted. (#60848)
    
    Previous behavior only updated the queued_at field if the run was cleared 
while in a terminal state.  A run which was cleared while it was running or 
queued kept the original queued_at timestamp, making it impossible to track 
when the run was most recently attempted and introducing different behaviour 
depending on when the button was clicked.  We now update the queued_at 
timestamp and clear_number counter regardless of the state the run was in at 
the time it was cleared.  start_date i [...]
---
 airflow-core/src/airflow/models/taskinstance.py   |  6 ++++--
 airflow-core/tests/unit/models/test_cleartasks.py | 10 ++++++++++
 airflow-core/tests/unit/models/test_dagrun.py     |  5 ++++-
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 2595492cd3b..ea196847c20 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -270,6 +270,10 @@ def clear_task_instances(
         ).all()
         dag_run_state = DagRunState(dag_run_state)  # Validate the state value.
         for dr in drs:
+            # Always update clear_number and queued_at when clearing tasks, 
regardless of state
+            dr.clear_number += 1
+            dr.queued_at = timezone.utcnow()
+
             if dr.state in State.finished_dr_states:
                 dr.state = dag_run_state
                 dr.start_date = timezone.utcnow()
@@ -295,8 +299,6 @@ def clear_task_instances(
                 if dag_run_state == DagRunState.QUEUED:
                     dr.last_scheduling_decision = None
                     dr.start_date = None
-                    dr.clear_number += 1
-                    dr.queued_at = timezone.utcnow()
     session.flush()
 
 
diff --git a/airflow-core/tests/unit/models/test_cleartasks.py 
b/airflow-core/tests/unit/models/test_cleartasks.py
index 52800e6400f..202611610a4 100644
--- a/airflow-core/tests/unit/models/test_cleartasks.py
+++ b/airflow-core/tests/unit/models/test_cleartasks.py
@@ -201,6 +201,7 @@ class TestClearTasks:
         """
         Test that DagRun state, start_date and last_scheduling_decision
         are not changed after clearing TI in an unfinished DagRun.
+        However, queued_at and clear_number should still be updated.
         """
         # Explicitly needs catchup as True as test is creating history runs
         with dag_maker(
@@ -222,6 +223,10 @@ class TestClearTasks:
         session = dag_maker.session
         session.flush()
 
+        # Store original values to verify they're updated
+        original_queued_at = dr.queued_at
+        original_clear_number = dr.clear_number
+
         # we use order_by(task_id) here because for the test DAG structure of 
ours
         # this is equivalent to topological sort. It would not work in general 
case
         # but it works for our case because we specifically constructed test 
DAGS
@@ -239,6 +244,11 @@ class TestClearTasks:
             assert dr.start_date
         assert dr.last_scheduling_decision == DEFAULT_DATE
 
+        # Verify queued_at and clear_number are updated even for 
running/queued dag runs
+        assert dr.queued_at is not None
+        assert dr.queued_at != original_queued_at
+        assert dr.clear_number == original_clear_number + 1
+
     @pytest.mark.parametrize(
         ("state", "last_scheduling"),
         [
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 1af654c6d82..06c5648f04a 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -186,7 +186,10 @@ class TestDagRun:
         session.flush()
         dr0 = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, 
DagRun.logical_date == now))
         assert dr0.state == state
-        assert dr0.clear_number < 1
+        # clear_number should be incremented even for running dag runs
+        assert dr0.clear_number == 1
+        # queued_at should also be updated
+        assert dr0.queued_at is not None
 
     @pytest.mark.parametrize("state", [DagRunState.SUCCESS, 
DagRunState.FAILED])
     def test_clear_task_instances_for_backfill_finished_dagrun(self, 
dag_maker, state, session):

Reply via email to