This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ada0f13a2a Add disable retry flag on backfill (#23829)
ada0f13a2a is described below
commit ada0f13a2a738e67433070f2ecba4114065136d2
Author: domagojrazum <[email protected]>
AuthorDate: Mon Nov 7 23:30:14 2022 +0100
Add disable retry flag on backfill (#23829)
---
airflow/cli/cli_parser.py | 6 +++++
airflow/cli/commands/dag_command.py | 1 +
airflow/jobs/backfill_job.py | 7 ++++++
airflow/models/dag.py | 2 ++
tests/cli/commands/test_dag_command.py | 4 +++
tests/jobs/test_backfill_job.py | 46 ++++++++++++++++++++++++++++++++++
6 files changed, 66 insertions(+)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index f905e4a107..f7fd3ad0c4 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -351,6 +351,11 @@ ARG_CONTINUE_ON_FAILURES = Arg(
help=("if set, the backfill will keep going even if some of the tasks failed"),
action="store_true",
)
+ARG_DISABLE_RETRY = Arg(
+ ("--disable-retry",),
+ help=("if set, the backfill will set tasks as failed without retrying."),
+ action="store_true",
+)
ARG_RUN_BACKWARDS = Arg(
(
"-B",
@@ -1162,6 +1167,7 @@ DAGS_COMMANDS = (
ARG_DONOT_PICKLE,
ARG_YES,
ARG_CONTINUE_ON_FAILURES,
+ ARG_DISABLE_RETRY,
ARG_BF_IGNORE_DEPENDENCIES,
ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST,
ARG_SUBDIR,
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 7607a24ddc..026c21816c 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -127,6 +127,7 @@ def dag_backfill(args, dag=None):
rerun_failed_tasks=args.rerun_failed_tasks,
run_backwards=args.run_backwards,
continue_on_failures=args.continue_on_failures,
+ disable_retry=args.disable_retry,
)
except ValueError as vr:
print(str(vr))
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 5ced825ec1..b7808480c5 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -117,6 +117,7 @@ class BackfillJob(BaseJob):
run_backwards=False,
run_at_least_once=False,
continue_on_failures=False,
+ disable_retry=False,
*args,
**kwargs,
):
@@ -156,6 +157,7 @@ class BackfillJob(BaseJob):
self.run_backwards = run_backwards
self.run_at_least_once = run_at_least_once
self.continue_on_failures = continue_on_failures
+ self.disable_retry = disable_retry
super().__init__(*args, **kwargs)
def _update_counters(self, ti_status, session=None):
@@ -628,6 +630,11 @@ class BackfillJob(BaseJob):
for new_ti in new_mapped_tis:
new_ti.set_state(TaskInstanceState.SCHEDULED, session=session)
+ # Set state to failed for running TIs that are set up for retry if disable-retry flag is set
+ for ti in ti_status.running.values():
+ if self.disable_retry and ti.state == TaskInstanceState.UP_FOR_RETRY:
+ ti.set_state(TaskInstanceState.FAILED, session=session)
+
# update the task counters
self._update_counters(ti_status=ti_status, session=session)
session.commit()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 94564d2aeb..f2511fbb31 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2396,6 +2396,7 @@ class DAG(LoggingMixin):
run_backwards=False,
run_at_least_once=False,
continue_on_failures=False,
+ disable_retry=False,
):
"""
Runs the DAG.
@@ -2446,6 +2447,7 @@ class DAG(LoggingMixin):
run_backwards=run_backwards,
run_at_least_once=run_at_least_once,
continue_on_failures=continue_on_failures,
+ disable_retry=disable_retry,
)
job.run()
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index b8f073d18d..025a7bbacd 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -130,6 +130,7 @@ class TestCliDags:
run_backwards=False,
verbose=False,
continue_on_failures=False,
+ disable_retry=False,
)
mock_run.reset_mock()
dag = self.dagbag.get_dag("example_bash_operator")
@@ -202,6 +203,7 @@ class TestCliDags:
run_backwards=False,
verbose=False,
continue_on_failures=False,
+ disable_retry=False,
)
mock_run.reset_mock()
@@ -337,6 +339,7 @@ class TestCliDags:
run_backwards=False,
verbose=False,
continue_on_failures=False,
+ disable_retry=False,
)
@mock.patch("airflow.cli.commands.dag_command.DAG.run")
@@ -377,6 +380,7 @@ class TestCliDags:
run_backwards=True,
verbose=False,
continue_on_failures=False,
+ disable_retry=False,
)
def test_next_execution(self):
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index f519e9c540..2ca658276d 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1836,3 +1836,49 @@ class TestBackfillJob:
states = [ti.state for _, ti in tasks_to_run.items()]
assert TaskInstanceState.SCHEDULED in states
assert State.NONE in states
+
+ @pytest.mark.parametrize(
+ ["disable_retry", "try_number", "exception"],
+ (
+ (True, 1, BackfillUnfinished),
+ (False, 2, AirflowException),
+ ),
+ )
+ def test_backfill_disable_retry(self, dag_maker, disable_retry, try_number, exception):
+ with dag_maker(
+ dag_id="test_disable_retry",
+ schedule_interval="@daily",
+ default_args={
+ "retries": 2,
+ "retry_delay": datetime.timedelta(seconds=3),
+ },
+ ) as dag:
+ task1 = EmptyOperator(task_id="task1")
+ dag_run = dag_maker.create_dagrun(state=None)
+
+ executor = MockExecutor(parallelism=16)
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=1)
+ ] = TaskInstanceState.UP_FOR_RETRY
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=2)
+ ] = TaskInstanceState.FAILED
+
+ job = BackfillJob(
+ dag=dag,
+ executor=executor,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ disable_retry=disable_retry,
+ )
+ with pytest.raises(exception):
+ job.run()
+ ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+ assert ti._try_number == try_number
+
+ dag_run.refresh_from_db()
+
+ assert dag_run.state == DagRunState.FAILED
+
+ dag.clear()