uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857290031
##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def
test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
assert failed_deps[0].dep_name == "Previous Dagrun State"
assert not failed_deps[0].passed
+ @pytest.mark.parametrize(
+ "exception, trigger_rule",
+ [
+ (AirflowFailException(), TriggerRule.ALL_DONE),
+ (AirflowFailException(), TriggerRule.ALL_FAILED),
+ (AirflowSkipException(), TriggerRule.ALL_DONE),
+ (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+ (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_skipped_and_failed(self, exception,
trigger_rule, caplog, dag_maker):
+ """
+ In these cases D is running, so no decision can be made about C.
+ """
+
+ def raise_():
+ raise exception
+
+ session = settings.Session()
Review Comment:
You can use the `session` fixture here instead, which automatically calls
`rollback` on test teardown.
##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def
test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
assert failed_deps[0].dep_name == "Previous Dagrun State"
assert not failed_deps[0].passed
+ @pytest.mark.parametrize(
+ "exception, trigger_rule",
+ [
+ (AirflowFailException(), TriggerRule.ALL_DONE),
+ (AirflowFailException(), TriggerRule.ALL_FAILED),
+ (AirflowSkipException(), TriggerRule.ALL_DONE),
+ (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+ (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_skipped_and_failed(self, exception,
trigger_rule, caplog, dag_maker):
+ """
+ In these cases D is running, so no decision can be made about C.
+ """
+
+ def raise_():
+ raise exception
+
+ session = settings.Session()
+
+ with dag_maker(catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=raise_)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True,
trigger_rule=trigger_rule)
+ task_d = PythonOperator(task_id='D', python_callable=lambda: True)
+ task_a >> task_b >> task_c
+ task_d >> task_c
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING,
execution_date=DEFAULT_DATE)
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.QUEUED)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.NONE)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.NONE)
+ ti_d = TaskInstance(task_d, run_id=dr.run_id, state=State.RUNNING)
+
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti_d)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti_a, ignore_ti_state=True,
executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ job1.run()
+
+ ti_b.refresh_from_db(session)
+ ti_c.refresh_from_db(session)
+ assert ti_b.state in (State.SKIPPED, State.UPSTREAM_FAILED)
+ assert ti_c.state == State.NONE
+ assert "0 downstream tasks scheduled from follow-on schedule" in
caplog.text
+
+ failed_deps = list(ti_c.get_failed_dep_statuses(session=session))
+ assert len(failed_deps) == 1
+ assert failed_deps[0].dep_name == "Trigger Rule"
+ assert not failed_deps[0].passed
+
+ session.rollback()
Review Comment:
… because, due to how tests run, this won’t be called if any of the asserts
fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]