ashb commented on code in PR #24743:
URL: https://github.com/apache/airflow/pull/24743#discussion_r914169419
##########
airflow/models/dagrun.py:
##########
@@ -631,8 +631,75 @@ def update_state(
session.merge(self)
# We do not flush here for performance reasons(It increases queries
count by +20)
+ self._process_dataset_dagrun_events(session=session)
+
return schedulable_tis, callback
+ def _process_dataset_dagrun_events(self, *, session=NEW_SESSION):
+ """
+ Looks at all outlet datasets that have been updated by this dag,
+ and creates DAG runs that have all dataset deps fulfilled.
+ """
+ from airflow.models.dataset import Dataset, DatasetDagRef,
DatasetTaskRef
+
+ has_dataset_outlets = False
+ if self.dag:
+ for _, task in self.dag.task_dict.items():
+ if has_dataset_outlets is True:
+ break
+ for obj in getattr(task, '_outlets', []):
+ if isinstance(obj, Dataset):
+ has_dataset_outlets = True
+ break
+ dependent_dag_ids = []
+ if self.dag and has_dataset_outlets:
+ dependent_dag_ids = [
+ x.dag_id
+ for x in session.query(DatasetDagRef.dag_id)
+ .filter(DatasetTaskRef.dag_id == self.dag_id)
+ .all()
+ ]
+
+ from airflow.models.dataset import DatasetDagRunQueue as DDRQ
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ dag_ids_to_trigger = None
+ if dependent_dag_ids:
+ dag_ids_to_trigger = [
+ x.dag_id
+ for x in session.query(
+ DatasetDagRef.dag_id,
+ )
+ .join(
+ DDRQ,
+ and_(
+ DDRQ.dataset_id == DatasetDagRef.dataset_id,
+ DDRQ.target_dag_id == DatasetDagRef.dag_id,
+ ),
+ isouter=True,
+ )
+ .filter(DatasetDagRef.dag_id.in_(dependent_dag_ids))
+ .group_by(DatasetDagRef.dag_id)
+ .having(func.count() ==
func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)))
+ .all()
+ ]
+
+ if dag_ids_to_trigger:
+ for dag_id in dag_ids_to_trigger:
+ row = SerializedDagModel.get(dag_id, session)
+ if not row:
+ continue
+ dag = row.dag
+ if dag.schedule_on:
+ dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id=self.generate_run_id(DagRunType.MANUAL,
execution_date=timezone.utcnow()),
+ state=DagRunState.QUEUED,
+ external_trigger=True,
Review Comment:
```suggestion
```
Suggest removing `external_trigger=True` here; as far as I know, nothing really uses this flag.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]