This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9769a65c20 Fixed backfill interference with scheduler (#22701)
9769a65c20 is described below

commit 9769a65c20f6028d640061efacbc5bfeb5ebaf3d
Author: QP Hou <[email protected]>
AuthorDate: Fri Apr 8 13:13:45 2022 -0700

    Fixed backfill interference with scheduler (#22701)
    
    Co-authored-by: Dmirty Suvorov <[email protected]>
---
 airflow/jobs/backfill_job.py     | 30 +++++++++++++++-
 tests/jobs/test_backfill_job.py  | 76 ++++++++++++++++++++++++++++------------
 tests/jobs/test_scheduler_job.py |  6 ++--
 3 files changed, 86 insertions(+), 26 deletions(-)

diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index c9f0a48b58..0334a4f20c 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -702,6 +702,9 @@ class BackfillJob(BaseJob):
 
         return err
 
+    def _get_dag_with_subdags(self):
+        return [self.dag] + self.dag.subdags
+
     @provide_session
     def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, start_date, session=None):
         """
@@ -717,7 +720,7 @@ class BackfillJob(BaseJob):
         :param session: the current session object
         """
         for dagrun_info in dagrun_infos:
-            for dag in [self.dag] + self.dag.subdags:
+            for dag in self._get_dag_with_subdags():
                 dag_run = self._get_dag_run(dagrun_info, dag, session=session)
                 tis_map = self._task_instances_for_dag_run(dag_run, session=session)
                 if dag_run is None:
@@ -784,6 +787,31 @@ class BackfillJob(BaseJob):
                 return
             dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]
 
+        dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
+        running_dagruns = DagRun.find(
+            dag_id=dag_with_subdags_ids,
+            execution_start_date=self.bf_start_date,
+            execution_end_date=self.bf_end_date,
+            no_backfills=True,
+            state=DagRunState.RUNNING,
+        )
+
+        if running_dagruns:
+            for run in running_dagruns:
+                self.log.error(
+                    "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING "
+                    "state.",
+                    run.run_id,
+                    run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
+                    run.run_type,
+                )
+            self.log.error(
+                "Changing DagRun into BACKFILL would cause scheduler to lose track of executing "
+                "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into "
+                "database would cause database constraint violation for dag_id + execution_date "
+                "combination. Please adjust backfill dates or wait for this DagRun to finish.",
+            )
+            return
         # picklin'
         pickle_id = None
 
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index f64009ebb8..65caff63a4 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -104,7 +104,7 @@ class TestBackfillJob:
 
     def test_unfinished_dag_runs_set_to_failed(self, dag_maker):
         dag = self._get_dummy_dag(dag_maker)
-        dag_run = dag_maker.create_dagrun()
+        dag_run = dag_maker.create_dagrun(state=None)
 
         job = BackfillJob(
             dag=dag,
@@ -121,7 +121,7 @@ class TestBackfillJob:
 
     def test_dag_run_with_finished_tasks_set_to_success(self, dag_maker):
         dag = self._get_dummy_dag(dag_maker)
-        dag_run = dag_maker.create_dagrun()
+        dag_run = dag_maker.create_dagrun(state=None)
 
         for ti in dag_run.get_task_instances():
             ti.set_state(State.SUCCESS)
@@ -282,7 +282,7 @@ class TestBackfillJob:
 
     def test_backfill_conf(self, dag_maker):
         dag = self._get_dummy_dag(dag_maker, dag_id='test_backfill_conf')
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -311,7 +311,7 @@ class TestBackfillJob:
             dag_id='test_backfill_respect_max_active_tis_per_dag_limit',
             max_active_tis_per_dag=max_active_tis_per_dag,
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -360,7 +360,7 @@ class TestBackfillJob:
     @patch('airflow.jobs.backfill_job.BackfillJob.log')
     def test_backfill_respect_dag_concurrency_limit(self, mock_log, dag_maker):
         dag = self._get_dummy_dag(dag_maker, dag_id='test_backfill_respect_concurrency_limit')
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
         dag.max_active_tasks = 2
 
         executor = MockExecutor()
@@ -414,7 +414,7 @@ class TestBackfillJob:
         set_default_pool_slots(default_pool_slots)
 
         dag = self._get_dummy_dag(dag_maker, dag_id='test_backfill_with_no_pool_limit')
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -470,7 +470,7 @@ class TestBackfillJob:
             dag_id='test_backfill_pool_not_found',
             pool='king_pool',
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -503,7 +503,7 @@ class TestBackfillJob:
             dag_id='test_backfill_respect_pool_limit',
             pool=pool.pool,
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -553,7 +553,7 @@ class TestBackfillJob:
         dag = self._get_dummy_dag(
             dag_maker, dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1"
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -586,6 +586,7 @@ class TestBackfillJob:
             dag_maker, dag_id="test_backfill_override_conf", task_id="test_backfill_override_conf-1"
         )
         dr = dag_maker.create_dagrun(
+            state=None,
             start_date=DEFAULT_DATE,
         )
 
@@ -608,11 +609,41 @@ class TestBackfillJob:
             dr = wrapped_task_instances_for_dag_run.call_args_list[0][0][0]
             assert dr.conf == {"a": 1}
 
+    def test_backfill_skip_active_scheduled_dagrun(self, dag_maker, caplog):
+        dag = self._get_dummy_dag(
+            dag_maker,
+            dag_id="test_backfill_skip_active_scheduled_dagrun",
+            task_id="test_backfill_skip_active_scheduled_dagrun-1",
+        )
+        dag_maker.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+        )
+
+        executor = MockExecutor()
+
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+        )
+        job.run()
+        error_log_records = [record for record in caplog.records if record.levelname == "ERROR"]
+        assert "Backfill cannot be created for DagRun" in error_log_records[0].msg
+
+        ti = TI(
+            task=dag.get_task('test_backfill_skip_active_scheduled_dagrun-1'), execution_date=DEFAULT_DATE
+        )
+        ti.refresh_from_db()
+        # since DAG backfill is skipped, task state should be none
+        assert ti.state == State.NONE
+
     def test_backfill_rerun_failed_tasks(self, dag_maker):
         dag = self._get_dummy_dag(
             dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -646,7 +677,7 @@ class TestBackfillJob:
             op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1')
             op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2')
             op1.set_upstream(op2)
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -678,7 +709,7 @@ class TestBackfillJob:
         dag = self._get_dummy_dag(
             dag_maker, dag_id='test_backfill_rerun_failed', task_id='test_backfill_rerun_failed_task-1'
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -715,7 +746,7 @@ class TestBackfillJob:
             },
         ) as dag:
             task1 = DummyOperator(task_id="task1")
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor(parallelism=16)
         executor.mock_task_results[
@@ -742,7 +773,7 @@ class TestBackfillJob:
             },
         ) as dag:
             task1 = DummyOperator(task_id="task1")
