This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c7b9c8dd32 Exclude backfill dag runs in active_runs_of_dags counts 
(#42684)
c7b9c8dd32 is described below

commit c7b9c8dd32590ac62ce6fa30683b9f4ba72a46a4
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 17 13:23:07 2024 -0700

    Exclude backfill dag runs in active_runs_of_dags counts (#42684)
    
    In the areas where this is used, we don't want to include backfill runs in 
the counts. Rather than rename the function to reflect the change, I add a 
parameter.
    
    https://github.com/orgs/apache/projects/408
---
 airflow/dag_processing/collection.py |  22 ++++--
 airflow/jobs/scheduler_job_runner.py |  28 +++++---
 airflow/models/dagrun.py             |   4 ++
 pyproject.toml                       |   1 +
 tests/jobs/test_scheduler_job.py     | 128 ++++++++++++++++++++++++++---------
 5 files changed, 140 insertions(+), 43 deletions(-)

diff --git a/airflow/dag_processing/collection.py 
b/airflow/dag_processing/collection.py
index f68ffbf331..068a3727d0 100644
--- a/airflow/dag_processing/collection.py
+++ b/airflow/dag_processing/collection.py
@@ -131,14 +131,24 @@ class _RunInfo(NamedTuple):
 
     @classmethod
     def calculate(cls, dags: dict[str, DAG], *, session: Session) -> Self:
+        """
+        Query the the run counts from the db.
+
+        :param dags: dict of dags to query
+        """
         # Skip these queries entirely if no DAGs can be scheduled to save time.
         if not any(dag.timetable.can_be_scheduled for dag in dags.values()):
             return cls({}, {})
-        return cls(
-            {run.dag_id: run for run in 
session.scalars(_get_latest_runs_stmt(dag_ids=dags))},
-            DagRun.active_runs_of_dags(dag_ids=dags, session=session),
+
+        latest_runs = {run.dag_id: run for run in 
session.scalars(_get_latest_runs_stmt(dag_ids=dags.keys()))}
+        active_run_counts = DagRun.active_runs_of_dags(
+            dag_ids=dags.keys(),
+            exclude_backfill=True,
+            session=session,
         )
 
+        return cls(latest_runs, active_run_counts)
+
 
 def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session) 
-> None:
     orm_tags = {t.name: t for t in dm.tags}
@@ -188,7 +198,11 @@ class DagModelOperation(NamedTuple):
         processor_subdir: str | None = None,
         session: Session,
     ) -> None:
-        run_info = _RunInfo.calculate(self.dags, session=session)
+        # we exclude backfill from active run counts since their concurrency 
is separate
+        run_info = _RunInfo.calculate(
+            dags=self.dags,
+            session=session,
+        )
 
         for dag_id, dm in sorted(orm_dags.items()):
             dag = self.dags[dag_id]
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index f583cb414e..ed67e7a4ac 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1337,8 +1337,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             .all()
         )
 
+        # backfill runs are not created by scheduler and their concurrency is 
separate
+        # so we exclude them here
         dag_ids = (dm.dag_id for dm in dag_models)
-        active_runs_of_dags = 
Counter(DagRun.active_runs_of_dags(dag_ids=dag_ids, session=session))
+        active_runs_of_dags = Counter(
+            DagRun.active_runs_of_dags(
+                dag_ids=dag_ids,
+                exclude_backfill=True,
+                session=session,
+            )
+        )
 
         for dag_model in dag_models:
             dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
@@ -1382,7 +1390,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 dag,
                 dag_model,
                 last_dag_run=None,
-                total_active_runs=active_runs_of_dags[dag.dag_id],
+                active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
                 session=session,
             ):
                 dag_model.calculate_dagrun_date_fields(dag, data_interval)
@@ -1496,7 +1504,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         dag_model: DagModel,
         *,
         last_dag_run: DagRun | None = None,
-        total_active_runs: int | None = None,
+        active_non_backfill_runs: int | None = None,
         session: Session,
     ) -> bool:
         """Check if the dag's next_dagruns_create_after should be updated."""
@@ -1511,15 +1519,19 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         if not dag.timetable.can_be_scheduled:
             return False
 
-        if total_active_runs is None:
-            runs_dict = DagRun.active_runs_of_dags(dag_ids=[dag.dag_id], 
session=session)
-            total_active_runs = runs_dict.get(dag.dag_id, 0)
+        if active_non_backfill_runs is None:
+            runs_dict = DagRun.active_runs_of_dags(
+                dag_ids=[dag.dag_id],
+                exclude_backfill=True,
+                session=session,
+            )
+            active_non_backfill_runs = runs_dict.get(dag.dag_id, 0)
 
