This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new e001b88f58 fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets (#41398)
e001b88f58 is described below
commit e001b88f5875cfd7e295891a0bbdbc75a3dccbfb
Author: Wei Lee <[email protected]>
AuthorDate: Mon Aug 12 17:24:59 2024 +0800
fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets (#41398)
* fix(datasets/manager): fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets
  This happens when dynamic task mapping is used.
* refactor(dataset/manager): reword debug log
Co-authored-by: Ephraim Anierobi <[email protected]>
* refactor(dataset/manager): remove unnecessary logging
Co-authored-by: Ephraim Anierobi <[email protected]>
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
(cherry picked from commit bf64cb686be28f5e60e0b06f5fae790481c2efcd)
---
airflow/datasets/manager.py | 35 +++++++++++++++++++++++++++++++----
1 file changed, 31 insertions(+), 4 deletions(-)
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 29f95ef4c7..058eef6ab8 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -140,10 +140,8 @@ class DatasetManager(LoggingMixin):
         dags_to_reparse = dags_to_queue_from_dataset_alias - dags_to_queue_from_dataset

         if dags_to_reparse:
-            session.add_all(
-                DagPriorityParsingRequest(fileloc=fileloc)
-                for fileloc in {dag.fileloc for dag in dags_to_reparse}
-            )
+            file_locs = {dag.fileloc for dag in dags_to_reparse}
+            cls._send_dag_priority_parsing_request(file_locs, session)
         session.flush()

         cls.notify_dataset_changed(dataset=dataset)
@@ -208,6 +206,35 @@ class DatasetManager(LoggingMixin):
         stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
         session.execute(stmt, values)
+    @classmethod
+    def _send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
+        if session.bind.dialect.name == "postgresql":
+            return cls._postgres_send_dag_priority_parsing_request(file_locs, session)
+        return cls._slow_path_send_dag_priority_parsing_request(file_locs, session)
+
+    @classmethod
+    def _slow_path_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
+        def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None:
+            # Don't error whole transaction when a single DagPriorityParsingRequest item conflicts.
+            # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
+            req = DagPriorityParsingRequest(fileloc=fileloc)
+            try:
+                with session.begin_nested():
+                    session.merge(req)
+            except exc.IntegrityError:
+                cls.logger().debug("Skipping request %s, already present", req, exc_info=True)
+                return None
+            return req.fileloc
+
+        # A bare generator expression would never invoke the helper; iterate explicitly.
+        for fileloc in file_locs:
+            _send_dag_priority_parsing_request_if_needed(fileloc)
+
+    @classmethod
+    def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
+        from sqlalchemy.dialects.postgresql import insert
+
+        stmt = insert(DagPriorityParsingRequest).on_conflict_do_nothing()
+        # One parameter dict per row, so execute() runs as executemany.
+        session.execute(stmt, [{"fileloc": fileloc} for fileloc in file_locs])
+
 def resolve_dataset_manager() -> DatasetManager:
     """Retrieve the dataset manager."""