This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ad43b5be222a151283694d7f70bae0dfae620c74 Author: Aleksey Kirilishin <[email protected]> AuthorDate: Fri Jan 26 22:41:42 2024 +0300 fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954) (cherry picked from commit edce53582c7548cdfd6bbe9fd7970c8f4def4155) --- airflow/jobs/backfill_job_runner.py | 8 +++-- .../test_backfill_with_upstream_failed_task.py | 36 ++++++++++++++++++++++ tests/jobs/test_backfill_job.py | 30 +++++++++++++++++- 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index af8ebd1c4e..101e6cc313 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -106,7 +106,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): failed: set[TaskInstanceKey] = attr.ib(factory=set) not_ready: set[TaskInstanceKey] = attr.ib(factory=set) deadlocked: set[TaskInstance] = attr.ib(factory=set) - active_runs: list[DagRun] = attr.ib(factory=list) + active_runs: set[DagRun] = attr.ib(factory=set) executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set) finished_runs: int = 0 total_runs: int = 0 @@ -526,6 +526,8 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): ti_status.running.pop(key) # Reset the failed task in backfill to scheduled state ti.set_state(TaskInstanceState.SCHEDULED, session=session) + if ti.dag_run not in ti_status.active_runs: + ti_status.active_runs.add(ti.dag_run) else: # Default behaviour which works for subdag. if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED): @@ -746,7 +748,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): session.commit() # update dag run state - _dag_runs = ti_status.active_runs[:] + _dag_runs = ti_status.active_runs.copy() for run in _dag_runs: run.update_state(session=session) if run.state in State.finished_dr_states: @@ -848,7 +850,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): dag_run = self._get_dag_run(dagrun_info, dag, session=session) if dag_run is not None: tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session) - ti_status.active_runs.append(dag_run) + ti_status.active_runs.add(dag_run) ti_status.to_run.update(tis_map or {}) processed_dag_run_dates = self._process_backfill_task_instances( diff --git a/tests/dags/test_backfill_with_upstream_failed_task.py b/tests/dags/test_backfill_with_upstream_failed_task.py new file mode 100644 index 0000000000..d2cb6353bf --- /dev/null +++ b/tests/dags/test_backfill_with_upstream_failed_task.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import datetime + +from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator + +dag = DAG( + dag_id="test_backfill_with_upstream_failed_task", + default_args={"retries": 0, "start_date": datetime.datetime(2010, 1, 1)}, + schedule="0 0 * * *", +) + +failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag) +downstream_task = BashOperator(task_id="downstream_task", bash_command="echo 1", dag=dag) +downstream_task.set_upstream(failing_task) + +if __name__ == "__main__": + dag.cli() diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 88ce758b57..0802f11aa9 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1916,7 +1916,7 @@ class TestBackfillJob: executor = MockExecutor() ti_status = BackfillJobRunner._DagRunTaskStatus() - ti_status.active_runs.append(dr) + ti_status.active_runs.add(dr) ti_status.to_run = {ti.key: ti for ti in dr.task_instances} job = Job(executor=executor) @@ -2103,3 +2103,31 @@ class TestBackfillJob: assert dag_run.state == DagRunState.FAILED dag.clear() + + def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker): + self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py")) + dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task") + + # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in + # the original one. However, instead of using the original version of perform_heartbeat, + # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now, + # but it can be replaced with checking the state of the LocalTaskJob. + def fake_perform_heartbeat(*args, **kwargs): + import time + + time.sleep(1) + + with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat): + job = Job(executor=ExecutorLoader.load_executor("LocalExecutor")) + job_runner = BackfillJobRunner( + job=job, + dag=dag, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + rerun_failed_tasks=True, + ) + with pytest.raises(BackfillUnfinished): + run_job(job=job, execute_callable=job_runner._execute) + + dr: DagRun = dag.get_last_dagrun() + assert dr.state == State.FAILED
