Copilot commented on code in PR #64294:
URL: https://github.com/apache/airflow/pull/64294#discussion_r3066489393
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -39,6 +39,7 @@
Index,
Integer,
PrimaryKeyConstraint,
+ SQLColumnExpression,
String,
Text,
UniqueConstraint,
Review Comment:
`SQLColumnExpression` is only used in a type annotation, and this module
already uses `from __future__ import annotations`. Importing it at runtime is
therefore unnecessary and slightly enlarges the module's runtime import surface;
consider moving it under the `TYPE_CHECKING` block (or switching the annotation
to `ColumnElement[Any]` or a similar type that is already available under
`TYPE_CHECKING`).
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -628,27 +642,65 @@ def get_running_dag_runs_to_examine(cls, session:
Session) -> ScalarResult[DagRu
from airflow.models.backfill import BackfillDagRun
from airflow.models.dag import DagModel
- query = (
- select(cls)
- .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)",
dialect_name="mysql")
- .where(cls.state == DagRunState.RUNNING)
- .join(DagModel, DagModel.dag_id == cls.dag_id)
- .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id,
isouter=True)
- .where(
- DagModel.is_paused == false(),
- DagModel.is_stale == false(),
- )
- .order_by(
- nulls_first(cast("ColumnElement[Any]",
BackfillDagRun.sort_ordinal), session=session),
- nulls_first(cast("ColumnElement[Any]",
cls.last_scheduling_decision), session=session),
- cls.run_after,
+ def _get_dagrun_query(
+ filters: list[ColumnElement[bool]], order_by:
list[SQLColumnExpression[Any]], limit: int
+ ):
+ return (
+ select(DagRun)
+ .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)",
dialect_name="mysql")
+ .where(DagRun.state == DagRunState.RUNNING)
+ .join(DagModel, DagModel.dag_id == cls.dag_id)
+ .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id,
isouter=True)
+ .where(*filters)
+ .order_by(*order_by)
+ .limit(limit)
)
- .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
+
+ filters = [
+ DagRun.run_after <= func.now(),
+ DagModel.is_paused == false(),
+ DagModel.is_stale == false(),
+ ]
+
+ order = [
+ nulls_first(cast("ColumnElement[Any]",
BackfillDagRun.sort_ordinal), session=session),
+ nulls_first(cast("ColumnElement[Any]",
DagRun.last_scheduling_decision), session=session),
+ DagRun.run_after,
+ ]
+
+ new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
+ dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+
+ if new_dagruns_to_examine < 0:
+ log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller
than 0, ignoring configuration")
Review Comment:
The warning message for a negative `max_new_dagruns_per_loop_to_schedule`
would be more actionable if it included the configured value and stated the
fallback behavior (the value is treated as 0). Also, "less than 0" reads more
naturally than "smaller than 0".
```suggestion
log.warning(
"'max_new_dagruns_per_loop_to_schedule' is configured as %s,
which is less than 0; "
"treating it as 0",
new_dagruns_to_examine,
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]