ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r494939826
##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
session.rollback()
raise
+ @classmethod
+ def dags_needing_dagruns(cls, session: Session):
+ """
+ Return (and lock) a list of Dag objects that are due to create a new DagRun. This will return a
+ resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query; you should ensure
+ that any scheduling decisions are made in a single transaction -- as soon as the transaction is
+ committed it will be unlocked.
+ """
+
+ # TODO[HA]: Bake this query, it is run _A lot_
+ # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+ # doesn't try to do all the creation of dag runs
+ return session.query(cls).filter(
+ cls.is_paused.is_(False),
+ cls.is_active.is_(True),
+ cls.next_dagrun_create_after <= func.now(),
Review comment:
Yes, a `None` (SQL `NULL`) in `next_dagrun_create_after` is designed to be skipped: `NULL <= now()` is never true, so the filter drops those rows. This is used, for instance, for `@once`
https://github.com/apache/airflow/pull/10956/files#diff-e5cbc8f771ec50ccb79ad8505f6f5697R482-R485:
```python
if (self.schedule_interval == "@once" and date_last_automated_dagrun) or \
        self.schedule_interval is None:
    # Manual trigger, or already created the run for @once, can short circuit
    return (None, None)
```
or when the DAG is at `max_active_runs`
https://github.com/apache/airflow/pull/10956/files#diff-e5cbc8f771ec50ccb79ad8505f6f5697R1758-R1765:
```python
active_runs_of_dag = num_active_runs.get(dag.dag_id, 0)
if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
    # Since this happens every time the dag is parsed it would be quite spammy at info
    log.debug(
        "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
        dag.dag_id, active_runs_of_dag, dag.max_active_runs
    )
    orm_dag.next_dagrun_create_after = None
```
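To illustrate why the `None` rows drop out without an explicit `IS NOT NULL` clause, here is a minimal sketch using Python's standard-library `sqlite3` module. The trimmed `dag` table, the sample rows, and the plain-SQL `dags_needing_dagruns()` helper are hypothetical stand-ins for the ORM model and query in the diff. It assumes `now` is passed in as a parameter rather than calling `func.now()`, and it leaves out the row locking entirely, since SQLite has no `SELECT ... FOR UPDATE`; only the filter semantics are shown.

```python
import sqlite3

# Hypothetical, trimmed-down stand-in for the `dag` table: only the columns
# the filter touches. Timestamps are ISO-8601 text so that string comparison
# matches chronological order in SQLite.
SCHEMA = """
CREATE TABLE dag (
    dag_id TEXT PRIMARY KEY,
    is_paused INTEGER NOT NULL,
    is_active INTEGER NOT NULL,
    next_dagrun_create_after TEXT  -- NULL means "do not create a run"
)
"""


def dags_needing_dagruns(conn, now):
    """Return dag_ids whose next run is due (filter only, no locking)."""
    rows = conn.execute(
        """
        SELECT dag_id FROM dag
        WHERE is_paused = 0
          AND is_active = 1
          AND next_dagrun_create_after <= ?  -- NULL <= x is NULL, so skipped
        ORDER BY dag_id
        """,
        (now,),
    ).fetchall()
    return [dag_id for (dag_id,) in rows]


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.executemany(
    "INSERT INTO dag VALUES (?, ?, ?, ?)",
    [
        ("due", 0, 1, "2020-09-01T00:00:00"),
        ("future", 0, 1, "2020-12-01T00:00:00"),
        ("once_already_run", 0, 1, None),    # @once after its single run
        ("at_max_active_runs", 0, 1, None),  # cleared by the max_active_runs check
        ("paused", 1, 1, "2020-09-01T00:00:00"),
    ],
)

print(dags_needing_dagruns(conn, "2020-09-24T00:00:00"))  # -> ['due']
# The comparison itself yields NULL (None in Python), never true:
print(conn.execute("SELECT NULL <= '2020-09-24'").fetchone())  # -> (None,)
```

Because SQL uses three-valued logic, a `NULL` in the compared column makes the whole `WHERE` predicate unknown, and rows whose predicate is not true are never returned; setting the column to `None` is therefore enough to take a DAG out of the scheduler's candidate set.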
----------------------------------------------------------------
This is an automated message from the Apache Git Service.