This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b5adaf Refactor ti clear next method kwargs tests (#19194)
3b5adaf is described below
commit 3b5adaff9a30bee3ee12b32c44f38c3f5148df24
Author: Rocco Pascale <[email protected]>
AuthorDate: Mon Jan 10 02:00:04 2022 -0500
Refactor ti clear next method kwargs tests (#19194)
---
tests/models/test_taskinstance.py | 55 +++++++++++++--------------------------
1 file changed, 18 insertions(+), 37 deletions(-)
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 14a8c78..2d9bcd6 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -484,50 +484,31 @@ class TestTaskInstance:
ti.state == state
@pytest.mark.parametrize(
- "state",
- [State.FAILED, State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE,
State.UP_FOR_RETRY],
+ "state, exception, retries",
+ [
+ (State.FAILED, AirflowException, 0),
+ (State.SKIPPED, AirflowSkipException, 0),
+ (State.SUCCESS, None, 0),
+ (State.UP_FOR_RESCHEDULE,
AirflowRescheduleException(timezone.utcnow()), 0),
+ (State.UP_FOR_RETRY, AirflowException, 1),
+ ],
)
- def test_task_wipes_next_fields(self, session, state, dag_maker):
+ def test_task_wipes_next_fields(self, session, dag_maker, state,
exception, retries):
"""
Test that ensures that tasks wipe their next_method and next_kwargs
- when they go into a state of FAILED, SKIPPED, SUCCESS,
UP_FOR_RESCHEDULE, or UP_FOR_RETRY.
+ when the TI enters one of the configured states.
"""
- def failure():
- raise AirflowException
-
- def skip():
- raise AirflowSkipException
-
- def success():
- return None
-
- def reschedule():
- reschedule_date = timezone.utcnow()
- raise AirflowRescheduleException(reschedule_date)
-
- _retries = 0
- _retry_delay = datetime.timedelta(seconds=0)
-
- if state == State.FAILED:
- _python_callable = failure
- elif state == State.SKIPPED:
- _python_callable = skip
- elif state == State.SUCCESS:
- _python_callable = success
- elif state == State.UP_FOR_RESCHEDULE:
- _python_callable = reschedule
- elif state in [State.FAILED, State.UP_FOR_RETRY]:
- _python_callable = failure
- _retries = 1
- _retry_delay = datetime.timedelta(seconds=2)
+ def _raise_if_exception():
+ if exception:
+ raise exception
with dag_maker("test_deferred_method_clear"):
task = PythonOperator(
task_id="test_deferred_method_clear_task",
- python_callable=_python_callable,
- retries=_retries,
- retry_delay=_retry_delay,
+ python_callable=_raise_if_exception,
+ retries=retries,
+ retry_delay=datetime.timedelta(seconds=2),
)
dr = dag_maker.create_dagrun()
@@ -539,9 +520,9 @@ class TestTaskInstance:
ti.task = task
if state in [State.FAILED, State.UP_FOR_RETRY]:
- with pytest.raises(AirflowException):
+ with pytest.raises(exception):
ti.run()
- elif state in [State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE]:
+ else:
ti.run()
ti.refresh_from_db()