Copilot commented on code in PR #65207:
URL: https://github.com/apache/airflow/pull/65207#discussion_r3089779726
##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -829,6 +829,167 @@ def add_failed_dag_run(dag, id, logical_date):
add_failed_dag_run(scheduler_dag, "2", TEST_DATE + timedelta(days=1))
assert session.get(DagModel, dag.dag_id).is_paused
+ def test_dag_paused_after_limit_orders_by_run_after(self,
testing_dag_bundle):
+ """Verify _check_last_n_dagruns_failed orders by run_after, not
logical_date.
+
+ Create three runs where run_after ordering differs from logical_date
ordering.
+ The two most-recent runs by run_after are FAILED, while the oldest
(which has the
+ latest logical_date) is SUCCESS. The DAG should be paused because the
check
+ must use run_after ordering.
+ """
+ dag_id = "dag_paused_orders_by_run_after"
+ dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False,
max_consecutive_failed_dag_runs=2)
+ op1 = BashOperator(task_id="task", bash_command="exit 1;")
+ dag.add_task(op1)
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ bundle_name="testing",
+ is_stale=False,
+ )
+ session.add(orm_dag)
+ session.flush()
+
+ scheduler_dag = sync_dag_to_db(dag, session=session)
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
+ # Run 1: oldest by run_after but LATEST logical_date — SUCCESS
+ dr1 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_success",
+ logical_date=TEST_DATE + timedelta(days=10),
+ state=State.SUCCESS,
+ data_interval=(TEST_DATE, TEST_DATE),
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti1 = dr1.get_task_instance(task_id=op1.task_id, session=session)
+ ti1.set_state(state=TaskInstanceState.SUCCESS, session=session)
+ dr1.update_state(session=session)
+
+ # Run 2: second by run_after, earlier logical_date — FAILED
+ dr2 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_fail_1",
+ logical_date=TEST_DATE + timedelta(days=1),
+ state=State.FAILED,
+ data_interval=(TEST_DATE + timedelta(days=1), TEST_DATE +
timedelta(days=1)),
+ run_after=TEST_DATE + timedelta(days=1),
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+ ti2.set_state(state=TaskInstanceState.FAILED, session=session)
+ dr2.update_state(session=session)
+
+ # After one failure, DAG should NOT be paused yet
+ session.expire_all()
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
+ # Run 3: most recent by run_after, middle logical_date — FAILED
+ dr3 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_fail_2",
+ logical_date=TEST_DATE + timedelta(days=2),
+ state=State.FAILED,
+ data_interval=(TEST_DATE + timedelta(days=2), TEST_DATE +
timedelta(days=2)),
+ run_after=TEST_DATE + timedelta(days=2),
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti3 = dr3.get_task_instance(task_id=op1.task_id, session=session)
+ ti3.set_state(state=TaskInstanceState.FAILED, session=session)
+ dr3.update_state(session=session)
+
+ # Last 2 runs by run_after are both FAILED, DAG should be paused.
+ # If the code incorrectly ordered by logical_date, it would pick
+ # the SUCCESS run (logical_date=TEST_DATE+10d) and one FAILED run,
+ # and the DAG would NOT be paused.
+ session.expire_all()
+ assert session.get(DagModel, dag.dag_id).is_paused
+
+ def test_dag_not_paused_when_latest_by_run_after_succeeds(self,
testing_dag_bundle):
+ """Verify _check_last_n_dagruns_failed does not pause when most recent
run_after runs pass.
+
+ Create three runs where the two most-recent by run_after are: one
FAILED, one SUCCESS.
+ Even though the two most-recent by logical_date are both FAILED, the
DAG should NOT
+ be paused because run_after ordering is used.
+ """
+ dag_id = "dag_not_paused_run_after_ordering"
+ dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False,
max_consecutive_failed_dag_runs=2)
+ op1 = BashOperator(task_id="task", bash_command="exit 1;")
+ dag.add_task(op1)
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ bundle_name="testing",
+ is_stale=False,
+ )
+ session.add(orm_dag)
+ session.flush()
+
+ scheduler_dag = sync_dag_to_db(dag, session=session)
+
+ # Run 1: oldest by run_after — FAILED
+ dr1 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_fail_old",
+ logical_date=TEST_DATE + timedelta(days=5),
+ state=State.FAILED,
+ data_interval=(TEST_DATE, TEST_DATE),
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti1 = dr1.get_task_instance(task_id=op1.task_id, session=session)
+ ti1.set_state(state=TaskInstanceState.FAILED, session=session)
+ dr1.update_state(session=session)
+
+ # Run 2: second by run_after — FAILED (also latest logical_date)
+ dr2 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_fail_mid",
+ logical_date=TEST_DATE + timedelta(days=6),
+ state=State.FAILED,
+ data_interval=(TEST_DATE + timedelta(days=1), TEST_DATE +
timedelta(days=1)),
+ run_after=TEST_DATE + timedelta(days=1),
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+ ti2.set_state(state=TaskInstanceState.FAILED, session=session)
+ dr2.update_state(session=session)
+
+ session.expire_all()
+ assert session.get(DagModel, dag.dag_id).is_paused
+
+ # Unpause the DAG so we can test the next scenario
+ session.execute(update(DagModel).where(DagModel.dag_id ==
dag_id).values(is_paused=False))
+ session.flush()
+
+ # Run 3: most recent by run_after — SUCCESS (but earliest logical_date)
+ dr3 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_success_latest",
+ logical_date=TEST_DATE,
+ state=State.SUCCESS,
+ data_interval=(TEST_DATE + timedelta(days=2), TEST_DATE +
timedelta(days=2)),
+ run_after=TEST_DATE + timedelta(days=2),
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti3 = dr3.get_task_instance(task_id=op1.task_id, session=session)
+ ti3.set_state(state=TaskInstanceState.SUCCESS, session=session)
+ dr3.update_state(session=session)
+
Review Comment:
`test_dag_not_paused_when_latest_by_run_after_succeeds` doesn’t currently
exercise the ordering logic: after manually unpausing, the next run you create
is SUCCESS, and `_check_last_n_dagruns_failed` is only invoked when a DagRun
transitions to FAILED. As a result, this assertion will stay green even if
ordering is wrong. To make this a real regression test, either call
`_check_last_n_dagruns_failed(...)` directly after creating the 3 runs, or add
a subsequent FAILED run that triggers the check at a point where the set of
most-recent runs *by run_after* still includes a SUCCESS.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -865,7 +865,7 @@ def _check_last_n_dagruns_failed(self, dag_id,
max_consecutive_failed_dag_runs,
dag_runs = session.scalars(
select(DagRun)
.where(DagRun.dag_id == dag_id)
- .order_by(DagRun.logical_date.desc())
+ .order_by(DagRun.run_after.desc(), DagRun.id.desc())
Review Comment:
In `_check_last_n_dagruns_failed`, the query filters by the `dag_id`
argument, but the pause/update logic later in the method uses `self.dag_id`.
This makes the method internally inconsistent and could pause a different DAG
than the one queried if it’s ever called with a non-matching `dag_id`. Consider
removing the `dag_id` parameter and using `self.dag_id` consistently, or using
the `dag_id` argument consistently throughout the method (including the
update and logging).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]