This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 31cfba3617b Fix DAG auto-pause ordering to use run_after (#65207) (#66863)
31cfba3617b is described below
commit 31cfba3617bfea9e8bffa34704ef6b9d7d69779b
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 14:09:51 2026 +0530
Fix DAG auto-pause ordering to use run_after (#65207) (#66863)
* Fix DAG auto-pause ordering to use run_after
Auto-pausing for consecutive failed DAG runs should
evaluate the most recent runs by run_after, not by
logical_date. This fixes cases where manual runs or
other non-chronological logical dates could cause
the scheduler to count the wrong runs and pause or
not pause a DAG incorrectly.
closes: #65125
* fixup! Fix DAG auto-pause ordering to use run_after
* Update airflow-core/tests/unit/models/test_dag.py
* apply suggestions from code review
---------
(cherry picked from commit f1cd3f9504d43049d70c8a48fc239c6b49a795c6)
Co-authored-by: Ephraim Anierobi <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
airflow-core/src/airflow/models/dagrun.py | 4 +-
airflow-core/tests/unit/models/test_dag.py | 161 +++++++++++++++++++++++++++++
2 files changed, 163 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index afe73a43b96..c41017a7644 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -843,10 +843,10 @@ class DagRun(Base, LoggingMixin):
dag_runs = session.scalars(
select(DagRun)
.where(DagRun.dag_id == dag_id)
- .order_by(DagRun.logical_date.desc())
+ .order_by(DagRun.run_after.desc(), DagRun.id.desc())
.limit(max_consecutive_failed_dag_runs)
).all()
- """ Marking dag as paused, if needed"""
+ # Mark dag as paused, if needed
to_be_paused = len(dag_runs) >= max_consecutive_failed_dag_runs and all(
dag_run.state == DagRunState.FAILED for dag_run in dag_runs
)
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index f94457f3726..6703a40f415 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -829,6 +829,167 @@ class TestDag:
add_failed_dag_run(scheduler_dag, "2", TEST_DATE + timedelta(days=1))
assert session.get(DagModel, dag.dag_id).is_paused
+ @staticmethod
+ def _add_dag_run(scheduler_dag, op1, session, run_id, logical_date, run_after, ti_state, run_state):
+ """Create a dagrun, set the task-instance state, and call update_state.
+
+ update_state triggers _check_last_n_dagruns_failed only when the run
+ transitions to FAILED, so it is a no-op for SUCCESS runs.
+ """
+ dr = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id=run_id,
+ logical_date=logical_date,
+ state=run_state,
+ data_interval=(logical_date, logical_date),
+ run_after=run_after,
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.set_state(state=ti_state, session=session)
+ dr.update_state(session=session)
+ return dr
+
+ def test_dag_paused_after_limit_orders_by_run_after(self, testing_dag_bundle):
+ """Verify _check_last_n_dagruns_failed orders by run_after, not logical_date.
+
+ Create three runs where run_after ordering differs from logical_date ordering.
+ The two most-recent runs by run_after are FAILED, while the oldest (which has the
+ latest logical_date) is SUCCESS. The DAG should be paused because the check
+ must use run_after ordering.
+ """
+ dag_id = "dag_paused_orders_by_run_after"
+ dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False, max_consecutive_failed_dag_runs=2)
+ op1 = BashOperator(task_id="task", bash_command="exit 1;")
+ dag.add_task(op1)
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ bundle_name="testing",
+ is_stale=False,
+ )
+ session.add(orm_dag)
+ session.flush()
+
+ scheduler_dag = sync_dag_to_db(dag, session=session)
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
+ # Run 1: oldest by run_after but LATEST logical_date — SUCCESS.
+ # update_state is a no-op here because _check_last_n_dagruns_failed
+ # is only invoked on the FAILED branch.
+ self._add_dag_run(
+ scheduler_dag,
+ op1,
+ session,
+ run_id="run_success",
+ logical_date=TEST_DATE + timedelta(days=10),
+ run_after=TEST_DATE,
+ ti_state=TaskInstanceState.SUCCESS,
+ run_state=State.SUCCESS,
+ )
+
+ # Run 2: second by run_after, earlier logical_date — FAILED
+ self._add_dag_run(
+ scheduler_dag,
+ op1,
+ session,
+ run_id="run_fail_1",
+ logical_date=TEST_DATE + timedelta(days=1),
+ run_after=TEST_DATE + timedelta(days=1),
+ ti_state=TaskInstanceState.FAILED,
+ run_state=State.FAILED,
+ )
+
+ # After one failure, DAG should NOT be paused yet
+ session.expire_all()
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
+ # Run 3: most recent by run_after, middle logical_date — FAILED
+ self._add_dag_run(
+ scheduler_dag,
+ op1,
+ session,
+ run_id="run_fail_2",
+ logical_date=TEST_DATE + timedelta(days=2),
+ run_after=TEST_DATE + timedelta(days=2),
+ ti_state=TaskInstanceState.FAILED,
+ run_state=State.FAILED,
+ )
+
+ # Last 2 runs by run_after are both FAILED, DAG should be paused.
+ # If the code incorrectly ordered by logical_date, it would pick
+ # the SUCCESS run (logical_date=TEST_DATE+10d) and one FAILED run,
+ # and the DAG would NOT be paused.
+ session.expire_all()
+ assert session.get(DagModel, dag.dag_id).is_paused
+
+ def test_dag_not_paused_when_latest_by_run_after_succeeds(self, testing_dag_bundle):
+ """Verify _check_last_n_dagruns_failed does not pause when most recent run_after runs pass.
+
+ run_after order: Run1(day0,FAILED) Run2(day1,SUCCESS) Run3(day2,FAILED)
+ logical_date order: Run1(day10,FAILED) Run2(day5,SUCCESS) Run3(day6,FAILED)
+
+ By run_after the last 2 are Run2(SUCCESS) and Run3(FAILED) → only 1 consecutive
+ failure → DAG must NOT be paused.
+ """
+ dag_id = "dag_not_paused_run_after_ordering"
+ dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False, max_consecutive_failed_dag_runs=2)
+ op1 = BashOperator(task_id="task", bash_command="exit 1;")
+ dag.add_task(op1)
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ bundle_name="testing",
+ is_stale=False,
+ )
+ session.add(orm_dag)
+ session.flush()
+
+ scheduler_dag = sync_dag_to_db(dag, session=session)
+
+ # Run 1: oldest by run_after — FAILED.
+ self._add_dag_run(
+ scheduler_dag,
+ op1,
+ session,
+ run_id="run_fail_old",
+ logical_date=TEST_DATE + timedelta(days=10),
+ run_after=TEST_DATE,
+ ti_state=TaskInstanceState.FAILED,
+ run_state=State.FAILED,
+ )
+
+ # Run 2: middle by run_after, middle logical_date — SUCCESS.
+ self._add_dag_run(
+ scheduler_dag,
+ op1,
+ session,
+ run_id="run_success_mid",
+ logical_date=TEST_DATE + timedelta(days=5),
+ run_after=TEST_DATE + timedelta(days=1),
+ ti_state=TaskInstanceState.SUCCESS,
+ run_state=State.SUCCESS,
+ )
+
+ # Run 3: most recent by run_after — FAILED.
+ # This triggers _check_last_n_dagruns_failed.
+ self._add_dag_run(
+ scheduler_dag,
+ op1,
+ session,
+ run_id="run_fail_latest",
+ logical_date=TEST_DATE + timedelta(days=6),
+ run_after=TEST_DATE + timedelta(days=2),
+ ti_state=TaskInstanceState.FAILED,
+ run_state=State.FAILED,
+ )
+
+ # Correct (run_after) ordering: last 2 are Run2(SUCCESS) and Run3(FAILED) → not paused.
+ # Wrong (logical_date) ordering: last 2 are Run1(+10d, FAILED) and Run3(+6d, FAILED) → paused.
+ session.expire_all()
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
def test_dag_is_deactivated_upon_dagfile_deletion(self, dag_maker):
dag_id = "old_existing_dag"
with dag_maker(dag_id, schedule=None, is_paused_upon_creation=True) as dag: