This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0b7077745 Add test for behavior for paused backfill (#42837)
e0b7077745 is described below
commit e0b7077745523cacd34176c4b6a1e0e887a2c41c
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 10 14:40:10 2024 -0700
Add test for behavior for paused backfill (#42837)
-----
Co-authored-by: Kaxil Naik <[email protected]>
---
tests/jobs/test_scheduler_job.py | 90 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 89 insertions(+), 1 deletion(-)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 639a3528fa..78c3acdce0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -53,7 +53,7 @@ from airflow.jobs.job import Job, run_job
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
-from airflow.models.backfill import _create_backfill
+from airflow.models.backfill import Backfill, _create_backfill
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
@@ -4960,6 +4960,94 @@ class TestSchedulerJob:
assert session.scalar(select(func.count()).select_from(DagRun)) == 46
assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
+ @pytest.mark.parametrize(
+ "pause_it, expected_running",
+ [
+ (True, 0),
+ (False, 3),
+ ],
+ )
+ def test_backfill_runs_not_started_when_backfill_paused(
+ self, pause_it, expected_running, dag_maker, session
+ ):
+ """
+ When backfill is paused, will not start.
+ """
+ dag1_dag_id = "test_dag1"
+ with dag_maker(
+ dag_id=dag1_dag_id,
+ start_date=DEFAULT_DATE,
+ schedule=timedelta(days=1),
+ max_active_runs=1,
+ ):
+ EmptyOperator(task_id="mytask")
+
+ def _running_counts():
+ dag1_non_b_running = (
+ session.query(func.count(DagRun.id))
+ .filter(
+ DagRun.dag_id == dag1_dag_id,
+ DagRun.state == State.RUNNING,
+ DagRun.run_type != DagRunType.BACKFILL_JOB,
+ )
+ .scalar()
+ )
+ dag1_b_running = (
+ session.query(func.count(DagRun.id))
+ .filter(
+ DagRun.dag_id == dag1_dag_id,
+ DagRun.state == State.RUNNING,
+ DagRun.run_type == DagRunType.BACKFILL_JOB,
+ )
+ .scalar()
+ )
+ total_running_count = (
+ session.query(func.count(DagRun.id)).filter(DagRun.state == State.RUNNING).scalar()
+ )
+ return dag1_non_b_running, dag1_b_running, total_running_count
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+ scheduler_job.executor = MockExecutor(do_update=False)
+ self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+ from_date = pendulum.parse("2021-01-01")
+ to_date = pendulum.parse("2021-01-06")
+ b = _create_backfill(
+ dag_id=dag1_dag_id,
+ from_date=from_date,
+ to_date=to_date,
+ max_active_runs=3,
+ reverse=False,
+ dag_run_conf={},
+ )
+ dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+
+ # initial state -- nothing is running
+ assert dag1_non_b_running == 0
+ assert dag1_b_running == 0
+ assert total_running == 0
+ assert session.query(func.count(DagRun.id)).scalar() == 6
+ assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
+ if pause_it:
+ b = session.get(Backfill, b.id)
+ b.is_paused = True
+
+ session.commit()
+
+ # now let's run scheduler once
+ self.job_runner._start_queued_dagruns(session)
+ session.flush()
+
+ assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
+ dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+ assert dag1_non_b_running == 0
+ assert dag1_b_running == expected_running
+ assert total_running == expected_running
+ assert session.scalar(select(func.count()).select_from(DagRun)) == 6
+ assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
session = settings.Session()
with dag_maker("test_dag1", max_active_runs=1):