This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ada0f13a2a Add disable retry flag on backfill (#23829)
ada0f13a2a is described below

commit ada0f13a2a738e67433070f2ecba4114065136d2
Author: domagojrazum <[email protected]>
AuthorDate: Mon Nov 7 23:30:14 2022 +0100

    Add disable retry flag on backfill (#23829)
---
 airflow/cli/cli_parser.py              |  6 +++++
 airflow/cli/commands/dag_command.py    |  1 +
 airflow/jobs/backfill_job.py           |  7 ++++++
 airflow/models/dag.py                  |  2 ++
 tests/cli/commands/test_dag_command.py |  4 +++
 tests/jobs/test_backfill_job.py        | 46 ++++++++++++++++++++++++++++++++++
 6 files changed, 66 insertions(+)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index f905e4a107..f7fd3ad0c4 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -351,6 +351,11 @@ ARG_CONTINUE_ON_FAILURES = Arg(
     help=("if set, the backfill will keep going even if some of the tasks 
failed"),
     action="store_true",
 )
+ARG_DISABLE_RETRY = Arg(
+    ("--disable-retry",),
+    help=("if set, the backfill will set tasks as failed without retrying."),
+    action="store_true",
+)
 ARG_RUN_BACKWARDS = Arg(
     (
         "-B",
@@ -1162,6 +1167,7 @@ DAGS_COMMANDS = (
             ARG_DONOT_PICKLE,
             ARG_YES,
             ARG_CONTINUE_ON_FAILURES,
+            ARG_DISABLE_RETRY,
             ARG_BF_IGNORE_DEPENDENCIES,
             ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST,
             ARG_SUBDIR,
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 7607a24ddc..026c21816c 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -127,6 +127,7 @@ def dag_backfill(args, dag=None):
                     rerun_failed_tasks=args.rerun_failed_tasks,
                     run_backwards=args.run_backwards,
                     continue_on_failures=args.continue_on_failures,
+                    disable_retry=args.disable_retry,
                 )
             except ValueError as vr:
                 print(str(vr))
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 5ced825ec1..b7808480c5 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -117,6 +117,7 @@ class BackfillJob(BaseJob):
         run_backwards=False,
         run_at_least_once=False,
         continue_on_failures=False,
+        disable_retry=False,
         *args,
         **kwargs,
     ):
@@ -156,6 +157,7 @@ class BackfillJob(BaseJob):
         self.run_backwards = run_backwards
         self.run_at_least_once = run_at_least_once
         self.continue_on_failures = continue_on_failures
+        self.disable_retry = disable_retry
         super().__init__(*args, **kwargs)
 
     def _update_counters(self, ti_status, session=None):
@@ -628,6 +630,11 @@ class BackfillJob(BaseJob):
                 for new_ti in new_mapped_tis:
                     new_ti.set_state(TaskInstanceState.SCHEDULED, 
session=session)
 
+            # Set state to failed for running TIs that are set up for retry if 
disable-retry flag is set
+            for ti in ti_status.running.values():
+                if self.disable_retry and ti.state == 
TaskInstanceState.UP_FOR_RETRY:
+                    ti.set_state(TaskInstanceState.FAILED, session=session)
+
             # update the task counters
             self._update_counters(ti_status=ti_status, session=session)
             session.commit()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 94564d2aeb..f2511fbb31 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2396,6 +2396,7 @@ class DAG(LoggingMixin):
         run_backwards=False,
         run_at_least_once=False,
         continue_on_failures=False,
+        disable_retry=False,
     ):
         """
         Runs the DAG.
@@ -2446,6 +2447,7 @@ class DAG(LoggingMixin):
             run_backwards=run_backwards,
             run_at_least_once=run_at_least_once,
             continue_on_failures=continue_on_failures,
+            disable_retry=disable_retry,
         )
         job.run()
 
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index b8f073d18d..025a7bbacd 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -130,6 +130,7 @@ class TestCliDags:
             run_backwards=False,
             verbose=False,
             continue_on_failures=False,
+            disable_retry=False,
         )
         mock_run.reset_mock()
         dag = self.dagbag.get_dag("example_bash_operator")
@@ -202,6 +203,7 @@ class TestCliDags:
             run_backwards=False,
             verbose=False,
             continue_on_failures=False,
+            disable_retry=False,
         )
         mock_run.reset_mock()
 
@@ -337,6 +339,7 @@ class TestCliDags:
             run_backwards=False,
             verbose=False,
             continue_on_failures=False,
+            disable_retry=False,
         )
 
     @mock.patch("airflow.cli.commands.dag_command.DAG.run")
@@ -377,6 +380,7 @@ class TestCliDags:
             run_backwards=True,
             verbose=False,
             continue_on_failures=False,
+            disable_retry=False,
         )
 
     def test_next_execution(self):
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index f519e9c540..2ca658276d 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1836,3 +1836,49 @@ class TestBackfillJob:
             states = [ti.state for _, ti in tasks_to_run.items()]
             assert TaskInstanceState.SCHEDULED in states
             assert State.NONE in states
+
+    @pytest.mark.parametrize(
+        ["disable_retry", "try_number", "exception"],
+        (
+            (True, 1, BackfillUnfinished),
+            (False, 2, AirflowException),
+        ),
+    )
+    def test_backfill_disable_retry(self, dag_maker, disable_retry, 
try_number, exception):
+        with dag_maker(
+            dag_id="test_disable_retry",
+            schedule_interval="@daily",
+            default_args={
+                "retries": 2,
+                "retry_delay": datetime.timedelta(seconds=3),
+            },
+        ) as dag:
+            task1 = EmptyOperator(task_id="task1")
+        dag_run = dag_maker.create_dagrun(state=None)
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=1)
+        ] = TaskInstanceState.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=2)
+        ] = TaskInstanceState.FAILED
+
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            disable_retry=disable_retry,
+        )
+        with pytest.raises(exception):
+            job.run()
+        ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+        assert ti._try_number == try_number
+
+        dag_run.refresh_from_db()
+
+        assert dag_run.state == DagRunState.FAILED
+
+        dag.clear()

Reply via email to