kaxil commented on code in PR #64294:
URL: https://github.com/apache/airflow/pull/64294#discussion_r3336428270


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -615,7 +629,7 @@ def active_runs_of_dags(
 
     @classmethod
     @retry_db_transaction
-    def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRun]:
+    def get_running_dag_runs_to_examine(cls, session: Session) -> Sequence[DagRun]:

Review Comment:
   Point 1 here is still open. The docstring (line 641) still describes a
single `SELECT ... FOR UPDATE` query. With
`max_new_dagruns_per_loop_to_schedule > 0`, the method now runs a second
`FOR UPDATE SKIP LOCKED` query for never-examined runs
(`last_scheduling_decision IS NULL`) and adds `IS NOT NULL` to the first one.
A sentence covering that two-query split would close this out. It is also
worth noting that "never examined" covers cleared-and-requeued runs, since
clearing a queued DagRun resets `last_scheduling_decision` to NULL.
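
   For illustration, the split can be modelled in plain Python. This is only a
sketch under stated assumptions: the `Run` dataclass, `select_runs_to_examine`,
its `limit` and `max_new` parameters, the ordering, and the way the two result
sets are concatenated are all hypothetical stand-ins rather than the PR's code,
and row locking (`FOR UPDATE SKIP LOCKED`) is not modelled at all.

```python
from __future__ import annotations

import datetime
from dataclasses import dataclass


@dataclass
class Run:
    """Hypothetical stand-in for a running DagRun row."""

    run_id: str
    last_scheduling_decision: datetime.datetime | None


def select_runs_to_examine(runs: list[Run], limit: int, max_new: int) -> list[Run]:
    """Model which rows the method would return; locking is not modelled."""
    if max_new <= 0:
        # Single query: no NULL filter on last_scheduling_decision.
        # Sorting never-examined runs first is an assumption of this sketch.
        ordered = sorted(
            runs,
            key=lambda r: (
                r.last_scheduling_decision is not None,
                r.last_scheduling_decision or datetime.datetime.min,
            ),
        )
        return ordered[:limit]

    # Query 1: previously examined runs only (IS NOT NULL), oldest decision first.
    examined = sorted(
        (r for r in runs if r.last_scheduling_decision is not None),
        key=lambda r: r.last_scheduling_decision,
    )[:limit]
    # Query 2: never-examined runs (IS NULL), capped separately by max_new.
    never_examined = [r for r in runs if r.last_scheduling_decision is None][:max_new]
    return examined + never_examined


runs = [
    Run("a", None),
    Run("b", datetime.datetime(2024, 1, 1)),
    Run("c", None),
    Run("d", datetime.datetime(2024, 1, 2)),
]
split = [r.run_id for r in select_runs_to_examine(runs, limit=10, max_new=1)]
single = [r.run_id for r in select_runs_to_examine(runs, limit=3, max_new=0)]
```

   In this model a cleared-and-requeued run looks exactly like run `a` or `c`:
its `last_scheduling_decision` is `None`, so it competes for the `max_new`
budget rather than the main limit, which is the behaviour the docstring
sentence would need to spell out.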



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session, prev_ti_state, is_ti_sche
         schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
         assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
 
+    def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+        self, session, dag_maker, monkeypatch
+    ):
+
+        monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0)
+
+        def create_dagruns(
+            last_scheduling_decision: datetime.datetime | None = None,
+            count: int = 20,
+        ):
+            dagrun = dag_maker.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                state=State.RUNNING,
+                run_after=datetime.datetime(2024, 1, 1),
+            )
+            dagrun.last_scheduling_decision = last_scheduling_decision
+            session.merge(dagrun)
+            for _ in range(count - 1):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                    run_after=datetime.datetime(2024, 1, 1),
+                )
+
+                dagrun.last_scheduling_decision = last_scheduling_decision
+                session.merge(dagrun)
+
+        with dag_maker(
+            dag_id="dummy_dag",
+            schedule=datetime.timedelta(days=1),
+            start_date=datetime.datetime(2024, 1, 1),
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy_task")
+
+        create_dagruns(None, 10)
+
+        with dag_maker(
+            dag_id="dummy_dag2",
+            schedule=datetime.timedelta(days=1),
+            start_date=datetime.datetime(2024, 1, 1),
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy_task2")
+
+        create_dagruns(func.now(), 20)

Review Comment:
   `func.now()` is still passed as `last_scheduling_decision` here, and again
at line 1110, so it has not been removed yet. It is a SQL expression rather
than a Python datetime, which reads oddly against the `datetime | None`
annotation and makes the test depend on the DB clock. A Python value such as
`timezone.utcnow()` is clearer and keeps the DB clock out of the test.
Separately, the `create_dagruns` helper is copy-pasted verbatim across both
new tests; it is worth lifting it into a fixture.
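
   A rough sketch of what the shared helper could look like, written so it
runs without Airflow. Everything here is hypothetical: `make_create_dagruns`,
`create_run`, and `FIXED_DECISION_TIME` are illustrative names, and
`create_run` merely stands in for the `dag_maker.create_dagrun(...)` and
`session.merge(...)` calls in the tests. In the test module the factory would
presumably be wrapped in a pytest fixture that closes over `dag_maker` and
`session`.

```python
import datetime
from types import SimpleNamespace

# A fixed, timezone-aware value: identical on every run and independent of the DB clock.
FIXED_DECISION_TIME = datetime.datetime(2024, 1, 2, tzinfo=datetime.timezone.utc)


def make_create_dagruns(create_run):
    """Build the shared helper; `create_run` is a hypothetical stand-in for the dag_maker calls."""

    def create_dagruns(last_scheduling_decision=None, count=20):
        created = []
        for _ in range(count):
            run = create_run()
            # A plain Python value (or None), matching the `datetime | None` annotation.
            run.last_scheduling_decision = last_scheduling_decision
            created.append(run)
        return created

    return create_dagruns


# Stand-in for the real DagRun creation, so the sketch runs without Airflow.
create_dagruns = make_create_dagruns(lambda: SimpleNamespace(last_scheduling_decision=None))
never_examined = create_dagruns(None, 10)
examined = create_dagruns(FIXED_DECISION_TIME, 20)
```

   Both tests could then call the same `create_dagruns` instead of redefining
it, and the value passed for `last_scheduling_decision` is always either
`None` or a Python datetime.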



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
