This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4e5bba54123 Update dagrun_queued_at any time a run is restarted.
(#60848)
4e5bba54123 is described below
commit 4e5bba54123037052da93f9bef32225f9a1d2391
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Jan 29 14:00:25 2026 -0800
Update dagrun_queued_at any time a run is restarted. (#60848)
Previous behavior only updated the queued_at field if the run was cleared
while in a terminal state. A run which was cleared while it was running or
queued kept the original queued_at timestamp, making it impossible to track
when the run was most recently attempted and introducing different behaviour
depending on when the button was clicked. We now update the queued_at
timestamp and clear_number counter regardless of the state the run was in at
the time it was cleared. start_date i [...]
---
airflow-core/src/airflow/models/taskinstance.py | 6 ++++--
airflow-core/tests/unit/models/test_cleartasks.py | 10 ++++++++++
airflow-core/tests/unit/models/test_dagrun.py | 5 ++++-
3 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 2595492cd3b..ea196847c20 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -270,6 +270,10 @@ def clear_task_instances(
).all()
dag_run_state = DagRunState(dag_run_state) # Validate the state value.
for dr in drs:
+ # Always update clear_number and queued_at when clearing tasks,
regardless of state
+ dr.clear_number += 1
+ dr.queued_at = timezone.utcnow()
+
if dr.state in State.finished_dr_states:
dr.state = dag_run_state
dr.start_date = timezone.utcnow()
@@ -295,8 +299,6 @@ def clear_task_instances(
if dag_run_state == DagRunState.QUEUED:
dr.last_scheduling_decision = None
dr.start_date = None
- dr.clear_number += 1
- dr.queued_at = timezone.utcnow()
session.flush()
diff --git a/airflow-core/tests/unit/models/test_cleartasks.py
b/airflow-core/tests/unit/models/test_cleartasks.py
index 52800e6400f..202611610a4 100644
--- a/airflow-core/tests/unit/models/test_cleartasks.py
+++ b/airflow-core/tests/unit/models/test_cleartasks.py
@@ -201,6 +201,7 @@ class TestClearTasks:
"""
Test that DagRun state, start_date and last_scheduling_decision
are not changed after clearing TI in an unfinished DagRun.
+ However, queued_at and clear_number should still be updated.
"""
# Explicitly needs catchup as True as test is creating history runs
with dag_maker(
@@ -222,6 +223,10 @@ class TestClearTasks:
session = dag_maker.session
session.flush()
+ # Store original values to verify they're updated
+ original_queued_at = dr.queued_at
+ original_clear_number = dr.clear_number
+
# we use order_by(task_id) here because for the test DAG structure of
ours
# this is equivalent to topological sort. It would not work in general
case
# but it works for our case because we specifically constructed test
DAGS
@@ -239,6 +244,11 @@ class TestClearTasks:
assert dr.start_date
assert dr.last_scheduling_decision == DEFAULT_DATE
+ # Verify queued_at and clear_number are updated even for
running/queued dag runs
+ assert dr.queued_at is not None
+ assert dr.queued_at != original_queued_at
+ assert dr.clear_number == original_clear_number + 1
+
@pytest.mark.parametrize(
("state", "last_scheduling"),
[
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 1af654c6d82..06c5648f04a 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -186,7 +186,10 @@ class TestDagRun:
session.flush()
dr0 = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id,
DagRun.logical_date == now))
assert dr0.state == state
- assert dr0.clear_number < 1
+ # clear_number should be incremented even for running dag runs
+ assert dr0.clear_number == 1
+ # queued_at should also be updated
+ assert dr0.queued_at is not None
@pytest.mark.parametrize("state", [DagRunState.SUCCESS,
DagRunState.FAILED])
def test_clear_task_instances_for_backfill_finished_dagrun(self,
dag_maker, state, session):