This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ad43b5be222a151283694d7f70bae0dfae620c74
Author: Aleksey Kirilishin <[email protected]>
AuthorDate: Fri Jan 26 22:41:42 2024 +0300

    fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954)
    
    (cherry picked from commit edce53582c7548cdfd6bbe9fd7970c8f4def4155)
---
 airflow/jobs/backfill_job_runner.py                |  8 +++--
 .../test_backfill_with_upstream_failed_task.py     | 36 ++++++++++++++++++++++
 tests/jobs/test_backfill_job.py                    | 30 +++++++++++++++++-
 3 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index af8ebd1c4e..101e6cc313 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -106,7 +106,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
         failed: set[TaskInstanceKey] = attr.ib(factory=set)
         not_ready: set[TaskInstanceKey] = attr.ib(factory=set)
         deadlocked: set[TaskInstance] = attr.ib(factory=set)
-        active_runs: list[DagRun] = attr.ib(factory=list)
+        active_runs: set[DagRun] = attr.ib(factory=set)
         executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set)
         finished_runs: int = 0
         total_runs: int = 0
@@ -526,6 +526,8 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(TaskInstanceState.SCHEDULED, 
session=session)
+                        if ti.dag_run not in ti_status.active_runs:
+                            ti_status.active_runs.add(ti.dag_run)
                 else:
                     # Default behaviour which works for subdag.
                     if ti.state in (TaskInstanceState.FAILED, 
TaskInstanceState.UPSTREAM_FAILED):
@@ -746,7 +748,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
             session.commit()
 
             # update dag run state
-            _dag_runs = ti_status.active_runs[:]
+            _dag_runs = ti_status.active_runs.copy()
             for run in _dag_runs:
                 run.update_state(session=session)
                 if run.state in State.finished_dr_states:
@@ -848,7 +850,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                 dag_run = self._get_dag_run(dagrun_info, dag, session=session)
                 if dag_run is not None:
                     tis_map = self._task_instances_for_dag_run(dag, dag_run, 
session=session)
-                    ti_status.active_runs.append(dag_run)
+                    ti_status.active_runs.add(dag_run)
                     ti_status.to_run.update(tis_map or {})
 
         processed_dag_run_dates = self._process_backfill_task_instances(
diff --git a/tests/dags/test_backfill_with_upstream_failed_task.py 
b/tests/dags/test_backfill_with_upstream_failed_task.py
new file mode 100644
index 0000000000..d2cb6353bf
--- /dev/null
+++ b/tests/dags/test_backfill_with_upstream_failed_task.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+
+dag = DAG(
+    dag_id="test_backfill_with_upstream_failed_task",
+    default_args={"retries": 0, "start_date": datetime.datetime(2010, 1, 1)},
+    schedule="0 0 * * *",
+)
+
+failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", 
dag=dag)
+downstream_task = BashOperator(task_id="downstream_task", bash_command="echo 
1", dag=dag)
+downstream_task.set_upstream(failing_task)
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 88ce758b57..0802f11aa9 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1916,7 +1916,7 @@ class TestBackfillJob:
         executor = MockExecutor()
 
         ti_status = BackfillJobRunner._DagRunTaskStatus()
-        ti_status.active_runs.append(dr)
+        ti_status.active_runs.add(dr)
         ti_status.to_run = {ti.key: ti for ti in dr.task_instances}
 
         job = Job(executor=executor)
@@ -2103,3 +2103,31 @@ class TestBackfillJob:
         assert dag_run.state == DagRunState.FAILED
 
         dag.clear()
+
+    def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
+        self.dagbag.process_file(str(TEST_DAGS_FOLDER / 
"test_backfill_with_upstream_failed_task.py"))
+        dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
+
+        # We have to use the "fake" version of perform_heartbeat due to the 
'is_unit_test' check in
+        # the original one. However, instead of using the original version of 
perform_heartbeat,
+        # we can simply wait for a LocalExecutor's worker cycle. The approach 
with sleep works well now,
+        # but it can be replaced with checking the state of the LocalTaskJob.
+        def fake_perform_heartbeat(*args, **kwargs):
+            import time
+
+            time.sleep(1)
+
+        with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", 
fake_perform_heartbeat):
+            job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
+            job_runner = BackfillJobRunner(
+                job=job,
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                rerun_failed_tasks=True,
+            )
+            with pytest.raises(BackfillUnfinished):
+                run_job(job=job, execute_callable=job_runner._execute)
+
+        dr: DagRun = dag.get_last_dagrun()
+        assert dr.state == State.FAILED

Reply via email to