This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new ca9ee6f9781 Fix backfill max_active_runs race condition with concurrent schedulers (#58807) (#58935)
ca9ee6f9781 is described below

commit ca9ee6f97819f655642d84f81d8f45ab827fe6eb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 2 13:17:12 2025 +0100

    Fix backfill max_active_runs race condition with concurrent schedulers (#58807) (#58935)
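
The fix takes row-level locks on the Backfill rows with SELECT ... FOR UPDATE
SKIP LOCKED before counting active runs, so when two schedulers race on the
same backfill only one proceeds and the other skips those runs until its next
scheduling loop. A minimal sketch of that locking pattern, not taken verbatim
from this commit (Backfill and session stand in for the real Airflow model
and DB session):

    from sqlalchemy import select

    def lock_rows(session, ids):
        # Rows already locked by another open transaction are silently
        # skipped rather than blocking this query.
        stmt = (
            select(Backfill)
            .where(Backfill.id.in_(ids))
            .with_for_update(skip_locked=True)
        )
        return {b.id: b for b in session.scalars(stmt)}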
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 34 ++++++++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 83 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8468e7ab2d9..cc2968c8d82 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1763,12 +1763,41 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
+
+        :param dag_runs: Collection of Dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
+            return {}
+
+        locked_backfills = {
+            b.id: b
+            for b in session.scalars(
+                select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+            )
+        }
+
+        if skipped_backfills := backfill_ids - locked_backfills.keys():
+            self.log.debug(
+                "Skipping backfill runs for backfill_ids=%s - locked by 
another scheduler",
+                skipped_backfills,
+            )
+
+        return locked_backfills
+
     @add_debug_span
     def _start_queued_dagruns(self, session: Session) -> None:
         """Find DagRuns in queued state and decide moving them to running 
state."""
         # added all() to save runtime, otherwise query is executed more than once
        dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()
 
+        # Lock backfills to prevent race conditions with concurrent schedulers
+        locked_backfills = self._lock_backfills(dag_runs, session)
+
         query = (
             select(
                 DagRun.dag_id,
@@ -1832,13 +1861,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             dag_id = dag_run.dag_id
             run_id = dag_run.run_id
             backfill_id = dag_run.backfill_id
-            backfill = dag_run.backfill
             dag = dag_run.dag = cached_get_dag(dag_run)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_run.dag_id)
                 continue
             active_runs = active_runs_of_dags[(dag_id, backfill_id)]
             if backfill_id is not None:
+                if backfill_id not in locked_backfills:
+                    # Another scheduler has this backfill locked, skip this run
+                    continue
+                backfill = dag_run.backfill
                 if active_runs >= backfill.max_active_runs:
                     # todo: delete all "candidate dag runs" from list for this dag right now
                     self.log.info(
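
The losing scheduler never blocks on the lock: with skip_locked=True its
SELECT simply returns no rows for the contended backfill, the runs stay
queued, and they are retried on a later scheduling loop once the lock is
released. A rough two-session demonstration of that semantics (illustrative
only; assumes a backend with SKIP LOCKED support such as PostgreSQL, and
Session/Backfill are placeholders, not names from this commit):

    from sqlalchemy import select

    stmt = (
        select(Backfill)
        .where(Backfill.id == 1)
        .with_for_update(skip_locked=True)
    )
    with Session() as s1, Session() as s2:
        winner = s1.scalars(stmt).all()  # s1 acquires the row lock
        loser = s2.scalars(stmt).all()   # s2 skips the row instead of waiting
        assert len(winner) == 1
        assert loser == []               # picked up again on a later loop
        s1.commit()                      # releases the lock for others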
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4234fb27641..1ad6ffb9984 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5699,6 +5699,89 @@ class TestSchedulerJob:
         assert session.scalar(select(func.count()).select_from(DagRun)) == 6
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
 
+    def test_backfill_runs_skipped_when_lock_held_by_another_scheduler(self, dag_maker, session):
+        """Test that a scheduler skips backfill runs when another scheduler 
holds the lock."""
+        dag_id = "test_dag1"
+        backfill_max_active_runs = 3
+        dag_max_active_runs = 1
+
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=dag_max_active_runs,
+            catchup=True,
+        ):
+            EmptyOperator(task_id="mytask")
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-05")
+        _create_backfill(
+            dag_id=dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=backfill_max_active_runs,
+            reverse=False,
+            triggering_user_name="test_user",
+            dag_run_conf={},
+        )
+
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert queued_count == 5
+
+        scheduler_job = Job(executor=MockExecutor(do_update=False))
+        job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        # Simulate another scheduler holding the lock by returning empty from _lock_backfills
+        with patch.object(job_runner, "_lock_backfills", return_value={}):
+            job_runner._start_queued_dagruns(session)
+            session.flush()
+
+        # No runs should be started because we couldn't acquire the lock
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == 0, f"Expected 0 running when lock not acquired, but got {running_count}."
+        # No lock is held now, so this pass should acquire it and start runs:
+        job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == backfill_max_active_runs
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        # 2 runs are still queued
+        assert queued_count == 2
+
     def test_start_queued_dagruns_do_follow_logical_date_order(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_dag1", max_active_runs=1):
