ephraimbuddy commented on code in PR #41398:
URL: https://github.com/apache/airflow/pull/41398#discussion_r1713335699
##########
airflow/datasets/manager.py:
##########
@@ -208,6 +206,37 @@ def _postgres_queue_dagruns(cls, dataset_id: int,
dags_to_queue: set[DagModel],
stmt =
insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
session.execute(stmt, values)
+ @classmethod
+ def _send_dag_priority_parsing_request(cls, file_locs: list[str], session:
Session) -> None:
+ if session.bind.dialect.name == "postgresql":
+ return cls._postgres_send_dag_priority_parsing_request(file_locs,
session)
+ return cls._slow_path_send_dag_priority_parsing_request(file_locs,
session)
+
+ @classmethod
+ def _slow_path_send_dag_priority_parsing_request(cls, file_locs:
list[str], session: Session) -> None:
+ def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str
| None:
+ # Don't error whole transaction when a single
DagPriorityParsingRequest item conflicts.
+ #
https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
+ req = DagPriorityParsingRequest(fileloc=fileloc)
+ try:
+ with session.begin_nested():
+ session.merge(req)
+ except exc.IntegrityError:
+ cls.logger().debug("Skipping request %s", req, exc_info=True)
+ return None
+ return req.fileloc
+
+ results = (_send_dag_priority_parsing_request_if_needed(fileloc) for
fileloc in file_locs)
+ if filelocs_to_parse := [result for result in results if result is not
None]:
+ cls.logger().debug("parse DAGs in %s", filelocs_to_parse)
Review Comment:
```suggestion
```
I don't think we need this debug log line.
##########
airflow/datasets/manager.py:
##########
@@ -208,6 +206,37 @@ def _postgres_queue_dagruns(cls, dataset_id: int,
dags_to_queue: set[DagModel],
stmt =
insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
session.execute(stmt, values)
+ @classmethod
+ def _send_dag_priority_parsing_request(cls, file_locs: list[str], session:
Session) -> None:
+ if session.bind.dialect.name == "postgresql":
+ return cls._postgres_send_dag_priority_parsing_request(file_locs,
session)
+ return cls._slow_path_send_dag_priority_parsing_request(file_locs,
session)
+
+ @classmethod
+ def _slow_path_send_dag_priority_parsing_request(cls, file_locs:
list[str], session: Session) -> None:
+ def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str
| None:
+ # Don't error whole transaction when a single
DagPriorityParsingRequest item conflicts.
+ #
https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
+ req = DagPriorityParsingRequest(fileloc=fileloc)
+ try:
+ with session.begin_nested():
+ session.merge(req)
+ except exc.IntegrityError:
+ cls.logger().debug("Skipping request %s", req, exc_info=True)
Review Comment:
```suggestion
cls.logger().debug("Skipping request %s, already present",
req, exc_info=True)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]