dstandish commented on a change in pull request #19194:
URL: https://github.com/apache/airflow/pull/19194#discussion_r774794859



##########
File path: tests/models/test_taskinstance.py
##########
@@ -483,50 +483,32 @@ def task_function(ti):
         ti.state == state
 
     @pytest.mark.parametrize(
-        "state",
-        [State.FAILED, State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE, 
State.UP_FOR_RETRY],
+        "state, exception_type, retries",
+        [
+            (State.FAILED, AirflowException, 0),
+            (State.SKIPPED, AirflowSkipException, 0),
+            (State.SUCCESS, None, 0),
+            (State.UP_FOR_RESCHEDULE, 
AirflowRescheduleException(timezone.utcnow()), 0),
+            (State.UP_FOR_RETRY, AirflowException, 1),
+        ],
     )
-    def test_task_wipes_next_fields(self, session, state, dag_maker):
+    def test_task_wipes_next_fields(self, session, dag_maker, state, 
exception_type, retries):
         """
-        Test that ensures that tasks wipe their next_method and next_kwargs
-        when they go into a state of FAILED, SKIPPED, SUCCESS, 
UP_FOR_RESCHEDULE, or UP_FOR_RETRY.
+        Test that ensures that tasks wipe their next_method and next_kwargs 
for the configured states:
+        FAILED, SKIPPED, SUCCESS, UP_FOR_RESCHEDULE, UP_FOR_RETRY.
         """
 
-        def failure():
-            raise AirflowException
-
-        def skip():
-            raise AirflowSkipException
-
-        def success():
-            return None
-
-        def reschedule():
-            reschedule_date = timezone.utcnow()
-            raise AirflowRescheduleException(reschedule_date)
-
-        _retries = 0
-        _retry_delay = datetime.timedelta(seconds=0)
-
-        if state == State.FAILED:
-            _python_callable = failure
-        elif state == State.SKIPPED:
-            _python_callable = skip
-        elif state == State.SUCCESS:
-            _python_callable = success
-        elif state == State.UP_FOR_RESCHEDULE:
-            _python_callable = reschedule
-        elif state in [State.FAILED, State.UP_FOR_RETRY]:
-            _python_callable = failure
-            _retries = 1
-            _retry_delay = datetime.timedelta(seconds=2)
+        def _raise_af_exception(exception_type):
+            if exception_type:
+                raise exception_type
 
         with dag_maker("test_deferred_method_clear"):
             task = PythonOperator(
                 task_id="test_deferred_method_clear_task",
-                python_callable=_python_callable,
-                retries=_retries,
-                retry_delay=_retry_delay,
+                python_callable=_raise_af_exception,
+                op_args=[exception_type],

Review comment:
       If you remove `exception_type` as a parameter of `_raise_af_exception` (the
nested function can read it from the enclosing test's scope), you can also
remove `op_args`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to