Copilot commented on code in PR #64109:
URL: https://github.com/apache/airflow/pull/64109#discussion_r3066479270


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -679,9 +679,27 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
             .subquery()
         )
 
+        available_dagruns_rn = (
+            select(
+                DagRun.dag_id,
+                DagRun.id,
+                func.row_number()
+                .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)

Review Comment:
   The window `order_by` uses only `DagRun.logical_date`, which may not be a 
total ordering (ties can occur), making the chosen dagruns nondeterministic 
across DB engines/plans and potentially causing flaky behavior. Add a stable 
tie-breaker (e.g., also order by `DagRun.id` or `DagRun.run_after`) so 
row-number assignment is deterministic within each `(dag_id, backfill_id)` 
partition.
   ```suggestion
                   .over(
                       partition_by=[DagRun.dag_id, DagRun.backfill_id],
                       order_by=[DagRun.logical_date, DagRun.id],
                   )
   ```
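   To see why the tie-breaker matters, here is a minimal sketch using an in-memory SQLite table as a stand-in for the real `dag_run` table (table and column names only mirror the query above; this is not the Airflow schema):

   ```python
   import sqlite3

   # Toy stand-in for the dag_run table used by the window query.
   conn = sqlite3.connect(":memory:")
   conn.execute(
       "CREATE TABLE dag_run (id INTEGER, dag_id TEXT, backfill_id INTEGER, logical_date TEXT)"
   )
   # Two rows tie on (dag_id, backfill_id, logical_date); only `id` can break the tie.
   conn.executemany(
       "INSERT INTO dag_run VALUES (?, ?, ?, ?)",
       [
           (1, "d1", None, "2024-01-01"),
           (2, "d1", None, "2024-01-01"),  # same logical_date as id=1
           (3, "d1", None, "2024-01-02"),
       ],
   )

   # With `id` as a tie-breaker, row-number assignment is fully determined
   # by the data, not by the engine's scan order.
   rows = conn.execute(
       """
       SELECT id,
              ROW_NUMBER() OVER (
                  PARTITION BY dag_id, backfill_id
                  ORDER BY logical_date, id
              ) AS rn
       FROM dag_run
       ORDER BY id
       """
   ).fetchall()
   print(rows)  # [(1, 1), (2, 2), (3, 3)]
   ```

   Without the `id` term, rows 1 and 2 could legally receive `rn` 1 and 2 in either order, which is exactly the nondeterminism the comment describes.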



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5881,41 +5969,55 @@ def _running_counts():
             EmptyOperator(task_id="mytask")
 
         dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
-        for _ in range(9):
+        for _ in range(29):
             dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
 
         # initial state -- nothing is running
         assert dag1_non_b_running == 0
         assert dag1_b_running == 0
         assert total_running == 0
-        assert session.scalar(select(func.count(DagRun.id))) == 46
+        assert session.scalar(select(func.count(DagRun.id))) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
         # now let's run it once
         self.job_runner._start_queued_dagruns(session)
         session.flush()
 
         # after running the scheduler one time, observe that only one dag run is started
-        # this is because there are 30 runs for dag 1 so neither the backfills nor
+        # and 3 backfill dagruns are started
+        # this is because there are 30 dags, most of which get filtered due to max_active_runs
+        # and so due to the default dagruns to examine, we look at the first 20 dags which CAN be run
+        # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non backfill and
+        # all dagruns of dag2
         # any runs for dag2 get started
         assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
         dag1_non_b_running, dag1_b_running, total_running = _running_counts()
         assert dag1_non_b_running == 1
-        assert dag1_b_running == 0
-        assert total_running == 1
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+        assert dag1_b_running == 3
+        assert total_running == 20
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
+        # now we finish all lower priority backfill tasks, and observe new higher priority tasks are started
+        session.execute(
+            update(DagRun)
+            .where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING)
+            .values(state=DagRunState.SUCCESS)
+        )
+        session.commit()

Review Comment:
   Calling `session.commit()` inside a unit test can break transactional test 
isolation (fixtures that rely on nested transactions/rollbacks) and is usually 
unnecessary here since the updated rows are read again within the same session. 
Prefer removing the commit and relying on `flush()` (or keep it as a `flush()` 
only) so the state transition is visible without finalizing the transaction.
   ```suggestion
   
   ```
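   The flush-vs-commit distinction can be illustrated with a small stdlib-only sketch (using raw sqlite3 rather than the SQLAlchemy session, so the transaction boundaries are explicit; the table is a toy stand-in):

   ```python
   import sqlite3

   # Manual transaction control so BEGIN/ROLLBACK are explicit.
   conn = sqlite3.connect(":memory:", isolation_level=None)
   conn.execute("CREATE TABLE dag_run (id INTEGER, state TEXT)")
   conn.execute("INSERT INTO dag_run VALUES (1, 'running')")

   # Open a transaction and update WITHOUT committing -- analogous to
   # session.flush(): the change is sent to the DB but not finalized.
   conn.execute("BEGIN")
   conn.execute("UPDATE dag_run SET state = 'success' WHERE id = 1")

   # The uncommitted change is already visible to reads on the same
   # connection, so the test can observe the state transition.
   state_during = conn.execute("SELECT state FROM dag_run WHERE id = 1").fetchone()[0]

   # Rolling back restores the original row -- this is what transactional
   # test fixtures rely on, and what a premature commit() would break.
   conn.execute("ROLLBACK")
   state_after = conn.execute("SELECT state FROM dag_run WHERE id = 1").fetchone()[0]

   print(state_during, state_after)  # success running
   ```

   The same property holds for a SQLAlchemy session: work flushed within the enclosing transaction is visible to subsequent reads on that session, and the fixture's rollback still undoes it.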



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5881,41 +5969,55 @@ def _running_counts():
             EmptyOperator(task_id="mytask")
 
         dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
-        for _ in range(9):
+        for _ in range(29):
             dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
 
         # initial state -- nothing is running
         assert dag1_non_b_running == 0
         assert dag1_b_running == 0
         assert total_running == 0
-        assert session.scalar(select(func.count(DagRun.id))) == 46
+        assert session.scalar(select(func.count(DagRun.id))) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
         # now let's run it once
         self.job_runner._start_queued_dagruns(session)
         session.flush()
 
         # after running the scheduler one time, observe that only one dag run is started
-        # this is because there are 30 runs for dag 1 so neither the backfills nor
+        # and 3 backfill dagruns are started
+        # this is because there are 30 dags, most of which get filtered due to max_active_runs
+        # and so due to the default dagruns to examine, we look at the first 20 dags which CAN be run
+        # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non backfill and
+        # all dagruns of dag2
         # any runs for dag2 get started

Review Comment:
   These explanatory comments repeatedly say 'dags' where they appear to mean 
'dagruns' (e.g., '30 dags', 'first 20 dags'), which makes the rationale hard to 
follow. Clarifying the terminology here (dag vs dagrun) would prevent 
misunderstanding when debugging scheduler selection behavior.
   ```suggestion
        # this is because there are 30 queued dagruns, many of which get filtered because their DAGs
        # have already reached max_active_runs
        # and so due to the default dagruns-to-examine limit, we look at the first 20 dagruns that CAN be run
        # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non-backfill,
        # and all runnable dagruns for dag2
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3290,6 +3290,94 @@ def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses
         dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
         assert len(dag_runs) == 2
 
+    def test_runs_are_not_starved_by_max_active_runs_limit(self, dag_maker, session):
+        """
+        Test that dagruns are not starved by max_active_runs
+        """
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+        dag_ids = ["dag1", "dag2", "dag3"]
+
+        max_active_runs = 3
+
+        for dag_id in dag_ids:
+            with dag_maker(
+                dag_id=dag_id,
+                max_active_runs=max_active_runs,
+                session=session,
+                catchup=True,
+                schedule=timedelta(seconds=60),
+                start_date=DEFAULT_DATE,
+            ):
+                # Need to use something that doesn't immediately get marked as success by the scheduler
+                BashOperator(task_id="task", bash_command="true")
+
+            dag_run = dag_maker.create_dagrun(
+                state=State.QUEUED, session=session, run_type=DagRunType.SCHEDULED
+            )
+
+            for _ in range(50):
+                # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs
+                dag_run = dag_maker.create_dagrun_after(
+                    dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED
+                )
+
+        self.job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        running_dagrun_count = session.scalar(
+            select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
+        )
+
+        assert running_dagrun_count == max_active_runs * len(dag_ids)
+
+    def test_no_more_dagruns_are_set_to_running_when_max_active_runs_exceeded(self, dag_maker, session):
+        """
+        Test that dagruns are not moved to running if there are more than the max_active_runs running dagruns
+        """
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+        max_active_runs = 1
+        with dag_maker(
+            dag_id="test_dag",
+            max_active_runs=max_active_runs,
+            session=session,
+            catchup=True,
+            schedule=timedelta(seconds=60),
+            start_date=DEFAULT_DATE,
+        ):
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id="task", bash_command="true")
+
+        dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED)
+
+        for _ in range(5):
+            # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs

Review Comment:
   The comment says these are created in `queued` state, but the code sets 
`state=State.RUNNING`. If the intent is to pre-fill/exceed `max_active_runs` 
with RUNNING dagruns (which makes sense for this test), update the comment to 
match the actual setup to avoid confusion for future maintainers.
   ```suggestion
            # create a bunch of dagruns in running state, to exceed max_active_runs
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -679,9 +679,27 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
             .subquery()
         )
 
+        available_dagruns_rn = (
+            select(
+                DagRun.dag_id,
+                DagRun.id,
+                func.row_number()
+                .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
+                .label("rn"),
+            )

Review Comment:
   This computes `row_number()` across *all* queued dagruns before applying 
later eligibility filters (DagModel/Backfill joins, paused checks, etc.). On 
large installations with many queued dagruns, that full-table window can become 
a costly bottleneck. Consider pushing more predicates/joins into the same 
subquery/CTE used for the window (so the window runs only on eligible 
candidates), or otherwise narrowing the queued set prior to the window 
calculation.
   ```suggestion
               )
               .join(
                   DagModel,
                   and_(
                       DagModel.dag_id == DagRun.dag_id,
                       DagModel.is_paused == false(),
                       DagModel.is_stale == false(),
                   ),
               )
   ```
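   The shape of that optimization can be sketched in plain SQL over a toy schema (an in-memory SQLite stand-in; `dag.is_paused`/`is_stale` here only mimic the DagModel columns the real query would join on):

   ```python
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.executescript("""
   CREATE TABLE dag_run (id INTEGER, dag_id TEXT, backfill_id INTEGER,
                         logical_date TEXT, state TEXT);
   CREATE TABLE dag (dag_id TEXT, is_paused INTEGER, is_stale INTEGER);
   INSERT INTO dag VALUES ('active_dag', 0, 0), ('paused_dag', 1, 0);
   INSERT INTO dag_run VALUES
     (1, 'active_dag', NULL, '2024-01-01', 'queued'),
     (2, 'active_dag', NULL, '2024-01-02', 'queued'),
     (3, 'paused_dag', NULL, '2024-01-01', 'queued');
   """)

   # Eligibility filters live INSIDE the CTE, so ROW_NUMBER() only numbers
   # candidates that can actually be promoted -- the paused dag's run never
   # enters the window at all.
   rows = conn.execute("""
   WITH eligible AS (
       SELECT dr.dag_id, dr.id,
              ROW_NUMBER() OVER (
                  PARTITION BY dr.dag_id, dr.backfill_id
                  ORDER BY dr.logical_date, dr.id
              ) AS rn
       FROM dag_run dr
       JOIN dag d ON d.dag_id = dr.dag_id
                  AND d.is_paused = 0 AND d.is_stale = 0
       WHERE dr.state = 'queued'
   )
   SELECT dag_id, id, rn FROM eligible ORDER BY id
   """).fetchall()
   print(rows)  # [('active_dag', 1, 1), ('active_dag', 2, 2)]
   ```

   In the SQLAlchemy version this corresponds to attaching the joins/filters to the same `select()` that carries the `row_number()` window, rather than applying them after the subquery.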



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
