Lee-W commented on code in PR #42358:
URL: https://github.com/apache/airflow/pull/42358#discussion_r1770829459
##########
airflow/dag_processing/collection.py:
##########
@@ -158,75 +140,101 @@ def calculate(cls, dags: dict[str, DAG], *, session: Session) -> Self:
)
-def update_orm_dags(
- source_dags: dict[str, DAG],
- target_dags: dict[str, DagModel],
- *,
- processor_subdir: str | None = None,
- session: Session,
-) -> None:
- """
- Apply DAG attributes to DagModel objects.
-
- Objects in ``target_dags`` are modified in-place.
- """
- run_info = _RunInfo.calculate(source_dags, session=session)
-
- for dag_id, dm in sorted(target_dags.items()):
- dag = source_dags[dag_id]
- dm.fileloc = dag.fileloc
- dm.owners = dag.owner
- dm.is_active = True
- dm.has_import_errors = False
- dm.last_parsed_time = utcnow()
- dm.default_view = dag.default_view
- dm._dag_display_property_value = dag._dag_display_property_value
- dm.description = dag.description
- dm.max_active_tasks = dag.max_active_tasks
- dm.max_active_runs = dag.max_active_runs
- dm.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
- dm.has_task_concurrency_limits = any(
- t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
- )
- dm.timetable_summary = dag.timetable.summary
- dm.timetable_description = dag.timetable.description
- dm.dataset_expression = dag.timetable.dataset_condition.as_expression()
- dm.processor_subdir = processor_subdir
-
- last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
- if last_automated_run is None:
- last_automated_data_interval = None
- else:
- last_automated_data_interval = dag.get_run_data_interval(last_automated_run)
- if run_info.num_active_runs.get(dag.dag_id, 0) >= dm.max_active_runs:
- dm.next_dagrun_create_after = None
+def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session) -> None:
+ orm_tags = {t.name: t for t in dm.tags}
+ for name, orm_tag in orm_tags.items():
+ if name not in tag_names:
+ session.delete(orm_tag)
+ dm.tags.extend(DagTag(name=name, dag_id=dm.dag_id) for name in tag_names if name not in orm_tags)
+
+
+def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, session: Session) -> None:
+ orm_dag_owner_attributes = {obj.owner: obj for obj in dm.dag_owner_links}
+ for owner, obj in orm_dag_owner_attributes.items():
+ try:
+ link = dag_owner_links[owner]
+ except KeyError:
+ session.delete(obj)
else:
- dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)
-
- if not dag.timetable.dataset_condition:
- dm.schedule_dataset_references = []
- dm.schedule_dataset_alias_references = []
- # FIXME: STORE NEW REFERENCES.
-
- dag_tags = set(dag.tags or ())
- for orm_tag in (dm_tags := list(dm.tags or [])):
- if orm_tag.name not in dag_tags:
- session.delete(orm_tag)
- dm.tags.remove(orm_tag)
- orm_tag_names = {t.name for t in dm_tags}
- for dag_tag in dag_tags:
- if dag_tag not in orm_tag_names:
- dag_tag_orm = DagTag(name=dag_tag, dag_id=dag.dag_id)
- dm.tags.append(dag_tag_orm)
- session.add(dag_tag_orm)
-
- dm_links = dm.dag_owner_links or []
- for dm_link in dm_links:
- if dm_link not in dag.owner_links:
- session.delete(dm_link)
- for owner_name, owner_link in dag.owner_links.items():
- dag_owner_orm = DagOwnerAttributes(dag_id=dag.dag_id, owner=owner_name, link=owner_link)
- session.add(dag_owner_orm)
+ if obj.link != link:
+ obj.link = link
+ dm.dag_owner_links.extend(
+ DagOwnerAttributes(dag_id=dm.dag_id, owner=owner, link=link)
+ for owner, link in dag_owner_links.items()
+ if owner not in orm_dag_owner_attributes
+ )
+
+
+class DagModelOperation(NamedTuple):
+ """Collect DAG objects and perform database operations for them."""
+
+ dags: dict[str, DAG]
+
+ def add_dags(self, *, session: Session) -> dict[str, DagModel]:
+ orm_dags = _find_orm_dags(self.dags, session=session)
+ orm_dags.update(
+ (model.dag_id, model)
+ for model in _create_orm_dags(
+ (dag for dag_id, dag in self.dags.items() if dag_id not in orm_dags),
+ session=session,
+ )
+ )
+ return orm_dags
+
+ def update_dags(
+ self,
+ orm_dags: dict[str, DagModel],
+ *,
+ processor_subdir: str | None = None,
+ session: Session,
+ ) -> None:
+ run_info = _RunInfo.calculate(self.dags, session=session)
+
+ for dag_id, dm in sorted(orm_dags.items()):
Review Comment:
sounds reasonable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]