This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f2b0c21d5 dag processor manager, add retry_db_transaction to _fetch_callbacks (#30079)
1f2b0c21d5 is described below

commit 1f2b0c21d5ebefc404d12c123674e6ac45873646
Author: Michael Petro <[email protected]>
AuthorDate: Wed Mar 22 09:20:36 2023 -0400

    dag processor manager, add retry_db_transaction to _fetch_callbacks (#30079)
    
    * dag processor manager, add retry_db_transaction to _fetch_callbacks
    
    * dag processor manager, create separate method to fetch callbacks with
      retries to split up provide_session and retry_db_transaction
---
 airflow/dag_processing/manager.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index e8266ea4fa..b8bd7332f6 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -62,6 +62,7 @@ from airflow.utils.process_utils import (
     reap_process_group,
     set_new_process_group,
 )
+from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, 
with_row_locks
 
@@ -687,6 +688,10 @@ class DagFileProcessorManager(LoggingMixin):
 
     @provide_session
     def _fetch_callbacks(self, max_callbacks: int, session: Session = 
NEW_SESSION):
+        self._fetch_callbacks_with_retries(max_callbacks, session)
+
+    @retry_db_transaction
+    def _fetch_callbacks_with_retries(self, max_callbacks: int, session: 
Session):
         """Fetches callbacks from database and add them to the internal queue 
for execution."""
         self.log.debug("Fetching callbacks from the database.")
         with prohibit_commit(session) as guard:

Reply via email to