This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 9d3e032f93e Fix scheduler using stale max_active_runs from 
SerializedDAG (#57619) (#57959)
9d3e032f93e is described below

commit 9d3e032f93e4eaf3ef57ffa34dc1b6b99f6c1b82
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Nov 6 15:01:17 2025 +0100

    Fix scheduler using stale max_active_runs from SerializedDAG (#57619) 
(#57959)
    
    * Fix scheduler using stale max_active_runs from SerializedDAG
    
    When max_active_runs is updated via versioned bundles, the scheduler
    was checking the limit against stale SerializedDAG data instead of
    the latest DagModel value. This caused queued DAG runs to be incorrectly
    blocked even when the limit had been increased.
    
    The scheduler now uses max_active_runs from DagModel (accessed via
    dag_run association proxy) to ensure versioned bundle updates to
    max_active_runs are respected immediately.
    
    closes: #57604
    
    * fixup! Fix scheduler using stale max_active_runs from SerializedDAG
    
    * fixup! fixup! Fix scheduler using stale max_active_runs from SerializedDAG
    
    (cherry picked from commit 1c1130a2999fcc0b8d1acafda97914dd8824d613)
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  6 +-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 99 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 0763f59e3b1..8468e7ab2d9 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1850,8 +1850,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         run_id,
                     )
                     continue
-            elif dag.max_active_runs:
-                if active_runs >= dag.max_active_runs:
+            elif dag_run.max_active_runs:
+                # Using dag_run.max_active_runs which links to DagModel to 
ensure we are checking
+                # against the most recent changes on the dag and not using 
stale serialized dag
+                if active_runs >= dag_run.max_active_runs:
                     # todo: delete all candidate dag runs for this dag from 
list right now
                     self.log.info(
                         "dag cannot be started due to dag max_active_runs 
constraint; "
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 49e82fd6587..5f6e5fdfc61 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -7125,6 +7125,105 @@ def test_schedule_dag_run_with_upstream_skip(dag_maker, 
session):
     # dummy3 should be skipped because dummy1 is skipped.
     assert tis[dummy3.task_id].state == State.SKIPPED
 
+    def 
test_start_queued_dagruns_uses_latest_max_active_runs_from_dag_model(self, 
dag_maker, session):
+        """
+        Test that _start_queued_dagruns uses max_active_runs from DagModel 
(via dag_run)
+        instead of stale SerializedDAG max_active_runs.
+
+        This test verifies the fix where SerializedDAG may have stale 
max_active_runs,
+        but DagModel has the latest value updated by version changes (versioned 
bundles). The scheduler should
+        use the latest value from DagModel to respect user updates.
+        """
+        # Create a DAG with max_active_runs=1 initially
+        with dag_maker(
+            dag_id="test_max_active_runs_stale_serialized",
+            max_active_runs=1,
+            session=session,
+        ) as dag:
+            EmptyOperator(task_id="dummy_task")
+
+        dag_model = dag_maker.dag_model
+        assert dag_model.max_active_runs == 1
+
+        # Create a SerializedDAG (which will have max_active_runs=1)
+        # This simulates the SerializedDAG being created/updated from the DAG 
file
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        self.job_runner._create_dag_runs([dag_model], session)
+
+        # Verify SerializedDAG has max_active_runs=1
+        dag_run_1 = (
+            session.query(DagRun).filter(DagRun.dag_id == 
dag.dag_id).order_by(DagRun.logical_date).first()
+        )
+        assert dag_run_1 is not None
+        serialized_dag = 
self.job_runner.scheduler_dag_bag.get_dag_for_run(dag_run_1, session=session)
+        assert serialized_dag is not None
+        assert serialized_dag.max_active_runs == 1
+
+        # Now update DagModel.max_active_runs to 2 (simulating a versioned 
bundle update)
+        # This is the latest value, but SerializedDAG still has the old value
+        dag_model.max_active_runs = 2
+        session.commit()
+        session.refresh(dag_model)
+
+        # Create 1 running dag run
+        dag_run_1.state = DagRunState.RUNNING
+        session.commit()
+
+        # Create 1 queued dag run
+        dag_run_2 = dag_maker.create_dagrun(
+            run_id="test_run_2",
+            state=DagRunState.QUEUED,
+            run_type=DagRunType.SCHEDULED,
+            session=session,
+        )
+
+        # Ensure dag_run_2 has the updated DagModel relationship loaded
+        # The association proxy dag_run.max_active_runs accesses 
dag_model.max_active_runs
+        # so we need to ensure the relationship is loaded
+        session.refresh(dag_run_2)
+
+        # Verify we have 1 running and 1 queued
+        running_count = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == dag.dag_id, DagRun.state == 
DagRunState.RUNNING)
+            .count()
+        )
+        queued_count = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == dag.dag_id, DagRun.state == 
DagRunState.QUEUED)
+            .count()
+        )
+        assert running_count == 1
+        assert queued_count == 1
+
+        # The SerializedDAG still has max_active_runs=1 (stale)
+        # But DagModel has max_active_runs=2 (latest)
+        assert serialized_dag.max_active_runs == 1
+        assert dag_model.max_active_runs == 2
+
+        # Call _start_queued_dagruns
+        # With the fix: Should start the queued run (using DagModel 
max_active_runs=2, active_runs=1 < 2)
+        # Without the fix: Would block the queued run (using SerializedDAG 
max_active_runs=1, active_runs=1 >= 1)
+        self.job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        # Verify that the queued dag run started (proves it used 
DagModel.max_active_runs=2)
+        dag_run_2 = session.get(DagRun, dag_run_2.id)
+        assert dag_run_2.state == DagRunState.RUNNING, (
+            "The queued dag run should have started because 
DagModel.max_active_runs=2 "
+            "allows it (active_runs=1 < 2), even though 
SerializedDAG.max_active_runs=1 for that dagrun serdag version "
+            "would have blocked it."
+        )
+
+        # Verify we now have 2 running dag runs
+        running_count = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == dag.dag_id, DagRun.state == 
DagRunState.RUNNING)
+            .count()
+        )
+        assert running_count == 2
+
 
 class TestSchedulerJobQueriesCount:
     """

Reply via email to