-        if total_active_runs >= dag.max_active_runs:
+        if active_non_backfill_runs >= dag.max_active_runs:
             self.log.info(
                 "DAG %s is at (or above) max_active_runs (%d of %d), not 
creating any more runs",
                 dag_model.dag_id,
-                total_active_runs,
+                active_non_backfill_runs,
                 dag.max_active_runs,
             )
             dag_model.next_dagrun_create_after = None
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 4fd689d616..20ec12b991 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -386,7 +386,9 @@ class DagRun(Base, LoggingMixin):
     @provide_session
     def active_runs_of_dags(
         cls,
+        *,
         dag_ids: Iterable[str],
+        exclude_backfill,
         session: Session = NEW_SESSION,
     ) -> dict[str, int]:
         """
@@ -400,6 +402,8 @@ class DagRun(Base, LoggingMixin):
             .where(cls.state.in_((DagRunState.RUNNING, DagRunState.QUEUED)))
             .group_by(cls.dag_id)
         )
+        if exclude_backfill:
+            query = query.where(cls.run_type != DagRunType.BACKFILL_JOB)
         return dict(iter(session.execute(query)))
 
     @classmethod
diff --git a/pyproject.toml b/pyproject.toml
index 746e77f97e..1b1f595450 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -298,6 +298,7 @@ ignore = [
     "PT005", # Fixture returns a value, remove leading underscore
     "PT006", # Wrong type of names in @pytest.mark.parametrize
     "PT007", # Wrong type of values in @pytest.mark.parametrize
+    "PT013", # silly rule prohibiting e.g. `from pytest import param`
     "PT011", # pytest.raises() is too broad, set the match parameter
     "PT019", # fixture without value is injected as parameter, use 
@pytest.mark.usefixtures instead
     # Rules below explicitly set off which could overlap with Ruff's formatter
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b2b7f5a5ed..52ad2f6a8c 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -33,6 +33,7 @@ import pendulum
 import psutil
 import pytest
 import time_machine
+from pytest import param
 from sqlalchemy import func, select, update
 
 import airflow.example_dags
@@ -3818,62 +3819,119 @@ class TestSchedulerJob:
         assert "Marked 1 SchedulerJob instances as failed" in caplog.messages
 
     @pytest.mark.parametrize(
-        "schedule, number_running, excepted",
+        "kwargs",
         [
-            (None, None, False),
-            ("*/1 * * * *", None, False),
-            ("*/1 * * * *", 1, True),
+            param(
+                dict(
+                    schedule=None,
+                    backfill_runs=0,
+                    other_runs=2,
+                    max_active_runs=2,
+                    should_update=False,
+                ),
+                id="no_dag_schedule",
+            ),
+            param(
+                dict(
+                    schedule="0 0 * * *",
+                    backfill_runs=0,
+                    other_runs=2,
+                    max_active_runs=2,
+                    should_update=False,
+                ),
+                id="dag_schedule_at_capacity",
+            ),
+            param(
+                dict(
+                    schedule="0 0 * * *",
+                    backfill_runs=0,
+                    other_runs=1,
+                    max_active_runs=2,
+                    should_update=True,
+                ),
+                id="dag_schedule_under_capacity",
+            ),
+            param(
+                dict(
+                    schedule="0 0 * * *",
+                    backfill_runs=0,
+                    other_runs=5,
+                    max_active_runs=2,
+                    should_update=False,
+                ),
+                id="dag_schedule_over_capacity",
+            ),
+            param(
+                dict(
+                    schedule="0 0 * * *",
+                    number_running=None,
+                    backfill_runs=5,
+                    other_runs=1,
+                    max_active_runs=2,
+                    should_update=True,
+                ),
+                id="dag_schedule_under_capacity_many_backfill",
+            ),
         ],
-        ids=["no_dag_schedule", "dag_schedule_too_many_runs", 
"dag_schedule_less_runs"],
     )
-    def test_should_update_dag_next_dagruns(self, schedule, number_running, 
excepted, session, dag_maker):
+    @pytest.mark.parametrize("provide_run_count", [True, False])
+    def test_should_update_dag_next_dagruns(self, provide_run_count: bool, 
kwargs: dict, session, dag_maker):
         """Test if really required to update next dagrun or possible to save 
