ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r499519623
##########
File path: airflow/models/dag.py
##########
@@ -1939,6 +2099,37 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             session.rollback()
             raise
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new DagRun. This will return a
+        resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query; you should ensure
+        that any scheduling decisions are made in a single transaction -- as soon as the transaction is
+        committed it will be unlocked.
+        """
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        query = session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),
+        ).order_by(
+            cls.next_dagrun_create_after
+        ).limit(cls.NUM_DAGS_PER_DAGRUN_QUERY)
+
+        return with_for_update(query, of=cls, **skip_locked(session=session))
+
+
+STATICA_HACK = True
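
The docstring above relies on database row locking: whatever is returned stays locked until the caller's transaction ends. Below is a minimal sketch, not taken from this PR, of how one scheduler pass might consume the method. It assumes `cls` here is Airflow's `DagModel` (consistent with the `is_paused`/`next_dagrun_create_after` columns in the query) and that `with_for_update`/`skip_locked` yield an iterable query with `FOR UPDATE SKIP LOCKED` semantics on databases that support it; the caller function name is made up for illustration.

```python
# Hypothetical caller sketch -- only dags_needing_dagruns comes from the PR;
# everything else here is an assumption for illustration.
from sqlalchemy.orm import Session

from airflow.models.dag import DagModel


def create_due_dagruns(session: Session) -> None:
    # Rows returned here are held under SELECT ... FOR UPDATE for the life of
    # this transaction; with SKIP LOCKED, a second scheduler running the same
    # query skips these rows instead of blocking on them.
    for dag_model in DagModel.dags_needing_dagruns(session):
        # Make all scheduling decisions for this DagModel before the commit
        # below, while its row is still locked.
        ...

    # Committing releases the row locks, so other scheduler instances can pick
    # up these DagModel rows again on their next pass.
    session.commit()
```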
Review comment:
This is a pattern we already use in `airflow/__init__.py`
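
For readers who have not seen the pattern being referenced: a rough sketch of the STATICA_HACK trick as it appears in `airflow/__init__.py`, reproduced from memory, so treat the exact import as illustrative rather than what this PR guards. The `globals()` assignment flips the flag to `False` at runtime in a way static analyzers cannot follow, so linters and IDEs see the import and can resolve the names, while at runtime the import never executes and circular imports are avoided.

```python
# Sketch of the STATICA_HACK pattern; the imported name below is illustrative.
STATICA_HACK = True
globals()['kcah_acitats'[::-1].upper()] = False  # sets STATICA_HACK = False at runtime
if STATICA_HACK:  # pragma: no cover
    # Visible to pylint/mypy/IDEs, never executed at runtime -- the name
    # resolves for static analysis without triggering a circular import.
    from airflow.models.serialized_dag import SerializedDagModel  # noqa: F401
```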