This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0da4993500 Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
0da4993500 is described below

commit 0da49935000476b1d1941b63d0d66d3c58d64fea
Author: Anthony Panat <[email protected]>
AuthorDate: Sat Oct 1 23:33:59 2022 -0400

    Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
    
    
    
    Co-authored-by: Anthony Panat <[email protected]>
    Co-authored-by: Anthony Panat 
<[email protected]>
---
 airflow/jobs/scheduler_job.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 53a96cf9ac..74541a1cf8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -943,12 +943,7 @@ class SchedulerJob(BaseJob):
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            callback_tuples = []
-            for dag_run in dag_runs:
-                callback_to_run = self._schedule_dag_run(dag_run, session)
-                callback_tuples.append((dag_run, callback_to_run))
-
-            guard.commit()
+            callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, 
session)
 
         # Send the callbacks after we commit to ensure the context is up to 
date when it gets run
         for dag_run, callback_to_run in callback_tuples:
@@ -1232,6 +1227,18 @@ class SchedulerJob(BaseJob):
                 active_runs_of_dags[dag_run.dag_id] += 1
                 _update_state(dag, dag_run)
 
+    @retry_db_transaction
+    def _schedule_all_dag_runs(self, guard, dag_runs, session):
+        """Makes scheduling decisions for all `dag_runs`"""
+        callback_tuples = []
+        for dag_run in dag_runs:
+            callback_to_run = self._schedule_dag_run(dag_run, session)
+            callback_tuples.append((dag_run, callback_to_run))
+
+        guard.commit()
+
+        return callback_tuples
+
     def _schedule_dag_run(
         self,
         dag_run: DagRun,

Reply via email to