SamWheating commented on a change in pull request #17891:
URL: https://github.com/apache/airflow/pull/17891#discussion_r699564483
##########
File path: airflow/models/dagbag.py
##########
@@ -466,6 +467,34 @@ def _bag_dag(self, *, dag, root_dag, recursive):
del self.dags[subdag.dag_id]
raise
+    @provide_session
+    def _check_if_duplicate(self, dag, session=None):
+        """
+        Checks if a DAG with the same ID already exists.
+        If present, returns the fileloc of the existing DAG.
+        """
+        from airflow.models.serialized_dag import SerializedDagModel  # Avoid circular import
+
+        other_dag = session.query(SerializedDagModel).filter(
+            SerializedDagModel.dag_id == dag.dag_id,
+            SerializedDagModel.fileloc_hash != DagCode.dag_fileloc_hash(dag.fileloc)
+        ).first()
+        if other_dag:
+            # If a DAG was just moved from one file to another, this condition may sometimes check out.
+            # Heuristics below try to avoid a false-positive alert
+            if not os.path.exists(other_dag.fileloc):
+                # Other file is no more, nothing to worry about
+                return None
+            # Not feasible here to parse that other file to check if it still has the DAG,
+            # so resort to just checking the task ids
+            task_ids = [task.task_id for task in dag.tasks]
+            other_dag_task_ids = [task['label'] for task in other_dag.data['dag']['tasks']]
+            if task_ids != other_dag_task_ids:
Review comment:
If these two lists are built from the same DAG, will they always be in the same order? If not, then the comparison isn't going to be accurate. If they aren't in a deterministic order, maybe we could sort them or cast them to a set before comparing? A minimal, self-contained sketch of that order-insensitive comparison follows; the task id lists in it are hypothetical stand-ins for `dag.tasks` and the serialized task data, not code from this PR:

# Sketch only: shows why a plain list comparison is order-sensitive,
# and how comparing sorted lists or sets avoids that.
current_task_ids = ["extract", "transform", "load"]
serialized_task_labels = ["load", "extract", "transform"]  # same tasks, different order

# A direct list comparison flags these as different even though the tasks match:
assert current_task_ids != serialized_task_labels

# Comparing sorted copies, or sets, ignores ordering:
assert sorted(current_task_ids) == sorted(serialized_task_labels)
assert set(current_task_ids) == set(serialized_task_labels)
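Note that casting to a set also ignores duplicate entries; since task ids are unique within a DAG, either the sorted-list or the set comparison should behave the same here.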