TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow
skipped tasks to satisfy wait_for_downstream
URL: https://github.com/apache/airflow/pull/7735#discussion_r409367097
##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
dagrun.verify_integrity()
flaky_ti.refresh_from_db()
self.assertEqual(State.NONE, flaky_ti.state)
+
+ def test_depends_on_past(self):
+ # dag_id = 'test_depends_on_past'
+ # dag = self.dagbag.get_dag(dag_id)
+ # task = dag.tasks[0]
+ # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1,
1, 0, 0, 0))
+ # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1,
2, 0, 0, 0))
+ # ti1 = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0))
+ # ti2 = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0))
+ #
+ # def run_2nd_dagrun():
+ # dag.run(
+ # start_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+ # end_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+ # executor=MockExecutor()
+ # )
+ #
+ # # 2nd run of task fails by itself
+ # self.assertRaises(AirflowException, run_2nd_dagrun)
+ # ti2.refresh_from_db()
+ # self.assertEqual(ti2.state, State.SCHEDULED)
+ #
+ # # 2nd run of task fails if 1st run of task failed
+ # ti1.set_state(State.FAILED)
+ # self.assertRaises(AirflowException, run_2nd_dagrun)
+ # ti2.refresh_from_db()
+ # self.assertEqual(ti2.state, State.SCHEDULED)
+ #
+ # # but it works after 1st instance of task is marked as skipped
+ # ti1.set_state(State.SKIPPED)
+ # run_2nd_dagrun()
+ # ti2.refresh_from_db()
+ # self.assertEqual(ti2.state, State.SUCCESS)
+ #
+ # # and it also works if 1st instance is success
+ # ti2.set_state(State.NONE)
+ # ti1.set_state(State.SUCCESS)
+ # run_2nd_dagrun()
+ # ti2.refresh_from_db()
+ # self.assertEqual(ti2.state, State.SUCCESS)
+ dag_id = 'test_depends_on_past'
+
+ dag = self.dagbag.get_dag(dag_id)
+ task = dag.tasks[0]
+
+ self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1,
0, 0, 0))
+ self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2,
0, 0, 0))
+
+ uti1 = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0))
+ uti2 = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0))
+
+ for state in State.task_states:
+ uti1.set_state(state)
+ uti2.set_state(State.QUEUED)
+ uti2.run()
+ if state in (State.SUCCESS, State.SKIPPED):
+ self.assertEqual(uti2.state, State.SUCCESS)
+ else:
+ self.assertNotEqual(uti2.state, State.SUCCESS)
+
+ def test_wait_for_downstream(self):
+ # dag_id = 'test_wait_for_downstream'
+ # dag = models.DagBag().get_dag(dag_id)
+ #
+ # TODO: clarify with airflow maintainers -- is the snippet below still true and necessary?
+ # # ti.previous_ti requires a dagrun to exist
+ # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1,
1, 0, 0, 0))
+ # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1,
2, 0, 0, 0))
+ # upstream, downstream = dag.tasks
+ #
+ # def run_2nd_dagrun():
+ # dag.run(
+ # start_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+ # end_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+ # executor=MockExecutor()
+ # )
+ #
+ # # doesn't run if downstream task for previous day has a null state
+ # uti_2 = TI(task=upstream, execution_date=timezone.datetime(2016, 1,
2, 0, 0, 0))
+ # uti_2.previous_ti.set_state(State.SUCCESS)
+ # self.assertEqual(uti_2.previous_ti.state, State.SUCCESS)
+ # TODO: clarify with airflow maintainers -- I don't think this triggers an AirflowException, does it?
+ # self.assertRaises(AirflowException, run_2nd_dagrun)
Review comment:
I would like to clarify this assertion. I took the original unit tests and
modified them to be more comprehensive. The original author, @dima-asana,
mentioned a scheduler deadlock
[here](https://github.com/apache/airflow/pull/5308/files#r285941524), which I
did not encounter. Could someone confirm whether that deadlock only affected
older versions of Airflow and no longer applies?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services