dstandish commented on code in PR #23829:
URL: https://github.com/apache/airflow/pull/23829#discussion_r991513061
##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def
test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
states = [ti.state for _, ti in tasks_to_run.items()]
assert TaskInstanceState.SCHEDULED in states
assert State.NONE in states
+
+ def test_backfill_disable_retry(self, dag_maker):
+ with dag_maker(
+ dag_id='test_disable_retry',
+ schedule_interval="@daily",
+ default_args={
+ 'retries': 3,
+ 'retry_delay': datetime.timedelta(seconds=3),
+ },
+ ) as dag:
+ task1 = EmptyOperator(task_id="task1")
+ dag_run = dag_maker.create_dagrun(state=None)
+
+ executor = MockExecutor(parallelism=16)
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id,
try_number=1)
+ ] = State.UP_FOR_RETRY
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id,
try_number=2)
+ ] = State.UP_FOR_RETRY
+ job = BackfillJob(
+ dag=dag,
+ executor=executor,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ disable_retry=True,
+ )
+ with pytest.raises(BackfillUnfinished):
+ job.run()
+ ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+ assert ti._try_number == 1
Review Comment:
What's the purpose of mocking try number 2 above if the task instance still
ends up with try number 1 here?
##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def
test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
states = [ti.state for _, ti in tasks_to_run.items()]
assert TaskInstanceState.SCHEDULED in states
assert State.NONE in states
+
+ def test_backfill_disable_retry(self, dag_maker):
+ with dag_maker(
+ dag_id='test_disable_retry',
+ schedule_interval="@daily",
+ default_args={
+ 'retries': 3,
+ 'retry_delay': datetime.timedelta(seconds=3),
+ },
+ ) as dag:
+ task1 = EmptyOperator(task_id="task1")
+ dag_run = dag_maker.create_dagrun(state=None)
+
+ executor = MockExecutor(parallelism=16)
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id,
try_number=1)
+ ] = State.UP_FOR_RETRY
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id,
try_number=2)
+ ] = State.UP_FOR_RETRY
+ job = BackfillJob(
+ dag=dag,
+ executor=executor,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ disable_retry=True,
+ )
+ with pytest.raises(BackfillUnfinished):
+ job.run()
+ ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+ assert ti._try_number == 1
+
+ dag_run.refresh_from_db()
+
+ assert State.FAILED == dag_run.state
Review Comment:
```suggestion
assert dag_run.state == State.FAILED
```
Asserts should be written as `actual == expected`.
##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def
test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
states = [ti.state for _, ti in tasks_to_run.items()]
assert TaskInstanceState.SCHEDULED in states
assert State.NONE in states
+
+ def test_backfill_disable_retry(self, dag_maker):
+ with dag_maker(
+ dag_id='test_disable_retry',
+ schedule_interval="@daily",
+ default_args={
+ 'retries': 3,
+ 'retry_delay': datetime.timedelta(seconds=3),
+ },
+ ) as dag:
+ task1 = EmptyOperator(task_id="task1")
+ dag_run = dag_maker.create_dagrun(state=None)
+
+ executor = MockExecutor(parallelism=16)
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id,
try_number=1)
+ ] = State.UP_FOR_RETRY
+ executor.mock_task_results[
+ TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id,
try_number=2)
+ ] = State.UP_FOR_RETRY
+ job = BackfillJob(
+ dag=dag,
+ executor=executor,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ disable_retry=True,
+ )
+ with pytest.raises(BackfillUnfinished):
+ job.run()
+ ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+ assert ti._try_number == 1
+
+ dag_run.refresh_from_db()
+
+ assert State.FAILED == dag_run.state
Review Comment:
One thing I recommend is parametrizing this test with the args
`disable_retry, expected` (values [True, FAILED] and [False, UP_FOR_RETRY]).
Covering both cases isn't a bad thing to verify in itself, and it also makes it
really clear to a reviewer that the test is working properly. E.g., what if the
task happened to be in the failed state for some reason other than the logic
you added?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]