ashb commented on code in PR #24969:
URL: https://github.com/apache/airflow/pull/24969#discussion_r946972332
##########
airflow/models/dag.py:
##########
@@ -3024,14 +3024,31 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=NEW_SESS
continue
@classmethod
- def dags_needing_dagruns(cls, session: Session):
+ def dags_needing_dagruns(cls, session: Session) -> Tuple[Query, Dict]:
"""
Return (and lock) a list of Dag objects that are due to create a new
DagRun.
This will return a resultset of rows that is row-level-locked with a
"SELECT ... FOR UPDATE" query,
you should ensure that any scheduling decisions are made in a single
transaction -- as soon as the
transaction is committed it will be unlocked.
"""
+ from airflow.models.dataset import DatasetDagRef, DatasetDagRunQueue as DDRQ
+
+ # these dag ids are triggered by datasets, and they are ready to go.
+ dataset_triggered_dag_info_list = {
+ x.dag_id: (x.first_event_time, x.last_event_time)
+ for x in session.query(
+ DatasetDagRef.dag_id,
+ func.max(DDRQ.created_at).label('last_event_time'),
+ func.max(DDRQ.created_at).label('first_event_time'),
+ )
+ .join(DatasetDagRef.queue_records, isouter=True)
+ .group_by(DatasetDagRef.dag_id)
+ .having(func.count() == func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)))
+ .all()
+ }
+ dataset_triggered_dag_ids = list(dataset_triggered_dag_info_list.keys())
Review Comment:
This only affects people using Datasets, which is a new feature, so this can
be fixed later if needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]