This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 31cfba3617b Fix DAG auto-pause ordering to use run_after (#65207) (#66863)
31cfba3617b is described below

commit 31cfba3617bfea9e8bffa34704ef6b9d7d69779b
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 14:09:51 2026 +0530

    Fix DAG auto-pause ordering to use run_after (#65207) (#66863)
    
    * Fix DAG auto-pause ordering to use run_after
    
    Auto-pausing for consecutive failed DAG runs should
    evaluate the most recent runs by run_after, not by
    logical_date. This fixes cases where manual runs or
    other non-chronological logical dates could cause
    the scheduler to count the wrong runs and incorrectly
    pause, or fail to pause, a DAG.
    
    closes: #65125
    
    * fixup! Fix DAG auto-pause ordering to use run_after
    
    * Update airflow-core/tests/unit/models/test_dag.py
    
    
    
    * apply suggestions from code review
    
    ---------
    
    
    (cherry picked from commit f1cd3f9504d43049d70c8a48fc239c6b49a795c6)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 airflow-core/src/airflow/models/dagrun.py  |   4 +-
 airflow-core/tests/unit/models/test_dag.py | 161 +++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index afe73a43b96..c41017a7644 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -843,10 +843,10 @@ class DagRun(Base, LoggingMixin):
         dag_runs = session.scalars(
             select(DagRun)
             .where(DagRun.dag_id == dag_id)
-            .order_by(DagRun.logical_date.desc())
+            .order_by(DagRun.run_after.desc(), DagRun.id.desc())
             .limit(max_consecutive_failed_dag_runs)
         ).all()
-        """ Marking dag as paused, if needed"""
+        # Mark dag as paused, if needed
         to_be_paused = len(dag_runs) >= max_consecutive_failed_dag_runs and all(
             dag_run.state == DagRunState.FAILED for dag_run in dag_runs
         )
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index f94457f3726..6703a40f415 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -829,6 +829,167 @@ class TestDag:
         add_failed_dag_run(scheduler_dag, "2", TEST_DATE + timedelta(days=1))
         assert session.get(DagModel, dag.dag_id).is_paused
 
+    @staticmethod
+    def _add_dag_run(scheduler_dag, op1, session, run_id, logical_date, run_after, ti_state, run_state):
+        """Create a dagrun, set the task-instance state, and call update_state.
+
+        update_state triggers _check_last_n_dagruns_failed only when the run
+        transitions to FAILED, so it is a no-op for SUCCESS runs.
+        """
+        dr = scheduler_dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            logical_date=logical_date,
+            state=run_state,
+            data_interval=(logical_date, logical_date),
+            run_after=run_after,
+            triggered_by=DagRunTriggeredByType.TEST,
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.set_state(state=ti_state, session=session)
+        dr.update_state(session=session)
+        return dr
+
+    def test_dag_paused_after_limit_orders_by_run_after(self, testing_dag_bundle):
+        """Verify _check_last_n_dagruns_failed orders by run_after, not logical_date.
+
+        Create three runs where run_after ordering differs from logical_date ordering.
+        The two most-recent runs by run_after are FAILED, while the oldest (which has the
+        latest logical_date) is SUCCESS.  The DAG should be paused because the check
+        must use run_after ordering.
+        """
+        dag_id = "dag_paused_orders_by_run_after"
+        dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False, max_consecutive_failed_dag_runs=2)
+        op1 = BashOperator(task_id="task", bash_command="exit 1;")
+        dag.add_task(op1)
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            bundle_name="testing",
+            is_stale=False,
+        )
+        session.add(orm_dag)
+        session.flush()
+
+        scheduler_dag = sync_dag_to_db(dag, session=session)
+        assert not session.get(DagModel, dag.dag_id).is_paused
+
+        # Run 1: oldest by run_after but LATEST logical_date — SUCCESS.
+        # update_state is a no-op here because _check_last_n_dagruns_failed
+        # is only invoked on the FAILED branch.
+        self._add_dag_run(
+            scheduler_dag,
+            op1,
+            session,
+            run_id="run_success",
+            logical_date=TEST_DATE + timedelta(days=10),
+            run_after=TEST_DATE,
+            ti_state=TaskInstanceState.SUCCESS,
+            run_state=State.SUCCESS,
+        )
+
+        # Run 2: second by run_after, earlier logical_date — FAILED
+        self._add_dag_run(
+            scheduler_dag,
+            op1,
+            session,
+            run_id="run_fail_1",
+            logical_date=TEST_DATE + timedelta(days=1),
+            run_after=TEST_DATE + timedelta(days=1),
+            ti_state=TaskInstanceState.FAILED,
+            run_state=State.FAILED,
+        )
+
+        # After one failure, DAG should NOT be paused yet
+        session.expire_all()
+        assert not session.get(DagModel, dag.dag_id).is_paused
+
+        # Run 3: most recent by run_after, middle logical_date — FAILED
+        self._add_dag_run(
+            scheduler_dag,
+            op1,
+            session,
+            run_id="run_fail_2",
+            logical_date=TEST_DATE + timedelta(days=2),
+            run_after=TEST_DATE + timedelta(days=2),
+            ti_state=TaskInstanceState.FAILED,
+            run_state=State.FAILED,
+        )
+
+        # Last 2 runs by run_after are both FAILED, DAG should be paused.
+        # If the code incorrectly ordered by logical_date, it would pick
+        # the SUCCESS run (logical_date=TEST_DATE+10d) and one FAILED run,
+        # and the DAG would NOT be paused.
+        session.expire_all()
+        assert session.get(DagModel, dag.dag_id).is_paused
+
+    def test_dag_not_paused_when_latest_by_run_after_succeeds(self, testing_dag_bundle):
+        """Verify _check_last_n_dagruns_failed does not pause when most recent run_after runs pass.
+
+          run_after order:  Run1(day0,FAILED) Run2(day1,SUCCESS) Run3(day2,FAILED)
+          logical_date order: Run1(day10,FAILED) Run2(day5,SUCCESS) Run3(day6,FAILED)
+
+        By run_after the last 2 are Run2(SUCCESS) and Run3(FAILED) → only 1 consecutive
+        failure → DAG must NOT be paused.
+        """
+        dag_id = "dag_not_paused_run_after_ordering"
+        dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False, max_consecutive_failed_dag_runs=2)
+        op1 = BashOperator(task_id="task", bash_command="exit 1;")
+        dag.add_task(op1)
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            bundle_name="testing",
+            is_stale=False,
+        )
+        session.add(orm_dag)
+        session.flush()
+
+        scheduler_dag = sync_dag_to_db(dag, session=session)
+
+        # Run 1: oldest by run_after — FAILED.
+        self._add_dag_run(
+            scheduler_dag,
+            op1,
+            session,
+            run_id="run_fail_old",
+            logical_date=TEST_DATE + timedelta(days=10),
+            run_after=TEST_DATE,
+            ti_state=TaskInstanceState.FAILED,
+            run_state=State.FAILED,
+        )
+
+        # Run 2: middle by run_after, middle logical_date — SUCCESS.
+        self._add_dag_run(
+            scheduler_dag,
+            op1,
+            session,
+            run_id="run_success_mid",
+            logical_date=TEST_DATE + timedelta(days=5),
+            run_after=TEST_DATE + timedelta(days=1),
+            ti_state=TaskInstanceState.SUCCESS,
+            run_state=State.SUCCESS,
+        )
+
+        # Run 3: most recent by run_after — FAILED.
+        # This triggers _check_last_n_dagruns_failed.
+        self._add_dag_run(
+            scheduler_dag,
+            op1,
+            session,
+            run_id="run_fail_latest",
+            logical_date=TEST_DATE + timedelta(days=6),
+            run_after=TEST_DATE + timedelta(days=2),
+            ti_state=TaskInstanceState.FAILED,
+            run_state=State.FAILED,
+        )
+
+        # Correct (run_after) ordering: last 2 are Run2(SUCCESS) and Run3(FAILED) → not paused.
+        # Wrong (logical_date) ordering: last 2 are Run1(+10d, FAILED) and Run3(+6d, FAILED) → paused.
+        session.expire_all()
+        assert not session.get(DagModel, dag.dag_id).is_paused
+
     def test_dag_is_deactivated_upon_dagfile_deletion(self, dag_maker):
         dag_id = "old_existing_dag"
         with dag_maker(dag_id, schedule=None, is_paused_upon_creation=True) as dag:

Reply via email to