dstandish commented on code in PR #25959:
URL: https://github.com/apache/airflow/pull/25959#discussion_r959745974
##########
airflow/models/dag.py:
##########
@@ -2705,24 +2709,39 @@ class InletRef(NamedTuple):
del all_datasets
- # store dag-schedule-on-dataset references
- for dag_ref in dag_references:
- session.merge(
- DagScheduleDatasetReference(
- dataset_id=stored_datasets[dag_ref.uri].id,
- dag_id=dag_ref.dag_id,
- )
+ # reconcile dag-schedule-on-dataset references
+ for dag_id, uri_list in dag_references.items():
+ dag_refs_needed = {
+
DagScheduleDatasetReference(dataset_id=stored_datasets[uri].id, dag_id=dag_id)
+ for uri in uri_list
+ }
+ dag_refs_stored = set(
+ session.query(DagScheduleDatasetReference)
+ .filter(DagScheduleDatasetReference.dag_id == dag_id)
+ .all()
)
-
- # store task-outlet-dataset references
- for outlet_ref in outlet_references:
- session.merge(
- TaskOutletDatasetReference(
- dataset_id=stored_datasets[outlet_ref.uri].id,
- dag_id=outlet_ref.dag_id,
- task_id=outlet_ref.task_id,
+ dag_refs_to_add = {x for x in dag_refs_needed if x not in
dag_refs_stored}
+ session.bulk_save_objects(dag_refs_to_add)
+ for obj in dag_refs_stored - dag_refs_needed:
Review Comment:
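Operate on the relationship attribute here, rather than querying the reference rows separately and deleting the stale ones one by one.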
##########
airflow/models/dag.py:
##########
@@ -2705,24 +2709,39 @@ class InletRef(NamedTuple):
del all_datasets
- # store dag-schedule-on-dataset references
- for dag_ref in dag_references:
- session.merge(
- DagScheduleDatasetReference(
- dataset_id=stored_datasets[dag_ref.uri].id,
- dag_id=dag_ref.dag_id,
- )
+ # reconcile dag-schedule-on-dataset references
+ for dag_id, uri_list in dag_references.items():
+ dag_refs_needed = {
+
DagScheduleDatasetReference(dataset_id=stored_datasets[uri].id, dag_id=dag_id)
+ for uri in uri_list
+ }
+ dag_refs_stored = set(
+ session.query(DagScheduleDatasetReference)
+ .filter(DagScheduleDatasetReference.dag_id == dag_id)
+ .all()
)
-
- # store task-outlet-dataset references
- for outlet_ref in outlet_references:
- session.merge(
- TaskOutletDatasetReference(
- dataset_id=stored_datasets[outlet_ref.uri].id,
- dag_id=outlet_ref.dag_id,
- task_id=outlet_ref.task_id,
+ dag_refs_to_add = {x for x in dag_refs_needed if x not in
dag_refs_stored}
+ session.bulk_save_objects(dag_refs_to_add)
+ for obj in dag_refs_stored - dag_refs_needed:
+ session.delete(obj)
+
+ # reconcile task-outlet-dataset references
+ for (dag_id, task_id), uri_list in outlet_references.items():
+ task_refs_needed = {
+ TaskOutletDatasetReference(dataset_id=stored_datasets[uri].id,
dag_id=dag_id, task_id=task_id)
+ for uri in uri_list
+ }
+ task_refs_stored = set(
+ session.query(TaskOutletDatasetReference)
+ .filter(
+ TaskOutletDatasetReference.dag_id == dag_id,
TaskOutletDatasetReference.task_id == task_id
)
+ .all()
)
+ task_refs_to_add = {x for x in task_refs_needed if x not in
task_refs_stored}
+ session.bulk_save_objects(task_refs_to_add)
+ for obj in task_refs_stored - task_refs_needed:
Review Comment:
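Possible bulk delete: the stale references could be removed with a single bulk DELETE query instead of calling `session.delete()` once per object.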
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]