This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 020c2d67ebe6dc4d976b77fbab1c3b044a0c8315
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Dec 8 00:02:51 2025 +0100

    [v3-1-test] Fix DagRun.queued_at not updating when clearing (#59066) 
(#59177)
    
    Clearing a dag run sets the state to QUEUED so the scheduler will pick it 
up, but it did not reset the queued_at timestamp.
    (cherry picked from commit 2b38b194633e896dee8993a94c6dc7c4b11d3804)
    
    Co-authored-by: D. Ferruzzi <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py   | 1 +
 airflow-core/tests/unit/models/test_cleartasks.py | 5 +++++
 2 files changed, 6 insertions(+)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 216bc9b1bca..29f2d98eefa 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -294,6 +294,7 @@ def clear_task_instances(
                     dr.last_scheduling_decision = None
                     dr.start_date = None
                     dr.clear_number += 1
+                    dr.queued_at = timezone.utcnow()
     session.flush()
 
 
diff --git a/airflow-core/tests/unit/models/test_cleartasks.py 
b/airflow-core/tests/unit/models/test_cleartasks.py
index 9a1f37c89ca..421e962515f 100644
--- a/airflow-core/tests/unit/models/test_cleartasks.py
+++ b/airflow-core/tests/unit/models/test_cleartasks.py
@@ -276,6 +276,7 @@ class TestClearTasks:
         ti1.state = TaskInstanceState.SUCCESS
         session = dag_maker.session
         session.flush()
+        original_queued_at = dr.queued_at
 
         # we use order_by(task_id) here because for the test DAG structure of 
ours
         # this is equivalent to topological sort. It would not work in general 
case
@@ -291,6 +292,10 @@ class TestClearTasks:
         assert dr.start_date is None
         assert dr.last_scheduling_decision is None
 
+        # The initial finished run has queued_at=None, clearing should 
populate it.
+        assert original_queued_at is None
+        assert dr.queued_at is not None
+
     @pytest.mark.parametrize("delete_tasks", [True, False])
     def test_clear_task_instances_maybe_task_removed(self, delete_tasks, 
dag_maker, session):
         """This verifies the behavior of clear_task_instances re task removal.

Reply via email to