This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new ca9ee6f9781 Fix backfill max_active_runs race condition with concurrent schedulers (#58807) (#58935)
ca9ee6f9781 is described below
commit ca9ee6f97819f655642d84f81d8f45ab827fe6eb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 2 13:17:12 2025 +0100
Fix backfill max_active_runs race condition with concurrent schedulers (#58807) (#58935)
---
.../src/airflow/jobs/scheduler_job_runner.py | 34 ++++++++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 83 ++++++++++++++++++++++
2 files changed, 116 insertions(+), 1 deletion(-)
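For context on the change below: the fix takes row-level locks on the Backfill rows with SELECT ... FOR UPDATE SKIP LOCKED before promoting queued backfill runs, so two schedulers never race on the same backfill's max_active_runs accounting. A minimal standalone sketch of that locking pattern, assuming SQLAlchemy 2.x style (the helper name and the generic `model` parameter are illustrative, not the committed API):

    from sqlalchemy import select
    from sqlalchemy.orm import Session

    def lock_rows_skip_locked(session: Session, model, ids: set[int]) -> dict[int, object]:
        """Lock what we can; rows already locked by another transaction are skipped."""
        if not ids:
            return {}
        # FOR UPDATE SKIP LOCKED omits rows held by other transactions
        # instead of blocking, so each scheduler works on a disjoint set.
        stmt = select(model).where(model.id.in_(ids)).with_for_update(skip_locked=True)
        return {row.id: row for row in session.scalars(stmt)}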
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8468e7ab2d9..cc2968c8d82 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1763,12 +1763,41 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return False
return True
+ def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
+ """
+ Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
+
+ :param dag_runs: Collection of Dag runs to process
+ :param session: DB session
+ :return: Dict mapping backfill_id to locked Backfill objects
+ """
+ if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
+ return {}
+
+ locked_backfills = {
+ b.id: b
+ for b in session.scalars(
+ select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+ )
+ }
+
+ if skipped_backfills := backfill_ids - locked_backfills.keys():
+ self.log.debug(
+ "Skipping backfill runs for backfill_ids=%s - locked by
another scheduler",
+ skipped_backfills,
+ )
+
+ return locked_backfills
+
@add_debug_span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running
state."""
# added all() to save runtime, otherwise query is executed more than once
dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()
+ # Lock backfills to prevent race conditions with concurrent schedulers
+ locked_backfills = self._lock_backfills(dag_runs, session)
+
query = (
select(
DagRun.dag_id,
@@ -1832,13 +1861,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_id = dag_run.dag_id
run_id = dag_run.run_id
backfill_id = dag_run.backfill_id
- backfill = dag_run.backfill
dag = dag_run.dag = cached_get_dag(dag_run)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table",
dag_run.dag_id)
continue
active_runs = active_runs_of_dags[(dag_id, backfill_id)]
if backfill_id is not None:
+ if backfill_id not in locked_backfills:
+ # Another scheduler has this backfill locked, skip this run
+ continue
+ backfill = dag_run.backfill
if active_runs >= backfill.max_active_runs:
# todo: delete all "candidate dag runs" from list for this dag right now
self.log.info(
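The test added below simulates the contended case by patching `_lock_backfills` to return an empty dict rather than opening a competing transaction. For reference, the SKIP LOCKED behaviour it relies on can be observed directly with two sessions; a hypothetical sketch, assuming a PostgreSQL or MySQL 8+ backend (SQLite ignores FOR UPDATE), a placeholder connection URL, and an existing Backfill row with id 1:

    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import Session

    from airflow.models.backfill import Backfill  # import path assumed per Airflow 3.x

    engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost/airflow")  # placeholder URL

    with Session(engine) as s1, Session(engine) as s2:
        # s1 takes the row lock inside its open transaction ...
        s1.scalars(select(Backfill).where(Backfill.id == 1).with_for_update()).one()
        # ... so s2's SKIP LOCKED query returns no rows instead of blocking,
        # which is what makes the other scheduler skip that backfill.
        rows = s2.scalars(
            select(Backfill).where(Backfill.id == 1).with_for_update(skip_locked=True)
        ).all()
        assert rows == []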
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4234fb27641..1ad6ffb9984 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5699,6 +5699,89 @@ class TestSchedulerJob:
assert session.scalar(select(func.count()).select_from(DagRun)) == 6
assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+ def test_backfill_runs_skipped_when_lock_held_by_another_scheduler(self, dag_maker, session):
+ """Test that a scheduler skips backfill runs when another scheduler
holds the lock."""
+ dag_id = "test_dag1"
+ backfill_max_active_runs = 3
+ dag_max_active_runs = 1
+
+ with dag_maker(
+ dag_id=dag_id,
+ start_date=DEFAULT_DATE,
+ schedule=timedelta(days=1),
+ max_active_runs=dag_max_active_runs,
+ catchup=True,
+ ):
+ EmptyOperator(task_id="mytask")
+
+ from_date = pendulum.parse("2021-01-01")
+ to_date = pendulum.parse("2021-01-05")
+ _create_backfill(
+ dag_id=dag_id,
+ from_date=from_date,
+ to_date=to_date,
+ max_active_runs=backfill_max_active_runs,
+ reverse=False,
+ triggering_user_name="test_user",
+ dag_run_conf={},
+ )
+
+ queued_count = (
+ session.query(func.count(DagRun.id))
+ .filter(
+ DagRun.dag_id == dag_id,
+ DagRun.state == State.QUEUED,
+ DagRun.run_type == DagRunType.BACKFILL_JOB,
+ )
+ .scalar()
+ )
+ assert queued_count == 5
+
+ scheduler_job = Job(executor=MockExecutor(do_update=False))
+ job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ # Simulate another scheduler holding the lock by returning empty from _lock_backfills
+ with patch.object(job_runner, "_lock_backfills", return_value={}):
+ job_runner._start_queued_dagruns(session)
+ session.flush()
+
+ # No runs should be started because we couldn't acquire the lock
+ running_count = (
+ session.query(func.count(DagRun.id))
+ .filter(
+ DagRun.dag_id == dag_id,
+ DagRun.state == State.RUNNING,
+ DagRun.run_type == DagRunType.BACKFILL_JOB,
+ )
+ .scalar()
+ )
+ assert running_count == 0, f"Expected 0 running when lock not acquired, but got {running_count}."
+ # No other scheduler holds the lock now, so this call should start runs:
+ job_runner._start_queued_dagruns(session)
+ session.flush()
+
+ running_count = (
+ session.query(func.count(DagRun.id))
+ .filter(
+ DagRun.dag_id == dag_id,
+ DagRun.state == State.RUNNING,
+ DagRun.run_type == DagRunType.BACKFILL_JOB,
+ )
+ .scalar()
+ )
+ assert running_count == backfill_max_active_runs
+ queued_count = (
+ session.query(func.count(DagRun.id))
+ .filter(
+ DagRun.dag_id == dag_id,
+ DagRun.state == State.QUEUED,
+ DagRun.run_type == DagRunType.BACKFILL_JOB,
+ )
+ .scalar()
+ )
+ # 2 runs are still queued
+ assert queued_count == 2
+
def test_start_queued_dagruns_do_follow_logical_date_order(self, dag_maker):
session = settings.Session()
with dag_maker("test_dag1", max_active_runs=1):