kaxil commented on code in PR #64294:
URL: https://github.com/apache/airflow/pull/64294#discussion_r3336428270
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -615,7 +629,7 @@ def active_runs_of_dags(
@classmethod
@retry_db_transaction
- def get_running_dag_runs_to_examine(cls, session: Session) ->
ScalarResult[DagRun]:
+ def get_running_dag_runs_to_examine(cls, session: Session) ->
Sequence[DagRun]:
Review Comment:
Point 1 here is still open. The docstring (line 641) still describes a
single `SELECT ... FOR UPDATE` query. With
`max_new_dagruns_per_loop_to_schedule > 0` the method now runs a second `FOR
UPDATE SKIP LOCKED` query for never-examined runs (`last_scheduling_decision IS
NULL`) and adds `IS NOT NULL` to the first one. A sentence covering that
two-query split would close this out. Worth noting that "never examined" also
covers cleared-and-requeued runs, since clearing a queued DagRun resets
`last_scheduling_decision` to NULL.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session,
prev_ti_state, is_ti_sche
schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
+ def
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+ self, session, dag_maker, monkeypatch
+ ):
+
+ monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0)
+
+ def create_dagruns(
+ last_scheduling_decision: datetime.datetime | None = None,
+ count: int = 20,
+ ):
+ dagrun = dag_maker.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ run_after=datetime.datetime(2024, 1, 1),
+ )
+ dagrun.last_scheduling_decision = last_scheduling_decision
+ session.merge(dagrun)
+ for _ in range(count - 1):
+ dagrun = dag_maker.create_dagrun_after(
+ dagrun,
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ run_after=datetime.datetime(2024, 1, 1),
+ )
+
+ dagrun.last_scheduling_decision = last_scheduling_decision
+ session.merge(dagrun)
+
+ with dag_maker(
+ dag_id="dummy_dag",
+ schedule=datetime.timedelta(days=1),
+ start_date=datetime.datetime(2024, 1, 1),
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy_task")
+
+ create_dagruns(None, 10)
+
+ with dag_maker(
+ dag_id="dummy_dag2",
+ schedule=datetime.timedelta(days=1),
+ start_date=datetime.datetime(2024, 1, 1),
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy_task2")
+
+ create_dagruns(func.now(), 20)
Review Comment:
`func.now()` is still passed as `last_scheduling_decision` here, and again
at line 1110, so it has not been removed yet. It is a SQL expression
rather than a Python datetime, which reads oddly against the `datetime | None`
annotation and ties the test to the DB clock -- a Python-side datetime such as
`timezone.utcnow()` is clearer, and a fixed datetime value is fully
deterministic. Separately, the `create_dagruns` helper is
copy-pasted verbatim across both new tests; worth lifting it to a fixture.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]