jedcunningham commented on code in PR #25959:
URL: https://github.com/apache/airflow/pull/25959#discussion_r964302942


##########
airflow/models/dag.py:
##########
@@ -2665,29 +2665,39 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=NEW_SESSION):
             TaskOutletDatasetReference,
         )
 
-        class OutletRef(NamedTuple):
-            dag_id: str
-            task_id: str
-            uri: str
-
-        class InletRef(NamedTuple):
-            dag_id: str
-            uri: str
-
-        dag_references = set()
-        outlet_references = set()
+        dag_references = collections.defaultdict(set)
+        outlet_references = collections.defaultdict(set)
         # We can't use a set here as we want to preserve order
         outlet_datasets: Dict[Dataset, None] = {}
         input_datasets: Dict[Dataset, None] = {}
+
+        # here we go through dags and tasks to check for dataset references
+        # if there are now None and previously there were some, we delete them
+        # if there are now *any*, we add them to the above data structures and
+        # later we'll persist them to the database.
         for dag in dags:
+            curr_orm_dag = existing_dags.get(dag.dag_id)
+            if not dag.dataset_triggers:
+                if curr_orm_dag and curr_orm_dag.schedule_dataset_references:
+                    curr_orm_dag.schedule_dataset_references = []
             for dataset in dag.dataset_triggers:
-                dag_references.add(InletRef(dag.dag_id, dataset.uri))
+                dag_references[dag.dag_id].add(dataset.uri)
                 input_datasets[DatasetModel.from_public(dataset)] = None
+            curr_outlet_references = curr_orm_dag and curr_orm_dag.task_outlet_dataset_references
             for task in dag.tasks:
-                for obj in task.outlets or []:
-                    if isinstance(obj, Dataset):
-                        outlet_references.add(OutletRef(task.dag_id, task.task_id, obj.uri))
-                        outlet_datasets[DatasetModel.from_public(obj)] = None
+                dataset_outlets = [x for x in task.outlets or [] if isinstance(x, Dataset)]
+                if not dataset_outlets:

Review Comment:
   Do we need this section in addition to the task outlet reconciliation
   further down? The reconciliation below might be enough on its own.
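
To make the question concrete, here is a minimal sketch. It assumes the reconciliation further down compares the stored references against the per-DAG sets collected in this loop; that code is not shown in this hunk, so `reconcile`, `stored_refs`, and the sample URIs are hypothetical stand-ins rather than Airflow code. Under that assumption, a DAG that no longer references any dataset reconciles against an empty set, and its stale references are dropped without a separate early-clearing branch.

```python
import collections
from typing import Dict, List, Set


def reconcile(stored: List[str], wanted: Set[str]) -> List[str]:
    """Keep stored URIs that are still wanted, then append new ones (sorted)."""
    kept = [uri for uri in stored if uri in wanted]
    added = sorted(wanted - set(kept))
    return kept + added


# Hypothetical stand-in for what is currently persisted per DAG.
stored_refs: Dict[str, List[str]] = {
    "dag_a": ["s3://bucket/a", "s3://bucket/old"],
    "dag_b": ["s3://bucket/b"],
}

# Mirrors the defaultdict(set) collection step in the diff above.
dag_references: Dict[str, Set[str]] = collections.defaultdict(set)
dag_references["dag_a"].add("s3://bucket/a")
dag_references["dag_a"].add("s3://bucket/new")
# dag_b now has no dataset triggers, so nothing is collected for it.

for dag_id in stored_refs:
    # .get() avoids creating keys; a DAG with no references yields an
    # empty set, so every stored reference for it is treated as stale.
    wanted = dag_references.get(dag_id, set())
    stored_refs[dag_id] = reconcile(stored_refs[dag_id], wanted)

print(stored_refs)
```

If the real reconciliation only iterates over DAGs that appear in `dag_references`, the empty case would be skipped and the explicit clearing in this section would still be needed.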


