SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701386688



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       Ah, thanks for confirming. If it's not supported in SQLite and MySQL then I don't think we can rely on this feature.
   
   We could split this into a two-part query like:
   
   ```python
           deactivated = (
               session.query(DagModel)
               .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
           )
           # Materialize the ids first: a Query object is always truthy,
           # so the emptiness check has to run against the fetched list.
           deactivated_dags = [dag.dag_id for dag in deactivated]
           if deactivated_dags:
               deactivated.update({DagModel.is_active: False}, synchronize_session="fetch")
               self.log.info("Deactivated missing DAGs: %s", ",".join(deactivated_dags))
   ```
   
   But I think that will introduce potential race conditions and consistency 
issues, so I'd rather keep things as-is. 
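   (If the two-step route were ever needed, one way to narrow the consistency gap is to constrain the UPDATE to the exact ids fetched in step one, so the logged list cannot drift from the rows actually changed. A minimal sketch using plain `sqlite3` with a hypothetical `dag` table rather than the real Airflow models:

   ```python
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
   conn.executemany(
       "INSERT INTO dag VALUES (?, ?, ?)",
       [("a", "f.py", 1), ("b", "f.py", 1), ("c", "other.py", 1)],
   )
   present_dag_ids = ["a"]  # DAGs still found when parsing f.py

   # Step 1: fetch the ids of active DAGs in this file that are no longer present.
   placeholders = ",".join("?" * len(present_dag_ids))
   rows = conn.execute(
       f"SELECT dag_id FROM dag WHERE fileloc = ? AND is_active "
       f"AND dag_id NOT IN ({placeholders})",
       ["f.py", *present_dag_ids],
   ).fetchall()
   deactivated_dags = [r[0] for r in rows]

   # Step 2: restrict the UPDATE to exactly those ids, so the logged
   # list always matches the rows that were deactivated.
   if deactivated_dags:
       conn.execute(
           "UPDATE dag SET is_active = 0 WHERE dag_id IN "
           f"({','.join('?' * len(deactivated_dags))})",
           deactivated_dags,
       )
   print(deactivated_dags)  # → ['b']
   ```

   Even then, a DAG re-added between the two statements would still be deactivated, so the single-statement update above remains the safer default.)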




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

