Copilot commented on code in PR #64109:
URL: https://github.com/apache/airflow/pull/64109#discussion_r2981154831


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5965,41 +5885,55 @@ def _running_counts():
             EmptyOperator(task_id="mytask")
 
         dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, 
state=State.QUEUED)
-        for _ in range(9):
+        for _ in range(29):
             dr = dag_maker.create_dagrun_after(dr, 
run_type=DagRunType.SCHEDULED, state=State.QUEUED)
 
         # initial state -- nothing is running
         assert dag1_non_b_running == 0
         assert dag1_b_running == 0
         assert total_running == 0
-        assert session.scalar(select(func.count(DagRun.id))) == 46
+        assert session.scalar(select(func.count(DagRun.id))) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == 
dag1_dag_id)) == 36
 
         # now let's run it once
         self.job_runner._start_queued_dagruns(session)
         session.flush()
 
         # after running the scheduler one time, observe that only one dag run 
is started
-        # this is because there are 30 runs for dag 1 so neither the backfills 
nor
+        # and 3 backfill dagruns are started
+        # this is because there are 30 dags, most of which get filtered due to 
max_active_runs
+        # and so due to the default dagruns to examine, we look at the first 
20 dags which CAN be run
+        # according to the max_active_runs parameter, meaning 3 backfill runs 
will start, 1 non backfill and
+        # all dagruns of dag2
         # any runs for dag2 get started
         assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
         dag1_non_b_running, dag1_b_running, total_running = _running_counts()
         assert dag1_non_b_running == 1
-        assert dag1_b_running == 0
-        assert total_running == 1
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+        assert dag1_b_running == 3
+        assert total_running == 20
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == 
dag1_dag_id)) == 36
+        # now we finish all lower priority backfill tasks, and observe new 
higher priority tasks are started
+        session.execute(
+            update(DagRun)
+            .where(DagRun.dag_id == "test_dag2", DagRun.state == 
DagRunState.RUNNING)
+            .values(state=DagRunState.SUCCESS)
+        )

Review Comment:
   This comment says you’re finishing “lower priority backfill tasks”, but the 
code updates DAG runs for `test_dag2` (non-backfill) from RUNNING→SUCCESS. 
Please adjust the comment to match what the test is actually doing (finishing 
`test_dag2` runs) to avoid confusion when maintaining the test expectations.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3229,6 +3221,7 @@ def _try_to_load_executor(
 
         return executor
 
+    # TODO: remove as it moved to the get_queued_dag_runs_to_set_running 
method in dagrun.py

Review Comment:
   The TODO above `_set_exceeds_max_active_runs` says it “moved to ... 
get_queued_dag_runs_to_set_running”, but this method is still called from 
`_create_dag_runs` and `_schedule_dag_run` to update 
`DagModel.exceeds_max_non_backfill`. This TODO is misleading—either remove it 
or reword it to describe the actual deprecation plan (and conditions) if the 
intent is to delete the helper later.
   ```suggestion
       # NOTE: This helper is still used by SchedulerJobRunner (e.g. in 
_create_dag_runs/_schedule_dag_run)
       # to maintain DagModel.exceeds_max_non_backfill. The logic for enforcing 
max active runs when
       # transitioning queued runs to running lives in 
DagRun.get_queued_dag_runs_to_set_running.
       # If/when all remaining call sites in SchedulerJobRunner are removed, 
this helper can be deprecated
       # and then deleted.
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -656,9 +656,27 @@ def get_queued_dag_runs_to_set_running(cls, session: 
Session) -> ScalarResult[Da
             .subquery()
         )
 
+        available_dagruns_rn = (
+            select(
+                DagRun.dag_id,
+                DagRun.id,
+                func.row_number()
+                .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], 
order_by=DagRun.logical_date)
+                .label("rn"),
+            )
+            .where(DagRun.state == DagRunState.QUEUED)

Review Comment:
   `available_dagruns_rn` uses `row_number(... order_by=DagRun.logical_date)`, 
but `logical_date` is nullable and does not reflect the scheduler’s actual 
priority ordering (`run_after`, and for backfills 
`BackfillDagRun.sort_ordinal`, which can differ e.g. reverse backfills). This 
can cause the rn-capacity filter to pick a non-runnable/future `run_after` row 
(or wrong backfill ordinal) as `rn=1` and then filter out runnable candidates, 
reintroducing starvation or breaking backfill ordering. Consider basing the 
window `order_by` on `run_after` with a deterministic tiebreaker (e.g. `id`), 
and for backfills incorporate `BackfillDagRun.sort_ordinal` (likely via 
computing the row_number after joining that table), and/or apply `run_after <= 
now()` in the window subquery so rn is computed only among runnable queued runs.
   ```suggestion
                   .over(
                       partition_by=[DagRun.dag_id, DagRun.backfill_id],
                       order_by=[BackfillDagRun.sort_ordinal, DagRun.run_after, 
DagRun.id],
                   )
                   .label("rn"),
               )
               .join(
                   BackfillDagRun,
                   DagRun.backfill_id == BackfillDagRun.id,
                   isouter=True,
               )
               .where(
                   and_(
                       DagRun.state == DagRunState.QUEUED,
                       DagRun.run_after <= func.now(),
                   )
               )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to