This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 020c2d67ebe6dc4d976b77fbab1c3b044a0c8315
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Dec 8 00:02:51 2025 +0100

    [v3-1-test] Fix DagRun.queued_at not updating when clearing (#59066) (#59177)

    Clearing a dag run sets the state to QUEUED so the scheduler will pick it up, but it did not reset the queued_at timestamp.

    (cherry picked from commit 2b38b194633e896dee8993a94c6dc7c4b11d3804)

    Co-authored-by: D. Ferruzzi <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py   | 1 +
 airflow-core/tests/unit/models/test_cleartasks.py | 5 +++++
 2 files changed, 6 insertions(+)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 216bc9b1bca..29f2d98eefa 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -294,6 +294,7 @@ def clear_task_instances(
                     dr.last_scheduling_decision = None
                     dr.start_date = None
                     dr.clear_number += 1
+                    dr.queued_at = timezone.utcnow()

     session.flush()

diff --git a/airflow-core/tests/unit/models/test_cleartasks.py b/airflow-core/tests/unit/models/test_cleartasks.py
index 9a1f37c89ca..421e962515f 100644
--- a/airflow-core/tests/unit/models/test_cleartasks.py
+++ b/airflow-core/tests/unit/models/test_cleartasks.py
@@ -276,6 +276,7 @@ class TestClearTasks:
         ti1.state = TaskInstanceState.SUCCESS
         session = dag_maker.session
         session.flush()
+        original_queued_at = dr.queued_at

         # we use order_by(task_id) here because for the test DAG structure of ours
         # this is equivalent to topological sort. It would not work in general case
@@ -291,6 +292,10 @@ class TestClearTasks:
         assert dr.start_date is None
         assert dr.last_scheduling_decision is None

+        # The initial finished run has queued_at=None, clearing should populate it.
+        assert original_queued_at is None
+        assert dr.queued_at is not None
+
     @pytest.mark.parametrize("delete_tasks", [True, False])
     def test_clear_task_instances_maybe_task_removed(self, delete_tasks, dag_maker, session):
         """This verifies the behavior of clear_task_instances re task removal.
