kaxil commented on code in PR #64109:
URL: https://github.com/apache/airflow/pull/64109#discussion_r2981174784


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -690,8 +708,13 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
                 # the one done in this query verifies that the dag is not maxed out
                 # it could return many more dag runs than runnable if there is even
                 # capacity for 1.  this could be improved.
-                coalesce(running_drs.c.num_running, text("0"))
-                < coalesce(Backfill.max_active_runs, DagModel.max_active_runs),
+                available_dagruns_rn.c.rn
+                <= coalesce(
+                    Backfill.max_active_runs,
+                    DagModel.max_active_runs,
+                    airflow_conf.getint("core", "max_active_runs_per_dag"),
+                )

Review Comment:
   Adding `airflow_conf.getint("core", "max_active_runs_per_dag")` as a third fallback in the coalesce chain changes behavior from the original query, which only used `coalesce(Backfill.max_active_runs, DagModel.max_active_runs)`.
   
   `DagModel.max_active_runs` should always have a value (it defaults from config at DAG parse time), so this fallback shouldn't be needed. If it IS needed, that indicates a data integrity issue that should be investigated separately rather than papered over here.
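   
   A minimal sqlite3 illustration (not the Airflow query itself; `16` stands in for the config value) of why the third argument only changes the result when both preceding columns are NULL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# COALESCE returns its first non-NULL argument. With two arguments that
# are both NULL, the whole expression is NULL, and a comparison against
# NULL is never true, so the row is filtered out of the WHERE clause.
assert conn.execute("SELECT COALESCE(NULL, NULL)").fetchone()[0] is None

# Adding a config-derived third argument means the chain always yields a
# number, silently admitting rows whose max_active_runs was never set.
assert conn.execute("SELECT COALESCE(NULL, NULL, 16)").fetchone()[0] == 16

# When DagModel.max_active_runs is populated (the expected case), the
# third argument is never reached, so behavior is unchanged there.
assert conn.execute("SELECT COALESCE(NULL, 5, 16)").fetchone()[0] == 5
```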



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3229,6 +3221,7 @@ def _try_to_load_executor(
 
         return executor
 
+    # TODO: remove as it moved to the get_queued_dag_runs_to_set_running method in dagrun.py

Review Comment:
   This TODO is incorrect. `_set_exceeds_max_active_runs` sets the `exceeds_max_non_backfill` flag on `DagModel`, which is used by `dags_needing_dagruns` (line 725 of dag.py) to prevent *creating* new QUEUED runs. That's a different concern from limiting which QUEUED runs get promoted to RUNNING.
   
   They serve different purposes and both are needed:
   - `_set_exceeds_max_active_runs` → controls DagRun *creation*
   - `get_queued_dag_runs_to_set_running` → controls QUEUED→RUNNING *promotion*



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -656,9 +656,27 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
             .subquery()
         )
 
+        available_dagruns_rn = (
+            select(
+                DagRun.dag_id,
+                DagRun.id,
+                func.row_number()
+                .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
+                .label("rn"),
+            )
+            .where(DagRun.state == DagRunState.QUEUED)
+            .subquery()
+        )
+
         query = (
             select(cls)

Review Comment:
   The `row_number()` approach is a good direction for solving the starvation problem. One thing to verify: when `max_active_runs - num_running` evaluates to 0 or negative (because runs started between the subquery snapshot and the outer query), `rn <= 0` will correctly exclude all queued runs for that DAG. Worth a test case for that edge case.
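   
   A standalone sqlite3 sketch (hypothetical table, not the real Airflow schema) confirming that `ROW_NUMBER()` is 1-based, so `rn <= slots` with `slots <= 0` promotes nothing:

```python
import sqlite3

# Window functions need SQLite >= 3.25, bundled with modern Python.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, logical_date TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, 'queued')",
    [("a", "2024-01-01"), ("a", "2024-01-02"), ("a", "2024-01-03"),
     ("b", "2024-01-01")],
)

def runnable(slots: int) -> list:
    # rn numbers queued runs 1, 2, 3, ... per dag, oldest first, so
    # "rn <= slots" with slots <= 0 selects nothing for that dag.
    return conn.execute(
        """
        SELECT dag_id, logical_date FROM (
            SELECT dag_id, logical_date,
                   ROW_NUMBER() OVER (
                       PARTITION BY dag_id ORDER BY logical_date
                   ) AS rn
            FROM dag_run WHERE state = 'queued'
        ) WHERE rn <= ?
        """,
        (slots,),
    ).fetchall()

assert runnable(0) == []       # maxed out: no promotions
assert runnable(-1) == []      # over capacity: still no promotions
assert len(runnable(2)) == 3   # two oldest from "a" plus one from "b"
```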



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1913,14 +1913,6 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
         )
 
         for dag_model in dag_models:
-            if dag_model.exceeds_max_non_backfill:

Review Comment:
   This `exceeds_max_non_backfill` check guards DagRun *creation* (moving to QUEUED state), not the QUEUED→RUNNING promotion. The starvation fix belongs in `get_queued_dag_runs_to_set_running` (which this PR also modifies), not here.
   
   Removing this check means the scheduler will keep creating QUEUED DagRuns even when the DAG is at `max_active_runs`. The `dags_needing_dagruns` query also checks this flag (line 725 of dag.py), but it can go stale between the query and the loop execution, so this in-loop check is the safety net.
   
   I'd keep this guard — it's unrelated to the starvation fix.
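   
   For illustration, a generic sketch of the snapshot-then-recheck pattern this guard implements (hypothetical `dag_limits` structure, not the scheduler's actual code):

```python
# Each dag tracks how many runs are active against its cap.
dag_limits = {"a": {"running": 0, "max": 1}, "b": {"running": 0, "max": 1}}

def dags_needing_runs():
    # Snapshot query: both dags look under their limit right now.
    return [d for d, s in dag_limits.items() if s["running"] < s["max"]]

created = []
batch = dags_needing_runs()
dag_limits["a"]["running"] = 1  # state changes between query and loop

for dag_id in batch:
    # In-loop safety net: without this re-check, "a" would get a new
    # run created even though it is already at its max.
    if dag_limits[dag_id]["running"] >= dag_limits[dag_id]["max"]:
        continue
    created.append(dag_id)

assert created == ["b"]
```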



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3230,33 +3230,6 @@ def test_cleanup_methods_all_called_multiple_executors(self, mock_executors):
         for executor in self.job_runner.executors:
             executor.end.assert_called_once()
 

Review Comment:
   These deleted tests 
(`test_queued_dagruns_stops_creating_when_max_active_is_reached` and 
`test_more_runs_are_not_created_when_max_active_runs_is_reached`) verify that 
`_create_dag_runs` respects `max_active_runs`. Since the 
`exceeds_max_non_backfill` guard in `_create_dag_runs` should be kept (see 
other comment), these tests should stay too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
