This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch backport-22af27e-v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 589df29a348a194b6ff28a5553ac202cf00a0a1c
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 2 12:11:00 2025 +0100

    Fix backfill max_active_runs race condition with concurrent schedulers (#58807)
    
    * Fix backfill max_active_runs race condition with concurrent schedulers
    
    When two schedulers run concurrently, together they could start more
    backfill dag runs than max_active_runs allows. This happened because each
    scheduler read the count of running dag runs before either committed,
    causing both to see stale counts and start runs simultaneously.
    
    The fix adds row-level locking on the Backfill table. When a scheduler
    processes backfill dag runs, it first locks the relevant Backfill rows.
    If another scheduler already holds the lock, the current scheduler skips
    those backfills rather than potentially violating the max_active_runs
    constraint.
    
    This ensures that only one scheduler can process a given backfill's
    dag runs at a time, preventing the race condition while remaining
    non-blocking (schedulers don't wait on each other).
    
    (cherry picked from commit 22af27ea5a750ed92b203bd47846e5197cfda7fa)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 34 ++++++++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 83 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8468e7ab2d9..cc2968c8d82 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1763,12 +1763,41 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
+
+        :param dag_runs: Collection of Dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
+            return {}
+
+        locked_backfills = {
+            b.id: b
+            for b in session.scalars(
+                select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+            )
+        }
+
+        if skipped_backfills := backfill_ids - locked_backfills.keys():
+            self.log.debug(
+                "Skipping backfill runs for backfill_ids=%s - locked by another scheduler",
+                skipped_backfills,
+            )
+
+        return locked_backfills
+
     @add_debug_span
     def _start_queued_dagruns(self, session: Session) -> None:
         """Find DagRuns in queued state and decide moving them to running state."""
         # added all() to save runtime, otherwise query is executed more than once
         dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()
 
+        # Lock backfills to prevent race conditions with concurrent schedulers
+        locked_backfills = self._lock_backfills(dag_runs, session)
+
         query = (
             select(
                 DagRun.dag_id,
@@ -1832,13 +1861,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             dag_id = dag_run.dag_id
             run_id = dag_run.run_id
             backfill_id = dag_run.backfill_id
-            backfill = dag_run.backfill
             dag = dag_run.dag = cached_get_dag(dag_run)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
             active_runs = active_runs_of_dags[(dag_id, backfill_id)]
             if backfill_id is not None:
+                if backfill_id not in locked_backfills:
+                    # Another scheduler has this backfill locked, skip this run
+                    continue
+                backfill = dag_run.backfill
                 if active_runs >= backfill.max_active_runs:
                     # todo: delete all "candidate dag runs" from list for this dag right now
                     self.log.info(
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4234fb27641..1ad6ffb9984 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5699,6 +5699,89 @@ class TestSchedulerJob:
         assert session.scalar(select(func.count()).select_from(DagRun)) == 6
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
 
+    def test_backfill_runs_skipped_when_lock_held_by_another_scheduler(self, dag_maker, session):
+        """Test that a scheduler skips backfill runs when another scheduler holds the lock."""
+        dag_id = "test_dag1"
+        backfill_max_active_runs = 3
+        dag_max_active_runs = 1
+
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=dag_max_active_runs,
+            catchup=True,
+        ):
+            EmptyOperator(task_id="mytask")
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-05")
+        _create_backfill(
+            dag_id=dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=backfill_max_active_runs,
+            reverse=False,
+            triggering_user_name="test_user",
+            dag_run_conf={},
+        )
+
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert queued_count == 5
+
+        scheduler_job = Job(executor=MockExecutor(do_update=False))
+        job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        # Simulate another scheduler holding the lock by returning empty from _lock_backfills
+        with patch.object(job_runner, "_lock_backfills", return_value={}):
+            job_runner._start_queued_dagruns(session)
+            session.flush()
+
+        # No runs should be started because we couldn't acquire the lock
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == 0, f"Expected 0 running when lock not acquired, but got {running_count}. "
+        # no locks now:
+        job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == backfill_max_active_runs
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        # 2 runs are still queued
+        assert queued_count == 2
+
     def test_start_queued_dagruns_do_follow_logical_date_order(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_dag1", max_active_runs=1):

Reply via email to