Copilot commented on code in PR #65207:
URL: https://github.com/apache/airflow/pull/65207#discussion_r3092252095
##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -829,6 +829,161 @@ def add_failed_dag_run(dag, id, logical_date):
add_failed_dag_run(scheduler_dag, "2", TEST_DATE + timedelta(days=1))
assert session.get(DagModel, dag.dag_id).is_paused
+ def test_dag_paused_after_limit_orders_by_run_after(self,
testing_dag_bundle):
+ """Verify _check_last_n_dagruns_failed orders by run_after, not
logical_date.
+
+ Create three runs where run_after ordering differs from logical_date
ordering.
+ The two most-recent runs by run_after are FAILED, while the oldest
(which has the
+ latest logical_date) is SUCCESS. The DAG should be paused because the
check
+ must use run_after ordering.
+ """
+ dag_id = "dag_paused_orders_by_run_after"
+ dag = DAG(dag_id, schedule=None, is_paused_upon_creation=False,
max_consecutive_failed_dag_runs=2)
+ op1 = BashOperator(task_id="task", bash_command="exit 1;")
+ dag.add_task(op1)
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ bundle_name="testing",
+ is_stale=False,
+ )
+ session.add(orm_dag)
+ session.flush()
+
+ scheduler_dag = sync_dag_to_db(dag, session=session)
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
+ # Run 1: oldest by run_after but LATEST logical_date — SUCCESS
+ dr1 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_success",
+ logical_date=TEST_DATE + timedelta(days=10),
+ state=State.SUCCESS,
+ data_interval=(TEST_DATE, TEST_DATE),
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti1 = dr1.get_task_instance(task_id=op1.task_id, session=session)
+ ti1.set_state(state=TaskInstanceState.SUCCESS, session=session)
+ dr1.update_state(session=session)
+
+ # Run 2: second by run_after, earlier logical_date — FAILED
+ dr2 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_fail_1",
+ logical_date=TEST_DATE + timedelta(days=1),
+ state=State.FAILED,
+ data_interval=(TEST_DATE + timedelta(days=1), TEST_DATE +
timedelta(days=1)),
+ run_after=TEST_DATE + timedelta(days=1),
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+ ti2.set_state(state=TaskInstanceState.FAILED, session=session)
+ dr2.update_state(session=session)
+
+ # After one failure, DAG should NOT be paused yet
+ session.expire_all()
+ assert not session.get(DagModel, dag.dag_id).is_paused
+
+ # Run 3: most recent by run_after, middle logical_date — FAILED
+ dr3 = scheduler_dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id="run_fail_2",
+ logical_date=TEST_DATE + timedelta(days=2),
+ state=State.FAILED,
+ data_interval=(TEST_DATE + timedelta(days=2), TEST_DATE +
timedelta(days=2)),
+ run_after=TEST_DATE + timedelta(days=2),
+ triggered_by=DagRunTriggeredByType.TEST,
+ session=session,
+ )
+ ti3 = dr3.get_task_instance(task_id=op1.task_id, session=session)
+ ti3.set_state(state=TaskInstanceState.FAILED, session=session)
+ dr3.update_state(session=session)
+
+ # Last 2 runs by run_after are both FAILED, DAG should be paused.
+ # If the code incorrectly ordered by logical_date, it would pick
+ # the SUCCESS run (logical_date=TEST_DATE+10d) and one FAILED run,
+ # and the DAG would NOT be paused.
+ session.expire_all()
+ assert session.get(DagModel, dag.dag_id).is_paused
+
+ def test_dag_not_paused_when_latest_by_run_after_succeeds(self,
testing_dag_bundle):
+ """Verify _check_last_n_dagruns_failed does not pause when most recent
run_after runs pass.
+
+ run_after order: Run1(day0,FAILED) Run2(day1,SUCCESS)
Run3(day2,FAILED)
+ logical_date order: Run1(day10,FAILED) Run2(day11,SUCCESS)
Run3(day12,FAILED)
Review Comment:
The docstring’s described logical_date ordering (Run2(day11), Run3(day12))
doesn’t match the actual logical_date values set below (Run2 uses TEST_DATE+5d,
Run3 uses TEST_DATE+6d). Update the docstring to reflect the real dates (or
adjust the test data) so the scenario is unambiguous.
```suggestion
run_after order: Run1(day0,FAILED) Run2(day1,SUCCESS)
Run3(day2,FAILED)
logical_date order: Run1(day10,FAILED) Run2(day5,SUCCESS)
Run3(day6,FAILED)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]