This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-4-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e0607211a76e4d5fd1e04d8c362b49c6396bebd7
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Sep 19 17:32:17 2022 +0100

    Don't update backfill run from the scheduler (#26342)

    Don't update backfill run from the scheduler

    When updating the state of paused dags with 'running' dagruns in the scheduler,
    we should not update the state of backfill run.

    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit b9c4e98d8f8bcc129cbb4079548bd521cd3981b9)
---
 airflow/jobs/scheduler_job.py    |  9 +++++++--
 tests/jobs/test_scheduler_job.py | 43 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 1354374505..53a96cf9ac 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -774,7 +774,8 @@ class SchedulerJob(BaseJob):
                 self.log.exception("Exception when executing DagFileProcessorAgent.end")
             self.log.info("Exited execute loop")
 
-    def _update_dag_run_state_for_paused_dags(self) -> None:
+    @provide_session
+    def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) -> None:
         try:
             paused_dag_ids = DagModel.get_all_paused_dag_ids()
             for dag_id in paused_dag_ids:
@@ -784,7 +785,11 @@ class SchedulerJob(BaseJob):
                 dag = SerializedDagModel.get_dag(dag_id)
                 if dag is None:
                     continue
-                dag_runs = DagRun.find(dag_id=dag_id, state=DagRunState.RUNNING)
+                dag_runs = session.query(DagRun).filter(
+                    DagRun.dag_id == dag_id,
+                    DagRun.state == DagRunState.RUNNING,
+                    DagRun.run_type != DagRunType.BACKFILL_JOB,
+                )
                 for dag_run in dag_runs:
                     dag_run.dag = dag
                     _, callback_to_run = dag_run.update_state(execute_callbacks=False)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 028522c74e..4284bf02d9 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -4439,6 +4439,49 @@ class TestSchedulerJob:
             .scalar()
         ) > (timezone.utcnow() - timedelta(days=2))
 
+    def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, dag_maker, session):
+        """Test that the _update_dagrun_state_for_paused_dag does not affect backfilled dagruns"""
+
+        with dag_maker('testdag') as dag:
+            EmptyOperator(task_id='task1')
+
+        # Backfill run
+        backfill_run = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
+        ti = backfill_run.get_task_instances()[0]
+        ti.set_state(TaskInstanceState.SUCCESS)
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        dm.is_paused = True
+        session.merge(dm)
+        session.merge(ti)
+        session.flush()
+
+        # scheduled run
+        scheduled_run = dag_maker.create_dagrun(
+            execution_date=datetime.datetime(2022, 1, 1), run_type=DagRunType.SCHEDULED
+        )
+        ti = scheduled_run.get_task_instances()[0]
+        ti.set_state(TaskInstanceState.SUCCESS)
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        dm.is_paused = True
+        session.merge(dm)
+        session.merge(ti)
+        session.flush()
+
+        assert dag.dag_id in DagModel.get_all_paused_dag_ids()
+        assert backfill_run.state == State.RUNNING
+        assert scheduled_run.state == State.RUNNING
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job._update_dag_run_state_for_paused_dags()
+        session.flush()
+
+        (backfill_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.BACKFILL_JOB, session=session)
+        assert backfill_run.state == State.RUNNING
+
+        (scheduled_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.SCHEDULED, session=session)
+        assert scheduled_run.state == State.SUCCESS
+
 
 @pytest.mark.need_serialized_dag
 def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
