ashb commented on code in PR #24743:
URL: https://github.com/apache/airflow/pull/24743#discussion_r910794084
##########
airflow/models/dagrun.py:
##########
@@ -631,6 +631,32 @@ def update_state(
         session.merge(self)
         # We do not flush here for performance reasons(It increases queries count by +20)
+        from airflow.models import Dataset
+        from airflow.models.dataset_dag_run_event import DatasetDagRunEvent as DDRE
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        datasets = []
+        for task in self.dag.tasks:
+            for outlet in getattr(task, '_outlets', []):
+                if isinstance(outlet, Dataset):
+                    datasets.append(outlet)
+        dataset_ids = [x.get_dataset_id(session=session) for x in datasets]
+        events_to_process = session.query(DDRE).filter(DDRE.dataset_id.in_(dataset_ids)).all()
Review Comment:
Okay so: locking and HA support.
The reason I had the dataset events table keyed as `(dag_id, task_id, run_id, dataset_uri)` in
https://lists.apache.org/thread/p83fwr87r2c60y0n1lmk9o8ycc4cmo6l (we can ignore dataset_uri vs
dataset_id here) is so that this query can become:
```python
events_to_process = session.query(DDRE).filter(DDRE.run_id == self.run_id).all()
```
That way we know that only this one DagRun is going to be processing those event rows.
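To make the HA angle concrete, here is a rough sketch (not the PR's code; the `dag_id`/`run_id` columns are assumed from the mailing-list proposal) of how that run-scoped query could also take row-level locks, so a second scheduler touching the table skips rows this transaction already holds:
```python
# Hypothetical sketch: select only the event rows produced by this dag run,
# and lock them so a concurrent scheduler skips them instead of double-processing.
events_to_process = (
    session.query(DDRE)
    .filter(DDRE.dag_id == self.dag_id, DDRE.run_id == self.run_id)
    .with_for_update(skip_locked=True)  # needs a backend with SKIP LOCKED (Postgres, MySQL 8+)
    .all()
)
```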
Otherwise we fairly trivially end up with a case where two DagRuns for the same dag, running at
the same time, try to create the same downstream runs. One of them will fail and cause the
transaction to fail (possibly killing that scheduler?). Or worse: as it's currently implemented,
with `now()` used for the run_id, we'll end up with each pending event processed twice, producing
duplicate downstream runs with slightly different run_ids.
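To illustrate that last failure mode (purely hypothetical code, not from the PR): if two schedulers consume the same pending event and each derives the downstream run_id from the current time, the two ids differ, so both inserts succeed and the unique constraint on `(dag_id, run_id)` never catches the duplicate:
```python
from datetime import datetime, timezone

def make_run_id() -> str:
    # Mimics a now()-based run_id scheme (hypothetical helper, not the PR's code).
    return f"dataset_triggered__{datetime.now(timezone.utc).isoformat()}"

# Two schedulers racing on the same pending event:
run_id_a = make_run_id()
run_id_b = make_run_id()
# The timestamps (and therefore the run_ids) differ, so both DagRun inserts
# succeed and the event is effectively processed twice.
print(run_id_a != run_id_b)  # almost always True
```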