uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673904932
##########
File path: airflow/dag_processing/processor.py
##########
@@ -647,3 +650,17 @@ def process_file(
self.log.exception("Error logging import errors!")
return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        for dag in (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path)
+            .filter(DagModel.is_active)
+            .filter(~DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        ):
+            self.log.warning(
+                "Deactivating DAG ID %s since it no longer exists in file %s", dag.dag_id, file_path
+            )
+            dag.is_active = False
+            session.merge(dag)
Review comment:
Performance here can be improved with a single
`UPDATE dag SET is_active = FALSE WHERE ...`. I’m not sure what the right
SQLAlchemy syntax is for this, though.
An even more significant improvement would be to somehow propagate the file
paths and DAG IDs out of `DagFileProcessorProcess` and batch-update all the
file paths, but I don’t think that’s viable without _significant_ refactoring,
so let’s not do that.
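One way to express the single `UPDATE ... WHERE ...` in SQLAlchemy is `Query.update()`, which issues the statement directly instead of loading each row and calling `session.merge()`. This is a sketch under stated assumptions: `DagModel` below is a hypothetical stand-in that declares only the `dag_id`, `fileloc`, and `is_active` columns (not Airflow's real model), `deactivate_missing_dags` is a hypothetical standalone version of the method in the diff, and the example runs against in-memory SQLite with SQLAlchemy 1.4 or later.

```python
from typing import Sequence

from sqlalchemy import Boolean, Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagModel(Base):
    """Hypothetical stand-in for Airflow's DagModel (only the columns used here)."""

    __tablename__ = "dag"
    dag_id = Column(String(250), primary_key=True)
    fileloc = Column(String(2000))
    is_active = Column(Boolean, default=True)


def deactivate_missing_dags(session: Session, dag_ids: Sequence[str], file_path: str) -> int:
    """Mark active DAGs from ``file_path`` that are not in ``dag_ids`` as inactive.

    Emits one UPDATE statement instead of loading every row and merging it
    back. Returns the row count reported by the database for the UPDATE.
    """
    return (
        session.query(DagModel)
        .filter(DagModel.fileloc == file_path)
        .filter(DagModel.is_active)
        .filter(~DagModel.dag_id.in_(dag_ids))
        # Skip reconciling objects already loaded in this session; they keep a
        # stale is_active value until they are expired or refreshed.
        .update({DagModel.is_active: False}, synchronize_session=False)
    )


# Usage: two DAGs are recorded for x.py, but only "a" is still defined there.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all(
        [
            DagModel(dag_id="a", fileloc="/dags/x.py", is_active=True),
            DagModel(dag_id="b", fileloc="/dags/x.py", is_active=True),
            DagModel(dag_id="c", fileloc="/dags/y.py", is_active=True),
        ]
    )
    session.commit()
    count = deactivate_missing_dags(session, ["a"], "/dags/x.py")
    session.commit()
    active = sorted(d.dag_id for d in session.query(DagModel).filter(DagModel.is_active))

print(count, active)  # 1 ['a', 'c']
```

`synchronize_session=False` avoids any extra work to reconcile the session, at the price of leaving already-loaded `DagModel` objects stale; `synchronize_session="fetch"` is the option to reach for if those objects are read again in the same session.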