run time"""
+        schedule: str | None = kwargs["schedule"]
+        backfill_runs: int = kwargs["backfill_runs"]
+        other_runs: int = kwargs["other_runs"]
+        max_active_runs: int = kwargs["max_active_runs"]
+        should_update: bool = kwargs["should_update"]
 
-        with dag_maker(
-            dag_id="test_should_update_dag_next_dagruns", schedule=schedule, 
max_active_runs=2
-        ) as dag:
+        with dag_maker(schedule=schedule, max_active_runs=max_active_runs) as 
dag:
             EmptyOperator(task_id="dummy")
 
-        dag_model = dag_maker.dag_model
-
-        for index in range(2):
+        index = 0
+        for index in range(other_runs):
             dag_maker.create_dagrun(
                 run_id=f"run_{index}",
                 execution_date=(DEFAULT_DATE + timedelta(days=index)),
                 start_date=timezone.utcnow(),
                 state=State.RUNNING,
+                run_type=DagRunType.SCHEDULED,
                 session=session,
             )
-
-        session.flush()
+        for index in range(index + 1, index + 1 + backfill_runs):
+            dag_maker.create_dagrun(
+                run_id=f"run_{index}",
+                execution_date=(DEFAULT_DATE + timedelta(days=index)),
+                start_date=timezone.utcnow(),
+                state=State.RUNNING,
+                run_type=DagRunType.BACKFILL_JOB,
+                session=session,
+            )
+        assert index == other_runs + backfill_runs - 1  # sanity check
+        session.commit()
         scheduler_job = Job(executor=self.null_exec)
         self.job_runner = SchedulerJobRunner(job=scheduler_job)
 
-        assert excepted is self.job_runner._should_update_dag_next_dagruns(
-            dag, dag_model, total_active_runs=number_running, session=session
+        actual = self.job_runner._should_update_dag_next_dagruns(
+            dag=dag,
+            dag_model=dag_maker.dag_model,
+            active_non_backfill_runs=other_runs if provide_run_count else 
None,  # exclude backfill here
+            session=session,
         )
+        assert actual == should_update
 
     @pytest.mark.parametrize(
-        "run_type, should_update",
+        "run_type, expected",
         [
             (DagRunType.MANUAL, False),
             (DagRunType.SCHEDULED, True),
             (DagRunType.BACKFILL_JOB, True),
             (DagRunType.DATASET_TRIGGERED, False),
         ],
-        ids=[
-            DagRunType.MANUAL.name,
-            DagRunType.SCHEDULED.name,
-            DagRunType.BACKFILL_JOB.name,
-            DagRunType.DATASET_TRIGGERED.name,
-        ],
     )
-    def test_should_update_dag_next_dagruns_after_run_type(self, run_type, 
should_update, session, dag_maker):
-        """Test that whether next dagrun is updated depends on run type"""
+    def test_should_update_dag_next_dagruns_after_run_type(self, run_type, 
expected, session, dag_maker):
+        """Test that whether next dag run is updated depends on run type"""
         with dag_maker(
-            dag_id="test_should_update_dag_next_dagruns_after_run_type",
             schedule="*/1 * * * *",
-            max_active_runs=10,
+            max_active_runs=3,
         ) as dag:
             EmptyOperator(task_id="dummy")
 
@@ -3892,9 +3950,13 @@ class TestSchedulerJob:
         scheduler_job = Job(executor=self.null_exec)
         self.job_runner = SchedulerJobRunner(job=scheduler_job)
 
-        assert should_update is 
self.job_runner._should_update_dag_next_dagruns(
-            dag, dag_model, last_dag_run=run, total_active_runs=0, 
session=session
+        actual = self.job_runner._should_update_dag_next_dagruns(
+            dag=dag,
+            dag_model=dag_model,
+            last_dag_run=run,
+            session=session,
         )
+        assert actual == expected
 
     def test_create_dag_runs(self, dag_maker):
         """
@@ -4477,14 +4539,18 @@ class TestSchedulerJob:
         model: DagModel = session.get(DagModel, dag.dag_id)
 
         # Pre-condition
-        assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], 
session=session) == {"test_dag": 3}
+        assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], 
exclude_backfill=True, session=session) == {
+            "test_dag": 3
+        }
 
         assert model.next_dagrun == timezone.DateTime(2016, 1, 3, tzinfo=UTC)
         assert model.next_dagrun_create_after is None
 
         complete_one_dagrun()
 
-        assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], 
session=session) == {"test_dag": 3}
+        assert DagRun.active_runs_of_dags(dag_ids=["test_dag"], 
exclude_backfill=True, session=session) == {
+            "test_dag": 3
+        }
 
         for _ in range(5):
             self.job_runner._do_scheduling(session)

Reply via email to