dstandish commented on code in PR #25959:
URL: https://github.com/apache/airflow/pull/25959#discussion_r959745974
##########
airflow/models/dag.py:
##########
@@ -2705,24 +2709,39 @@ class InletRef(NamedTuple):
del all_datasets
- # store dag-schedule-on-dataset references
- for dag_ref in dag_references:
- session.merge(
- DagScheduleDatasetReference(
- dataset_id=stored_datasets[dag_ref.uri].id,
- dag_id=dag_ref.dag_id,
- )
+ # reconcile dag-schedule-on-dataset references
+ for dag_id, uri_list in dag_references.items():
+ dag_refs_needed = {
+ DagScheduleDatasetReference(dataset_id=stored_datasets[uri].id, dag_id=dag_id)
+ for uri in uri_list
+ }
+ dag_refs_stored = set(
+ session.query(DagScheduleDatasetReference)
+ .filter(DagScheduleDatasetReference.dag_id == dag_id)
+ .all()
)
-
- # store task-outlet-dataset references
- for outlet_ref in outlet_references:
- session.merge(
- TaskOutletDatasetReference(
- dataset_id=stored_datasets[outlet_ref.uri].id,
- dag_id=outlet_ref.dag_id,
- task_id=outlet_ref.task_id,
+ dag_refs_to_add = {x for x in dag_refs_needed if x not in dag_refs_stored}
+ session.bulk_save_objects(dag_refs_to_add)
+ for obj in dag_refs_stored - dag_refs_needed:
Review Comment:
Operate on the relationship attribute.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]