This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9769a65c20 Fixed backfill interference with scheduler (#22701)
9769a65c20 is described below
commit 9769a65c20f6028d640061efacbc5bfeb5ebaf3d
Author: QP Hou <[email protected]>
AuthorDate: Fri Apr 8 13:13:45 2022 -0700
Fixed backfill interference with scheduler (#22701)
Co-authored-by: Dmirty Suvorov <[email protected]>
---
airflow/jobs/backfill_job.py | 30 +++++++++++++++-
tests/jobs/test_backfill_job.py | 76 ++++++++++++++++++++++++++++------------
tests/jobs/test_scheduler_job.py | 6 ++--
3 files changed, 86 insertions(+), 26 deletions(-)
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index c9f0a48b58..0334a4f20c 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -702,6 +702,9 @@ class BackfillJob(BaseJob):
return err
+ def _get_dag_with_subdags(self):
+ return [self.dag] + self.dag.subdags
+
@provide_session
def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, start_date, session=None):
"""
@@ -717,7 +720,7 @@ class BackfillJob(BaseJob):
:param session: the current session object
"""
for dagrun_info in dagrun_infos:
- for dag in [self.dag] + self.dag.subdags:
+ for dag in self._get_dag_with_subdags():
dag_run = self._get_dag_run(dagrun_info, dag, session=session)
tis_map = self._task_instances_for_dag_run(dag_run, session=session)
if dag_run is None:
@@ -784,6 +787,31 @@ class BackfillJob(BaseJob):
return
dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]
+ dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
+ running_dagruns = DagRun.find(
+ dag_id=dag_with_subdags_ids,
+ execution_start_date=self.bf_start_date,
+ execution_end_date=self.bf_end_date,
+ no_backfills=True,
+ state=DagRunState.RUNNING,
+ )
+
+ if running_dagruns:
+ for run in running_dagruns:
+ self.log.error(
+ "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING "
+ "state.",
+ run.run_id,
+ run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
+ run.run_type,
+ )
+ self.log.error(
+ "Changing DagRun into BACKFILL would cause scheduler to lose track of executing "
+ "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into "
+ "database would cause database constraint violation for dag_id + execution_date "
+ "combination. Please adjust backfill dates or wait for this DagRun to finish.",
+ )
+ return
# picklin'
pickle_id = None
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index f64009ebb8..65caff63a4 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -104,7 +104,7 @@ class TestBackfillJob:
def test_unfinished_dag_runs_set_to_failed(self, dag_maker):
dag = self._get_dummy_dag(dag_maker)
- dag_run = dag_maker.create_dagrun()
+ dag_run = dag_maker.create_dagrun(state=None)
job = BackfillJob(
dag=dag,
@@ -121,7 +121,7 @@ class TestBackfillJob:
def test_dag_run_with_finished_tasks_set_to_success(self, dag_maker):
dag = self._get_dummy_dag(dag_maker)
- dag_run = dag_maker.create_dagrun()
+ dag_run = dag_maker.create_dagrun(state=None)
for ti in dag_run.get_task_instances():
ti.set_state(State.SUCCESS)
@@ -282,7 +282,7 @@ class TestBackfillJob:
def test_backfill_conf(self, dag_maker):
dag = self._get_dummy_dag(dag_maker, dag_id='test_backfill_conf')
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -311,7 +311,7 @@ class TestBackfillJob:
dag_id='test_backfill_respect_max_active_tis_per_dag_limit',
max_active_tis_per_dag=max_active_tis_per_dag,
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -360,7 +360,7 @@ class TestBackfillJob:
@patch('airflow.jobs.backfill_job.BackfillJob.log')
def test_backfill_respect_dag_concurrency_limit(self, mock_log, dag_maker):
dag = self._get_dummy_dag(dag_maker,
dag_id='test_backfill_respect_concurrency_limit')
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
dag.max_active_tasks = 2
executor = MockExecutor()
@@ -414,7 +414,7 @@ class TestBackfillJob:
set_default_pool_slots(default_pool_slots)
dag = self._get_dummy_dag(dag_maker,
dag_id='test_backfill_with_no_pool_limit')
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -470,7 +470,7 @@ class TestBackfillJob:
dag_id='test_backfill_pool_not_found',
pool='king_pool',
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -503,7 +503,7 @@ class TestBackfillJob:
dag_id='test_backfill_respect_pool_limit',
pool=pool.pool,
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -553,7 +553,7 @@ class TestBackfillJob:
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_run_rescheduled",
task_id="test_backfill_run_rescheduled_task-1"
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -586,6 +586,7 @@ class TestBackfillJob:
dag_maker, dag_id="test_backfill_override_conf",
task_id="test_backfill_override_conf-1"
)
dr = dag_maker.create_dagrun(
+ state=None,
start_date=DEFAULT_DATE,
)
@@ -608,11 +609,41 @@ class TestBackfillJob:
dr = wrapped_task_instances_for_dag_run.call_args_list[0][0][0]
assert dr.conf == {"a": 1}
+ def test_backfill_skip_active_scheduled_dagrun(self, dag_maker, caplog):
+ dag = self._get_dummy_dag(
+ dag_maker,
+ dag_id="test_backfill_skip_active_scheduled_dagrun",
+ task_id="test_backfill_skip_active_scheduled_dagrun-1",
+ )
+ dag_maker.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ )
+
+ executor = MockExecutor()
+
+ job = BackfillJob(
+ dag=dag,
+ executor=executor,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+ )
+ job.run()
+ error_log_records = [record for record in caplog.records if record.levelname == "ERROR"]
+ assert "Backfill cannot be created for DagRun" in error_log_records[0].msg
+
+ ti = TI(
+ task=dag.get_task('test_backfill_skip_active_scheduled_dagrun-1'), execution_date=DEFAULT_DATE
+ )
+ ti.refresh_from_db()
+ # since DAG backfill is skipped, task state should be none
+ assert ti.state == State.NONE
+
def test_backfill_rerun_failed_tasks(self, dag_maker):
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed",
task_id="test_backfill_rerun_failed_task-1"
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -646,7 +677,7 @@ class TestBackfillJob:
op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1')
op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2')
op1.set_upstream(op2)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -678,7 +709,7 @@ class TestBackfillJob:
dag = self._get_dummy_dag(
dag_maker, dag_id='test_backfill_rerun_failed',
task_id='test_backfill_rerun_failed_task-1'
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -715,7 +746,7 @@ class TestBackfillJob:
},
) as dag:
task1 = DummyOperator(task_id="task1")
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor(parallelism=16)
executor.mock_task_results[
@@ -742,7 +773,7 @@ class TestBackfillJob:
},
) as dag:
task1 = DummyOperator(task_id="task1")
- dr = dag_maker.create_dagrun()
+ dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor(parallelism=16)
executor.mock_task_results[
@@ -924,7 +955,7 @@ class TestBackfillJob:
dag = self._get_dag_test_max_active_limits(
dag_maker, dag_id='test_backfill_max_limit_check_within_limit',
max_active_runs=16
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE
@@ -956,6 +987,7 @@ class TestBackfillJob:
dag_id=dag_id,
)
dag_maker.create_dagrun(
+ state=None,
# Existing dagrun that is not within the backfill range
run_id=run_id,
execution_date=DEFAULT_DATE +
datetime.timedelta(hours=1),
@@ -1010,7 +1042,7 @@ class TestBackfillJob:
dag = self._get_dag_test_max_active_limits(
dag_maker, dag_id='test_backfill_max_limit_check_no_count_existing'
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = BackfillJob(
@@ -1030,7 +1062,7 @@ class TestBackfillJob:
dag = self._get_dag_test_max_active_limits(
dag_maker, dag_id='test_backfill_max_limit_check_complete_loop'
)
- dag_maker.create_dagrun()
+ dag_maker.create_dagrun(state=None)
start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE
@@ -1064,7 +1096,7 @@ class TestBackfillJob:
op4.set_downstream(op5)
op3.set_downstream(op4)
- dr = dag_maker.create_dagrun()
+ dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor()
sub_dag = dag.partial_subset(
@@ -1090,7 +1122,7 @@ class TestBackfillJob:
op5 = DummyOperator(task_id='op5')
op6 = DummyOperator(task_id='op6')
- dr = dag_maker.create_dagrun()
+ dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor()
@@ -1269,7 +1301,7 @@ class TestBackfillJob:
def test_update_counters(self, dag_maker, session):
with dag_maker(dag_id='test_manage_executor_state',
start_date=DEFAULT_DATE, session=session) as dag:
task1 = DummyOperator(task_id='dummy', owner='airflow')
- dr = dag_maker.create_dagrun()
+ dr = dag_maker.create_dagrun(state=None)
job = BackfillJob(dag=dag)
ti = TI(task1, dr.execution_date)
@@ -1438,7 +1470,7 @@ class TestBackfillJob:
job = BackfillJob(dag=dag)
# create dagruns
- dr1 = dag_maker.create_dagrun()
+ dr1 = dag_maker.create_dagrun(state=State.RUNNING)
dr2 = dag.create_dagrun(run_id='test2', state=State.SUCCESS)
# create taskinstances and set states
@@ -1523,7 +1555,7 @@ class TestBackfillJob:
DummyOperator(task_id="dummy_task", dag=dag)
job = BackfillJob(
- dag=dag, executor=MockExecutor(), start_date=datetime.datetime.now() - datetime.timedelta(days=1)
+ dag=dag, executor=MockExecutor(), start_date=timezone.utcnow() - datetime.timedelta(days=1)
)
job.run()
dr: DagRun = dag.get_last_dagrun()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f0f8c52b07..80e3cb1f34 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1760,7 +1760,7 @@ class TestSchedulerJob:
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dagrun_info.logical_date,
- state=State.RUNNING,
+ state=None,
session=session,
)
@@ -1769,7 +1769,7 @@ class TestSchedulerJob:
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dr.data_interval_end,
- state=State.RUNNING,
+ state=None,
session=session,
)
ex_date = dr.execution_date
@@ -1851,7 +1851,7 @@ class TestSchedulerJob:
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
- state=State.RUNNING,
+ state=None,
)
self.null_exec.mock_task_fail(dag_id, 'test_dagrun_fail', dr.run_id)