TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow 
skipped tasks to satisfy wait_for_downstream
URL: https://github.com/apache/airflow/pull/7735#discussion_r409367097
 
 

 ##########
 File path: tests/models/test_dagrun.py
 ##########
 @@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'
+        # dag = self.dagbag.get_dag(dag_id)
+        # task = dag.tasks[0]
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
1, 0, 0, 0))
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
2, 0, 0, 0))
+        # ti1 = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0))
+        # ti2 = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0))
+        #
+        # def run_2nd_dagrun():
+        #     dag.run(
+        #         start_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         end_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         executor=MockExecutor()
+        #     )
+        #
+        # # 2nd run of task fails by itself
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SCHEDULED)
+        #
+        # # 2nd run af task fails if 1st run of task failed
+        # ti1.set_state(State.FAILED)
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SCHEDULED)
+        #
+        # # but it works after 1st instance of task is marked as skipped
+        # ti1.set_state(State.SKIPPED)
+        # run_2nd_dagrun()
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SUCCESS)
+        #
+        # # and it also works if 1st instance is success
+        # ti2.set_state(State.NONE)
+        # ti1.set_state(State.SUCCESS)
+        # run_2nd_dagrun()
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SUCCESS)
+        dag_id = 'test_depends_on_past'
+
+        dag = self.dagbag.get_dag(dag_id)
+        task = dag.tasks[0]
+
+        self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 
0, 0, 0))
+        self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 
0, 0, 0))
+
+        uti1 = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0))
+        uti2 = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0))
+
+        for state in State.task_states:
+            uti1.set_state(state)
+            uti2.set_state(State.QUEUED)
+            uti2.run()
+            if state in (State.SUCCESS, State.SKIPPED):
+                self.assertEqual(uti2.state, State.SUCCESS)
+            else:
+                self.assertNotEqual(uti2.state, State.SUCCESS)
+
+    def test_wait_for_downstream(self):
+        # dag_id = 'test_wait_for_downstream'
+        # dag = models.DagBag().get_dag(dag_id)
+        #
+        # TODO: clarify with airflow maintainers -- is the snippet below still 
true and necessary?
+        # # ti.previous_ti requires a dagrun to exist
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
1, 0, 0, 0))
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
2, 0, 0, 0))
+        # upstream, downstream = dag.tasks
+        #
+        # def run_2nd_dagrun():
+        #     dag.run(
+        #         start_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         end_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         executor=MockExecutor()
+        #     )
+        #
+        # # doesn't run if downstream task for previous day has a null state
+        # uti_2 = TI(task=upstream, execution_date=timezone.datetime(2016, 1, 
2, 0, 0, 0))
+        # uti_2.previous_ti.set_state(State.SUCCESS)
+        # self.assertEqual(uti_2.previous_ti.state, State.SUCCESS)
+        # TODO: clarify with airflow maintainers -- I don't think this 
triggers an AirflowException, does it?
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
 
 Review comment:
   I would like to clarify this line, because I took the original unit tests 
and modified them to be more comprehensive. The original author, @dima-asana, 
mentioned a scheduler deadlock 
[here](https://github.com/apache/airflow/pull/5308/files#r285941524), which I 
did not encounter. Could anyone tell me whether that was the case only for 
older versions of Airflow and is no longer applicable? Or did I miss something?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to