This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f2b0c21d5 dag processor manager, add retry_db_transaction to
_fetch_callbacks (#30079)
1f2b0c21d5 is described below
commit 1f2b0c21d5ebefc404d12c123674e6ac45873646
Author: Michael Petro <[email protected]>
AuthorDate: Wed Mar 22 09:20:36 2023 -0400
dag processor manager, add retry_db_transaction to _fetch_callbacks (#30079)
* dag processor manager, add retry_db_transaction to _fetch_callbacks
* dag processor manager, create separate method to fetch callbacks with
retries to split up provide_session and retry_db_transaction
---
airflow/dag_processing/manager.py | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index e8266ea4fa..b8bd7332f6 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -62,6 +62,7 @@ from airflow.utils.process_utils import (
reap_process_group,
set_new_process_group,
)
+from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked,
with_row_locks
@@ -687,6 +688,10 @@ class DagFileProcessorManager(LoggingMixin):
@provide_session
def _fetch_callbacks(self, max_callbacks: int, session: Session =
NEW_SESSION):
+ self._fetch_callbacks_with_retries(max_callbacks, session)
+
+ @retry_db_transaction
+ def _fetch_callbacks_with_retries(self, max_callbacks: int, session:
Session):
"""Fetches callbacks from database and add them to the internal queue
for execution."""
self.log.debug("Fetching callbacks from the database.")
with prohibit_commit(session) as guard: