This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c7b9c8dd32 Exclude backfill dag runs in active_runs_of_dags counts
(#42684)
c7b9c8dd32 is described below
commit c7b9c8dd32590ac62ce6fa30683b9f4ba72a46a4
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 17 13:23:07 2024 -0700
Exclude backfill dag runs in active_runs_of_dags counts (#42684)
In the areas where this is used, we don't want to include backfill runs in
the counts. Rather than rename the function to reflect the change, I added
an `exclude_backfill` parameter.
https://github.com/orgs/apache/projects/408
---
airflow/dag_processing/collection.py | 22 ++++--
airflow/jobs/scheduler_job_runner.py | 28 +++++---
airflow/models/dagrun.py | 4 ++
pyproject.toml | 1 +
tests/jobs/test_scheduler_job.py | 128 ++++++++++++++++++++++++++---------
5 files changed, 140 insertions(+), 43 deletions(-)
diff --git a/airflow/dag_processing/collection.py
b/airflow/dag_processing/collection.py
index f68ffbf331..068a3727d0 100644
--- a/airflow/dag_processing/collection.py
+++ b/airflow/dag_processing/collection.py
@@ -131,14 +131,24 @@ class _RunInfo(NamedTuple):
@classmethod
def calculate(cls, dags: dict[str, DAG], *, session: Session) -> Self:
+ """
+ Query the run counts from the db.
+
+ :param dags: dict of dags to query
+ """
# Skip these queries entirely if no DAGs can be scheduled to save time.
if not any(dag.timetable.can_be_scheduled for dag in dags.values()):
return cls({}, {})
- return cls(
- {run.dag_id: run for run in
session.scalars(_get_latest_runs_stmt(dag_ids=dags))},
- DagRun.active_runs_of_dags(dag_ids=dags, session=session),
+
+ latest_runs = {run.dag_id: run for run in
session.scalars(_get_latest_runs_stmt(dag_ids=dags.keys()))}
+ active_run_counts = DagRun.active_runs_of_dags(
+ dag_ids=dags.keys(),
+ exclude_backfill=True,
+ session=session,
)
+ return cls(latest_runs, active_run_counts)
+
def _update_dag_tags(tag_names: set[str], dm: DagModel, *, session: Session)
-> None:
orm_tags = {t.name: t for t in dm.tags}
@@ -188,7 +198,11 @@ class DagModelOperation(NamedTuple):
processor_subdir: str | None = None,
session: Session,
) -> None:
- run_info = _RunInfo.calculate(self.dags, session=session)
+ # we exclude backfill from active run counts since their concurrency
is separate
+ run_info = _RunInfo.calculate(
+ dags=self.dags,
+ session=session,
+ )
for dag_id, dm in sorted(orm_dags.items()):
dag = self.dags[dag_id]
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index f583cb414e..ed67e7a4ac 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1337,8 +1337,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
.all()
)
+ # backfill runs are not created by scheduler and their concurrency is
separate
+ # so we exclude them here
dag_ids = (dm.dag_id for dm in dag_models)
- active_runs_of_dags =
Counter(DagRun.active_runs_of_dags(dag_ids=dag_ids, session=session))
+ active_runs_of_dags = Counter(
+ DagRun.active_runs_of_dags(
+ dag_ids=dag_ids,
+ exclude_backfill=True,
+ session=session,
+ )
+ )
for dag_model in dag_models:
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
@@ -1382,7 +1390,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag,
dag_model,
last_dag_run=None,
- total_active_runs=active_runs_of_dags[dag.dag_id],
+ active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
session=session,
):
dag_model.calculate_dagrun_date_fields(dag, data_interval)
@@ -1496,7 +1504,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_model: DagModel,
*,
last_dag_run: DagRun | None = None,
- total_active_runs: int | None = None,
+ active_non_backfill_runs: int | None = None,
session: Session,
) -> bool:
"""Check if the dag's next_dagruns_create_after should be updated."""
@@ -1511,15 +1519,19 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if not dag.timetable.can_be_scheduled:
return False
- if total_active_runs is None:
- runs_dict = DagRun.active_runs_of_dags(dag_ids=[dag.dag_id],
session=session)
- total_active_runs = runs_dict.get(dag.dag_id, 0)
+ if active_non_backfill_runs is None:
+ runs_dict = DagRun.active_runs_of_dags(
+ dag_ids=[dag.dag_id],
+ exclude_backfill=True,
+ session=session,
+ )
+ active_non_backfill_runs = runs_dict.get(dag.dag_id, 0)
- if total_active_runs >= dag.max_active_runs:
+ if active_non_backfill_runs >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not
creating any more runs",
dag_model.dag_id,
- total_active_runs,
+ active_non_backfill_runs,
dag.max_active_runs,
)
dag_model.next_dagrun_create_after = None
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 4fd689d616..20ec12b991 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -386,7 +386,9 @@ class DagRun(Base, LoggingMixin):
@provide_session
def active_runs_of_dags(
cls,
+ *,
dag_ids: Iterable[str],
+ exclude_backfill,
session: Session = NEW_SESSION,
) -> dict[str, int]:
"""
@@ -400,6 +402,8 @@ class DagRun(Base, LoggingMixin):
.where(cls.state.in_((DagRunState.RUNNING, DagRunState.QUEUED)))
.group_by(cls.dag_id)
)
+ if exclude_backfill:
+ query = query.where(cls.run_type != DagRunType.BACKFILL_JOB)
return dict(iter(session.execute(query)))
@classmethod
diff --git a/pyproject.toml b/pyproject.toml
index 746e77f97e..1b1f595450 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -298,6 +298,7 @@ ignore = [
"PT005", # Fixture returns a value, remove leading underscore
"PT006", # Wrong type of names in @pytest.mark.parametrize
"PT007", # Wrong type of values in @pytest.mark.parametrize
+ "PT013", # silly rule prohibiting e.g. `from pytest import param`
"PT011", # pytest.raises() is too broad, set the match parameter
"PT019", # fixture without value is injected as parameter, use
@pytest.mark.usefixtures instead
# Rules below explicitly set off which could overlap with Ruff's formatter
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b2b7f5a5ed..52ad2f6a8c 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -33,6 +33,7 @@ import pendulum
import psutil
import pytest
import time_machine
+from pytest import param
from sqlalchemy import func, select, update
import airflow.example_dags
@@ -3818,62 +3819,119 @@ class TestSchedulerJob:
assert "Marked 1 SchedulerJob instances as failed" in caplog.messages
@pytest.mark.parametrize(
- "schedule, number_running, excepted",
+ "kwargs",
[
- (None, None, False),
- ("*/1 * * * *", None, False),
- ("*/1 * * * *", 1, True),
+ param(
+ dict(
+ schedule=None,
+ backfill_runs=0,
+ other_runs=2,
+ max_active_runs=2,
+ should_update=False,
+ ),
+ id="no_dag_schedule",
+ ),
+ param(
+ dict(
+ schedule="0 0 * * *",
+ backfill_runs=0,
+ other_runs=2,
+ max_active_runs=2,
+ should_update=False,
+ ),
+ id="dag_schedule_at_capacity",
+ ),
+ param(
+ dict(
+ schedule="0 0 * * *",
+ backfill_runs=0,
+ other_runs=1,
+ max_active_runs=2,
+ should_update=True,
+ ),
+ id="dag_schedule_under_capacity",
+ ),
+ param(
+ dict(
+ schedule="0 0 * * *",
+ backfill_runs=0,
+ other_runs=5,
+ max_active_runs=2,
+ should_update=False,
+ ),
+ id="dag_schedule_over_capacity",
+ ),
+ param(
+ dict(
+ schedule="0 0 * * *",
+ number_running=None,
+ backfill_runs=5,
+ other_runs=1,
+ max_active_runs=2,
+ should_update=True,
+ ),
+ id="dag_schedule_under_capacity_many_backfill",
+ ),
],
- ids=["no_dag_schedule", "dag_schedule_too_many_runs",
"dag_schedule_less_runs"],
)
- def test_should_update_dag_next_dagruns(self, schedule, number_running,
excepted, session, dag_maker):
+ @pytest.mark.parametrize("provide_run_count", [True, False])
+ def test_should_update_dag_next_dagruns(self, provide_run_count: bool,
kwargs: dict, session, dag_maker):
"""Test if really required to update next dagrun or possible to save
run time"""
+ schedule: str | None = kwargs["schedule"]
+ backfill_runs: int = kwargs["backfill_runs"]
+ other_runs: int = kwargs["other_runs"]
+ max_active_runs: int = kwargs["max_active_runs"]
+ should_update: bool = kwargs["should_update"]
- with dag_maker(
- dag_id="test_should_update_dag_next_dagruns", schedule=schedule,
max_active_runs=2
- ) as dag:
+ with dag_maker(schedule=schedule, max_active_runs=max_active_runs) as
dag:
EmptyOperator(task_id="dummy")
- dag_model = dag_maker.dag_model
-
- for index in range(2):
+ index = 0
+ for index in range(other_runs):
dag_maker.create_dagrun(
run_id=f"run_{index}",
execution_date=(DEFAULT_DATE + timedelta(days=index)),
start_date=timezone.utcnow(),
state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
session=session,
)
-
- session.flush()
+ for index in range(index + 1, index + 1 + backfill_runs):
+ dag_maker.create_dagrun(
+ run_id=f"run_{index}",
+ execution_date=(DEFAULT_DATE + timedelta(days=index)),
+ start_date=timezone.utcnow(),
+ state=State.RUNNING,
+ run_type=DagRunType.BACKFILL_JOB,
+ session=session,
+ )
+ assert index == other_runs + backfill_runs - 1 # sanity check
+ session.commit()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
- assert excepted is self.job_runner._should_update_dag_next_dagruns(
- dag, dag_model, total_active_runs=number_running, session=session
+ actual = self.job_runner._should_update_dag_next_dagruns(
+ dag=dag,
+ dag_model=dag_maker.dag_model,
+ active_non_backfill_runs=other_runs if provide_run_count else
None, # exclude backfill here
+ session=session,
)
+ assert actual == should_update
@pytest.mark.parametrize(
- "run_type, should_update",
+ "run_type, expected",
[
(DagRunType.MANUAL, False),
(DagRunType.SCHEDULED, True),
(DagRunType.BACKFILL_JOB, True),
(DagRunType.DATASET_TRIGGERED, False),
],
- ids=[
- DagRunType.MANUAL.name,
- DagRunType.SCHEDULED.name,
- DagRunType.BACKFILL_JOB.name,
- DagRunType.DATASET_TRIGGERED.name,
- ],
)
- def test_should_update_dag_next_dagruns_after_run_type(self, run_type,
should_update, session, dag_maker):
- """Test that whether next dagrun is updated depends on run type"""
+ def test_should_update_dag_next_dagruns_after_run_type(self, run_type,
expected, session, dag_maker):
+ """Test that whether next dag run is updated depends on run type"""
with dag_maker(
- dag_id="test_should_update_dag_next_dagruns_after_run_type",
schedule="*/1 * * * *",
- max_active_runs=10,
+ max_active_runs=3,
) as dag:
EmptyOperator(task_id="dummy")
@@ -3892,9 +3950,13 @@ class TestSchedulerJob:
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
- assert should_update is
self.job_runner._should_update_dag_next_dagruns(
- dag, dag_model, last_dag_run=run, total_active_runs=0,
session=session
+ actual = self.job_runner._should_update_dag_next_dagruns(
+ dag=dag,
+ dag_model=dag_model,
+ last_dag_run=run,
+ session=session,
)
+ assert actual == expected
def test_create_dag_runs(self, dag_maker):
"""
@@ -4477,14 +4539,18 @@ class TestSchedulerJob:
model: DagModel = session.get(DagModel, dag.dag_id)
# Pre-condition
- assert DagRun.active_runs_of_dags(dag_ids=["test_dag"],
session=session) == {"test_dag": 3}
+ assert DagRun.active_runs_of_dags(dag_ids=["test_dag"],
exclude_backfill=True, session=session) == {
+ "test_dag": 3
+ }
assert model.next_dagrun == timezone.DateTime(2016, 1, 3, tzinfo=UTC)
assert model.next_dagrun_create_after is None
complete_one_dagrun()
- assert DagRun.active_runs_of_dags(dag_ids=["test_dag"],
session=session) == {"test_dag": 3}
+ assert DagRun.active_runs_of_dags(dag_ids=["test_dag"],
exclude_backfill=True, session=session) == {
+ "test_dag": 3
+ }
for _ in range(5):
self.job_runner._do_scheduling(session)