potiuk commented on a change in pull request #7477: [AIRFLOW-6857][depends on 
AIRFLOW-6856] Bulk sync DAGs
URL: https://github.com/apache/airflow/pull/7477#discussion_r382450159
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1448,63 +1448,88 @@ def create_dagrun(self,
 
         return run
 
+    @classmethod
     @provide_session
-    def sync_to_db(self, owner=None, sync_time=None, session=None):
+    def bulk_sync_to_db(cls, dags: List["DAG"], sync_time=None, session=None):
         """
-        Save attributes about this DAG to the DB. Note that this method
+        Save attributes about list of DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
         SubDagOperator.
 
-        :param dag: the DAG object to save to the DB
-        :type dag: airflow.models.DAG
+        :param dags: the DAG objects to save to the DB
+        :type dags: List[airflow.models.dag.DAG]
         :param sync_time: The time that the DAG should be marked as sync'ed
         :type sync_time: datetime
         :return: None
         """
+        if not dags:
+            return
         from airflow.models.serialized_dag import SerializedDagModel
 
-        if owner is None:
-            owner = self.owner
         if sync_time is None:
             sync_time = timezone.utcnow()
-
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        if not orm_dag:
-            orm_dag = DagModel(dag_id=self.dag_id)
-            if self.is_paused_upon_creation is not None:
-                orm_dag.is_paused = self.is_paused_upon_creation
-            self.log.info("Creating ORM DAG for %s", self.dag_id)
+        log.info("Sync %s DAGs", len(dags))
+        dag_by_ids = {dag.dag_id: dag for dag in dags}
+        dag_ids = set(dag_by_ids.keys())
+        orm_dags = session.query(DagModel)\
+            .options(
+            joinedload(DagModel.tags, innerjoin=False)
+        )\
+            .filter(DagModel.dag_id.in_(dag_ids)).all()
+        existing_dag_ids = {orm_dag.dag_id for orm_dag in orm_dags}
+        missing_dag_ids = dag_ids.difference(existing_dag_ids)
+
+        for missing_dag_id in missing_dag_ids:
+            orm_dag = DagModel(dag_id=missing_dag_id)
+            dag = dag_by_ids[missing_dag_id]
+            if dag.is_paused_upon_creation is not None:
+                orm_dag.is_paused = dag.is_paused_upon_creation
+            log.info("Creating ORM DAG for %s", dag.dag_id)
             session.add(orm_dag)
-        if self.is_subdag:
-            orm_dag.is_subdag = True
-            orm_dag.fileloc = self.parent_dag.fileloc
-            orm_dag.root_dag_id = self.parent_dag.dag_id
-        else:
-            orm_dag.is_subdag = False
-            orm_dag.fileloc = self.fileloc
-        orm_dag.owners = owner
-        orm_dag.is_active = True
-        orm_dag.last_scheduler_run = sync_time
-        orm_dag.default_view = self._default_view
-        orm_dag.description = self.description
-        orm_dag.schedule_interval = self.schedule_interval
-        orm_dag.tags = self.get_dagtags(session=session)
+            orm_dags.append(orm_dag)
+
+        for orm_dag in orm_dags:
 
 Review comment:
   I love the change! 
   
  I have only one suggestion here. I am not sure if there will be parallel 
executions of `bulk_sync_to_db()`, but even if there are none now, there 
might be some in the future.
   
  In such a case, it's an excellent practice to force the same sequence of 
updates on multiple entries by sorting the entries in the same order. This 
prevents deadlocks: when multiple entries are processed in random order, a 
deadlock is very likely to happen, whereas sorting them in a consistent 
order makes a deadlock impossible.
   
  In this case, I think simply sorting the orm_dags by dag_id should do 
the trick. Also, if we have any other bulk updates to the dagbag table, we 
should apply the same principle.
    

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to