-        dr = dag_maker.create_dagrun()
+        dr = dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor(parallelism=16)
         executor.mock_task_results[
@@ -924,7 +955,7 @@ class TestBackfillJob:
         dag = self._get_dag_test_max_active_limits(
             dag_maker, dag_id='test_backfill_max_limit_check_within_limit', max_active_runs=16
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
         start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
         end_date = DEFAULT_DATE
 
@@ -956,6 +987,7 @@ class TestBackfillJob:
                         dag_id=dag_id,
                     )
                     dag_maker.create_dagrun(
+                        state=None,
                         # Existing dagrun that is not within the backfill range
                         run_id=run_id,
                         execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
@@ -1010,7 +1042,7 @@ class TestBackfillJob:
         dag = self._get_dag_test_max_active_limits(
             dag_maker, dag_id='test_backfill_max_limit_check_no_count_existing'
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
         job = BackfillJob(
@@ -1030,7 +1062,7 @@ class TestBackfillJob:
         dag = self._get_dag_test_max_active_limits(
             dag_maker, dag_id='test_backfill_max_limit_check_complete_loop'
         )
-        dag_maker.create_dagrun()
+        dag_maker.create_dagrun(state=None)
         start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
         end_date = DEFAULT_DATE
 
@@ -1064,7 +1096,7 @@ class TestBackfillJob:
             op4.set_downstream(op5)
             op3.set_downstream(op4)
 
-        dr = dag_maker.create_dagrun()
+        dr = dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
         sub_dag = dag.partial_subset(
@@ -1090,7 +1122,7 @@ class TestBackfillJob:
             op5 = DummyOperator(task_id='op5')
             op6 = DummyOperator(task_id='op6')
 
-        dr = dag_maker.create_dagrun()
+        dr = dag_maker.create_dagrun(state=None)
 
         executor = MockExecutor()
 
@@ -1269,7 +1301,7 @@ class TestBackfillJob:
     def test_update_counters(self, dag_maker, session):
         with dag_maker(dag_id='test_manage_executor_state', start_date=DEFAULT_DATE, session=session) as dag:
             task1 = DummyOperator(task_id='dummy', owner='airflow')
-        dr = dag_maker.create_dagrun()
+        dr = dag_maker.create_dagrun(state=None)
         job = BackfillJob(dag=dag)
 
         ti = TI(task1, dr.execution_date)
@@ -1438,7 +1470,7 @@ class TestBackfillJob:
         job = BackfillJob(dag=dag)
 
         # create dagruns
-        dr1 = dag_maker.create_dagrun()
+        dr1 = dag_maker.create_dagrun(state=State.RUNNING)
         dr2 = dag.create_dagrun(run_id='test2', state=State.SUCCESS)
 
         # create taskinstances and set states
@@ -1523,7 +1555,7 @@ class TestBackfillJob:
             DummyOperator(task_id="dummy_task", dag=dag)
 
         job = BackfillJob(
-            dag=dag, executor=MockExecutor(), start_date=datetime.datetime.now() - datetime.timedelta(days=1)
+            dag=dag, executor=MockExecutor(), start_date=timezone.utcnow() - datetime.timedelta(days=1)
         )
         job.run()
         dr: DagRun = dag.get_last_dagrun()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f0f8c52b07..80e3cb1f34 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1760,7 +1760,7 @@ class TestSchedulerJob:
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=dagrun_info.logical_date,
-            state=State.RUNNING,
+            state=None,
             session=session,
         )
 
@@ -1769,7 +1769,7 @@ class TestSchedulerJob:
             dr = dag.create_dagrun(
                 run_type=DagRunType.SCHEDULED,
                 execution_date=dr.data_interval_end,
-                state=State.RUNNING,
+                state=None,
                 session=session,
             )
         ex_date = dr.execution_date
@@ -1851,7 +1851,7 @@ class TestSchedulerJob:
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
+            state=None,
         )
         self.null_exec.mock_task_fail(dag_id, 'test_dagrun_fail', dr.run_id)
 

Reply via email to