This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e0607211a76e4d5fd1e04d8c362b49c6396bebd7
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Sep 19 17:32:17 2022 +0100

    Don't update backfill run from the scheduler (#26342)
    
    Don't update backfill run from the scheduler
    
    When updating the state of paused dags that have 'running' dagruns, the
    scheduler should not update the state of backfill runs.
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit b9c4e98d8f8bcc129cbb4079548bd521cd3981b9)
---
 airflow/jobs/scheduler_job.py    |  9 +++++++--
 tests/jobs/test_scheduler_job.py | 43 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 1354374505..53a96cf9ac 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -774,7 +774,8 @@ class SchedulerJob(BaseJob):
                     self.log.exception("Exception when executing 
DagFileProcessorAgent.end")
             self.log.info("Exited execute loop")
 
-    def _update_dag_run_state_for_paused_dags(self) -> None:
+    @provide_session
+    def _update_dag_run_state_for_paused_dags(self, session: Session = 
NEW_SESSION) -> None:
         try:
             paused_dag_ids = DagModel.get_all_paused_dag_ids()
             for dag_id in paused_dag_ids:
@@ -784,7 +785,11 @@ class SchedulerJob(BaseJob):
                 dag = SerializedDagModel.get_dag(dag_id)
                 if dag is None:
                     continue
-                dag_runs = DagRun.find(dag_id=dag_id, 
state=DagRunState.RUNNING)
+                dag_runs = session.query(DagRun).filter(
+                    DagRun.dag_id == dag_id,
+                    DagRun.state == DagRunState.RUNNING,
+                    DagRun.run_type != DagRunType.BACKFILL_JOB,
+                )
                 for dag_run in dag_runs:
                     dag_run.dag = dag
                     _, callback_to_run = 
dag_run.update_state(execute_callbacks=False)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 028522c74e..4284bf02d9 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -4439,6 +4439,49 @@ class TestSchedulerJob:
             .scalar()
         ) > (timezone.utcnow() - timedelta(days=2))
 
+    def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, 
dag_maker, session):
+        """Test that the _update_dagrun_state_for_paused_dag does not affect 
backfilled dagruns"""
+
+        with dag_maker('testdag') as dag:
+            EmptyOperator(task_id='task1')
+
+        # Backfill run
+        backfill_run = 
dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
+        ti = backfill_run.get_task_instances()[0]
+        ti.set_state(TaskInstanceState.SUCCESS)
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        dm.is_paused = True
+        session.merge(dm)
+        session.merge(ti)
+        session.flush()
+
+        # scheduled run
+        scheduled_run = dag_maker.create_dagrun(
+            execution_date=datetime.datetime(2022, 1, 1), 
run_type=DagRunType.SCHEDULED
+        )
+        ti = scheduled_run.get_task_instances()[0]
+        ti.set_state(TaskInstanceState.SUCCESS)
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        dm.is_paused = True
+        session.merge(dm)
+        session.merge(ti)
+        session.flush()
+
+        assert dag.dag_id in DagModel.get_all_paused_dag_ids()
+        assert backfill_run.state == State.RUNNING
+        assert scheduled_run.state == State.RUNNING
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job._update_dag_run_state_for_paused_dags()
+        session.flush()
+
+        (backfill_run,) = DagRun.find(dag_id=dag.dag_id, 
run_type=DagRunType.BACKFILL_JOB, session=session)
+        assert backfill_run.state == State.RUNNING
+
+        (scheduled_run,) = DagRun.find(dag_id=dag.dag_id, 
run_type=DagRunType.SCHEDULED, session=session)
+        assert scheduled_run.state == State.SUCCESS
+
 
 @pytest.mark.need_serialized_dag
 def test_schedule_dag_run_with_upstream_skip(dag_maker, session):

Reply via email to