dstandish commented on code in PR #23829:
URL: https://github.com/apache/airflow/pull/23829#discussion_r991513061


##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def 
test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
             states = [ti.state for _, ti in tasks_to_run.items()]
             assert TaskInstanceState.SCHEDULED in states
             assert State.NONE in states
+
+    def test_backfill_disable_retry(self, dag_maker):
+        with dag_maker(
+            dag_id='test_disable_retry',
+            schedule_interval="@daily",
+            default_args={
+                'retries': 3,
+                'retry_delay': datetime.timedelta(seconds=3),
+            },
+        ) as dag:
+            task1 = EmptyOperator(task_id="task1")
+        dag_run = dag_maker.create_dagrun(state=None)
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=2)
+        ] = State.UP_FOR_RETRY
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            disable_retry=True,
+        )
+        with pytest.raises(BackfillUnfinished):
+            job.run()
+        ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+        assert ti._try_number == 1

Review Comment:
   What's the purpose of mocking try number 2 above if the task instance still 
ends up with try number 1 here?



##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def 
test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
             states = [ti.state for _, ti in tasks_to_run.items()]
             assert TaskInstanceState.SCHEDULED in states
             assert State.NONE in states
+
+    def test_backfill_disable_retry(self, dag_maker):
+        with dag_maker(
+            dag_id='test_disable_retry',
+            schedule_interval="@daily",
+            default_args={
+                'retries': 3,
+                'retry_delay': datetime.timedelta(seconds=3),
+            },
+        ) as dag:
+            task1 = EmptyOperator(task_id="task1")
+        dag_run = dag_maker.create_dagrun(state=None)
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=2)
+        ] = State.UP_FOR_RETRY
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            disable_retry=True,
+        )
+        with pytest.raises(BackfillUnfinished):
+            job.run()
+        ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+        assert ti._try_number == 1
+
+        dag_run.refresh_from_db()
+
+        assert State.FAILED == dag_run.state

Review Comment:
   ```suggestion
           assert dag_run.state == State.FAILED
   ```
   Assertions should be written as `actual == expected`.



##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def 
test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
             states = [ti.state for _, ti in tasks_to_run.items()]
             assert TaskInstanceState.SCHEDULED in states
             assert State.NONE in states
+
+    def test_backfill_disable_retry(self, dag_maker):
+        with dag_maker(
+            dag_id='test_disable_retry',
+            schedule_interval="@daily",
+            default_args={
+                'retries': 3,
+                'retry_delay': datetime.timedelta(seconds=3),
+            },
+        ) as dag:
+            task1 = EmptyOperator(task_id="task1")
+        dag_run = dag_maker.create_dagrun(state=None)
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, 
try_number=2)
+        ] = State.UP_FOR_RETRY
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            disable_retry=True,
+        )
+        with pytest.raises(BackfillUnfinished):
+            job.run()
+        ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+        assert ti._try_number == 1
+
+        dag_run.refresh_from_db()
+
+        assert State.FAILED == dag_run.state

Review Comment:
   One thing I recommend is parametrizing this test with the args 
`disable_retry, expected` (values `[True, FAILED]` and `[False, UP_FOR_RETRY]`).
   
   The current case isn't a bad thing to verify, but parametrizing also makes 
it really clear to a reviewer that the test is working properly; e.g., what if 
the task happened to be in failed for some reason other than the logic you added?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to