Copilot commented on code in PR #64294:
URL: https://github.com/apache/airflow/pull/64294#discussion_r3066489393


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -39,6 +39,7 @@
     Index,
     Integer,
     PrimaryKeyConstraint,
+    SQLColumnExpression,
     String,
     Text,
     UniqueConstraint,

Review Comment:
   `SQLColumnExpression` is used only in a type annotation, and this module
already uses `from __future__ import annotations`, so the annotation is not
evaluated at runtime. The runtime import is therefore unnecessary and slightly
enlarges the module's import surface. Consider moving it under the
`TYPE_CHECKING` block, or switching the annotation to `ColumnElement[Any]` (or
a similar type that’s already available under `TYPE_CHECKING`).
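
A minimal sketch of the `TYPE_CHECKING` pattern suggested above, not the PR's actual code. The helper `describe_order_by` is hypothetical and exists only for illustration. The annotation is quoted so the snippet stands alone; in a module that already uses `from __future__ import annotations`, the quotes would be optional.

```python
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Seen only by static type checkers; this import never runs at runtime.
    from sqlalchemy import SQLColumnExpression


def describe_order_by(order_by: "list[SQLColumnExpression[Any]]") -> str:
    """Hypothetical helper: the quoted annotation is never evaluated at runtime."""
    return f"{len(order_by)} ordering expression(s)"
```

Because the import sits behind `TYPE_CHECKING`, loading the module does not import `SQLColumnExpression` at all, while type checkers still resolve the annotation.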



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -628,27 +642,65 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu
         from airflow.models.backfill import BackfillDagRun
         from airflow.models.dag import DagModel
 
-        query = (
-            select(cls)
-            .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql")
-            .where(cls.state == DagRunState.RUNNING)
-            .join(DagModel, DagModel.dag_id == cls.dag_id)
-            .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True)
-            .where(
-                DagModel.is_paused == false(),
-                DagModel.is_stale == false(),
-            )
-            .order_by(
-                nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session),
-                nulls_first(cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session),
-                cls.run_after,
+        def _get_dagrun_query(
+            filters: list[ColumnElement[bool]], order_by: list[SQLColumnExpression[Any]], limit: int
+        ):
+            return (
+                select(DagRun)
+                .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql")
+                .where(DagRun.state == DagRunState.RUNNING)
+                .join(DagModel, DagModel.dag_id == cls.dag_id)
+                .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True)
+                .where(*filters)
+                .order_by(*order_by)
+                .limit(limit)
             )
-            .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
+
+        filters = [
+            DagRun.run_after <= func.now(),
+            DagModel.is_paused == false(),
+            DagModel.is_stale == false(),
+        ]
+
+        order = [
+            nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session),
+            nulls_first(cast("ColumnElement[Any]", DagRun.last_scheduling_decision), session=session),
+            DagRun.run_after,
+        ]
+
+        new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
+        dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+
+        if new_dagruns_to_examine < 0:
+            log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller than 0, ignoring configuration")

Review Comment:
   The warning for a negative `max_new_dagruns_per_loop_to_schedule` would be
more actionable if it included the configured value and stated the fallback
behavior (the value is treated as 0). Also, “less than 0” reads more naturally
than “smaller than 0”.
   ```suggestion
               log.warning(
                   "'max_new_dagruns_per_loop_to_schedule' is configured as %s, which is less than 0; "
                   "treating it as 0",
                   new_dagruns_to_examine,
               )
   ```
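
The suggestion above can be sketched as a standalone function, assuming the fallback really is to treat a negative value as 0. The name `resolve_new_dagruns_limit` is hypothetical and does not appear in the PR. The configured value is passed as a logging argument rather than interpolated into the string, so formatting happens only if the record is actually emitted.

```python
import logging

log = logging.getLogger(__name__)


def resolve_new_dagruns_limit(configured: int) -> int:
    """Hypothetical helper: clamp a negative limit to 0 and log the configured value."""
    if configured < 0:
        # Lazy %s formatting: the message is built only when the warning is emitted.
        log.warning(
            "'max_new_dagruns_per_loop_to_schedule' is configured as %s, which is less than 0; "
            "treating it as 0",
            configured,
        )
        return 0
    return configured
```

For example, `resolve_new_dagruns_limit(-3)` logs the warning with the value `-3` and returns `0`, while a non-negative value is returned unchanged.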



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
