ashb commented on code in PR #24969:
URL: https://github.com/apache/airflow/pull/24969#discussion_r946972332


##########
airflow/models/dag.py:
##########
@@ -3024,14 +3024,31 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=NEW_SESS
                 continue
 
     @classmethod
-    def dags_needing_dagruns(cls, session: Session):
+    def dags_needing_dagruns(cls, session: Session) -> Tuple[Query, Dict]:
         """
         Return (and lock) a list of Dag objects that are due to create a new DagRun.
 
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
         """
+        from airflow.models.dataset import DatasetDagRef, DatasetDagRunQueue as DDRQ
+
+        # these dag ids are triggered by datasets, and they are ready to go.
+        dataset_triggered_dag_info_list = {
+            x.dag_id: (x.first_event_time, x.last_event_time)
+            for x in session.query(
+                DatasetDagRef.dag_id,
+                func.max(DDRQ.created_at).label('last_event_time'),
+                func.max(DDRQ.created_at).label('first_event_time'),
+            )
+            .join(DatasetDagRef.queue_records, isouter=True)
+            .group_by(DatasetDagRef.dag_id)
+            .having(func.count() == func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)))
+            .all()
+        }
+        dataset_triggered_dag_ids = list(dataset_triggered_dag_info_list.keys())

Review Comment:
   This only affects people using Datasets, which is a new feature, so this can 
be fixed later if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to