This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0b7077745 Add test for behavior for paused backfill (#42837)
e0b7077745 is described below

commit e0b7077745523cacd34176c4b6a1e0e887a2c41c
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 10 14:40:10 2024 -0700

    Add test for behavior for paused backfill (#42837)
    
    
    -----
    Co-authored-by: Kaxil Naik <[email protected]>
---
 tests/jobs/test_scheduler_job.py | 90 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 639a3528fa..78c3acdce0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -53,7 +53,7 @@ from airflow.jobs.job import Job, run_job
 from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
-from airflow.models.backfill import _create_backfill
+from airflow.models.backfill import Backfill, _create_backfill
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
@@ -4960,6 +4960,94 @@ class TestSchedulerJob:
         assert session.scalar(select(func.count()).select_from(DagRun)) == 46
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
+    @pytest.mark.parametrize(
+        "pause_it, expected_running",
+        [
+            (True, 0),
+            (False, 3),
+        ],
+    )
+    def test_backfill_runs_not_started_when_backfill_paused(
+        self, pause_it, expected_running, dag_maker, session
+    ):
+        """
+        When backfill is paused, will not start.
+        """
+        dag1_dag_id = "test_dag1"
+        with dag_maker(
+            dag_id=dag1_dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=1,
+        ):
+            EmptyOperator(task_id="mytask")
+
+        def _running_counts():
+            dag1_non_b_running = (
+                session.query(func.count(DagRun.id))
+                .filter(
+                    DagRun.dag_id == dag1_dag_id,
+                    DagRun.state == State.RUNNING,
+                    DagRun.run_type != DagRunType.BACKFILL_JOB,
+                )
+                .scalar()
+            )
+            dag1_b_running = (
+                session.query(func.count(DagRun.id))
+                .filter(
+                    DagRun.dag_id == dag1_dag_id,
+                    DagRun.state == State.RUNNING,
+                    DagRun.run_type == DagRunType.BACKFILL_JOB,
+                )
+                .scalar()
+            )
+            total_running_count = (
+                session.query(func.count(DagRun.id)).filter(DagRun.state == State.RUNNING).scalar()
+            )
+            return dag1_non_b_running, dag1_b_running, total_running_count
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+        scheduler_job.executor = MockExecutor(do_update=False)
+        self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-06")
+        b = _create_backfill(
+            dag_id=dag1_dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=3,
+            reverse=False,
+            dag_run_conf={},
+        )
+        dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+
+        # initial state -- nothing is running
+        assert dag1_non_b_running == 0
+        assert dag1_b_running == 0
+        assert total_running == 0
+        assert session.query(func.count(DagRun.id)).scalar() == 6
+        assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
+        if pause_it:
+            b = session.get(Backfill, b.id)
+            b.is_paused = True
+
+        session.commit()
+
+        # now let's run scheduler once
+        self.job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
+        dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+        assert dag1_non_b_running == 0
+        assert dag1_b_running == expected_running
+        assert total_running == expected_running
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 6
+        assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
     def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_dag1", max_active_runs=1):

Reply via email to