Lee-W commented on code in PR #42358:
URL: https://github.com/apache/airflow/pull/42358#discussion_r1770735535


##########
airflow/dag_processing/collection.py:
##########
@@ -158,75 +140,101 @@ def calculate(cls, dags: dict[str, DAG], *, session: 
Session) -> Self:
         )
 
 
-def update_orm_dags(
-    source_dags: dict[str, DAG],
-    target_dags: dict[str, DagModel],
-    *,
-    processor_subdir: str | None = None,
-    session: Session,
-) -> None:
-    """
-    Apply DAG attributes to DagModel objects.
-
-    Objects in ``target_dags`` are modified in-place.
-    """
-    run_info = _RunInfo.calculate(source_dags, session=session)
-
-    for dag_id, dm in sorted(target_dags.items()):
-        dag = source_dags[dag_id]
-        dm.fileloc = dag.fileloc
-        dm.owners = dag.owner
-        dm.is_active = True
-        dm.has_import_errors = False
-        dm.last_parsed_time = utcnow()
-        dm.default_view = dag.default_view
-        dm._dag_display_property_value = dag._dag_display_property_value
-        dm.description = dag.description
-        dm.max_active_tasks = dag.max_active_tasks
-        dm.max_active_runs = dag.max_active_runs
-        dm.max_consecutive_failed_dag_runs = 
dag.max_consecutive_failed_dag_runs
-        dm.has_task_concurrency_limits = any(
-            t.max_active_tis_per_dag is not None or 
t.max_active_tis_per_dagrun is not None for t in dag.tasks
-        )
-        dm.timetable_summary = dag.timetable.summary
-        dm.timetable_description = dag.timetable.description
-        dm.dataset_expression = dag.timetable.dataset_condition.as_expression()
-        dm.processor_subdir = processor_subdir
-
-        last_automated_run: DagRun | None = 
run_info.latest_runs.get(dag.dag_id)
-        if last_automated_run is None:
-            last_automated_data_interval = None
-        else:
-            last_automated_data_interval = 
dag.get_run_data_interval(last_automated_run)
-        if run_info.num_active_runs.get(dag.dag_id, 0) >= dm.max_active_runs:
-            dm.next_dagrun_create_after = None
+def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session) 
-> None:
+    orm_tags = {t.name: t for t in dm.tags}
+    for name, orm_tag in orm_tags.items():
+        if name not in tag_names:
+            session.delete(orm_tag)
+    dm.tags.extend(DagTag(name=name, dag_id=dm.dag_id) for name in tag_names 
if name not in orm_tags)

Review Comment:
   We probably could use set here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to