This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d06ca4e3b51456c52f2cff25d46a35e34e233fe4
Author: Michael Petro <[email protected]>
AuthorDate: Wed Mar 22 09:20:36 2023 -0400

    dag processor manager, add retry_db_transaction to _fetch_callbacks (#30079)
    
    * dag processor manager, add retry_db_transaction to _fetch_callbacks
    
    * dag processor manager, create separate method to fetch callbacks with retries to split up provide_session and retry_db_transaction
    
    (cherry picked from commit 1f2b0c21d5ebefc404d12c123674e6ac45873646)
---
 airflow/dag_processing/manager.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 1d8a082869..2cefa87767 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -61,6 +61,7 @@ from airflow.utils.process_utils import (
     reap_process_group,
     set_new_process_group,
 )
+from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, 
with_row_locks
 
@@ -666,6 +667,10 @@ class DagFileProcessorManager(LoggingMixin):
 
     @provide_session
     def _fetch_callbacks(self, max_callbacks: int, session: Session = 
NEW_SESSION):
+        self._fetch_callbacks_with_retries(max_callbacks, session)
+
+    @retry_db_transaction
+    def _fetch_callbacks_with_retries(self, max_callbacks: int, session: 
Session):
         """Fetches callbacks from database and add them to the internal queue 
for execution."""
         self.log.debug("Fetching callbacks from the database.")
         with prohibit_commit(session) as guard:

Reply via email to