This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d06ca4e3b51456c52f2cff25d46a35e34e233fe4 Author: Michael Petro <[email protected]> AuthorDate: Wed Mar 22 09:20:36 2023 -0400 dag processor manager, add retry_db_transcation to _fetch_callbacks (#30079) * dag processor manager, add retry_db_transcation to _fetch_callbacks * dag processor manager, create separate method to fetch callbacks with retries to split up provide_session and retry_db_transaction (cherry picked from commit 1f2b0c21d5ebefc404d12c123674e6ac45873646) --- airflow/dag_processing/manager.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 1d8a082869..2cefa87767 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -61,6 +61,7 @@ from airflow.utils.process_utils import ( reap_process_group, set_new_process_group, ) +from airflow.utils.retries import retry_db_transaction from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks @@ -666,6 +667,10 @@ class DagFileProcessorManager(LoggingMixin): @provide_session def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION): + self._fetch_callbacks_with_retries(max_callbacks, session) + + @retry_db_transaction + def _fetch_callbacks_with_retries(self, max_callbacks: int, session: Session): """Fetches callbacks from database and add them to the internal queue for execution.""" self.log.debug("Fetching callbacks from the database.") with prohibit_commit(session) as guard:
