ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r494939826
##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs:
List[str], session=None):
session.rollback()
raise
+ @classmethod
+ def dags_needing_dagruns(cls, session: Session):
+ """
+ Return (and lock) a list of Dag objects that are due to create a new
DagRun This will return a
+ resultset of rows that is row-level-locked with a "SELECT ... FOR
UPDATE" query, you should ensure
+ that any scheduling decisions are made in a single transaction -- as
soon as the transaction is
+ committed it will be unlocked.
+ """
+
+ # TODO[HA]: Bake this query, it is run _A lot_
+ # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+ # doesn't try to do all the creation of dag runs
+ return session.query(cls).filter(
+ cls.is_paused.is_(False),
+ cls.is_active.is_(True),
+ cls.next_dagrun_create_after <= func.now(),
Review comment:
Yes, None in `next_dagrun_create_after` is designed to be skipped - for
instance for `@once`
https://github.com/apache/airflow/pull/10956/files#diff-e5cbc8f771ec50ccb79ad8505f6f5697R482-R485:
```python
if (self.schedule_interval == "@once" and
date_last_automated_dagrun) or \
self.schedule_interval is None:
# Manual trigger, or already created the run for @once, can
short circuit
return (None, None)
```
or max_active_dags:
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]