TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow 
skipped tasks to satisfy wait_for_downstream
URL: https://github.com/apache/airflow/pull/7735#discussion_r409403720
 
 

 ##########
 File path: tests/models/test_dagrun.py
 ##########
 @@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'
+        # dag = self.dagbag.get_dag(dag_id)
+        # task = dag.tasks[0]
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
1, 0, 0, 0))
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
2, 0, 0, 0))
+        # ti1 = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0))
+        # ti2 = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0))
+        #
+        # def run_2nd_dagrun():
+        #     dag.run(
+        #         start_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         end_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         executor=MockExecutor()
+        #     )
+        #
+        # # 2nd run of task fails by itself
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SCHEDULED)
+        #
+        # # 2nd run af task fails if 1st run of task failed
+        # ti1.set_state(State.FAILED)
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SCHEDULED)
+        #
+        # # but it works after 1st instance of task is marked as skipped
+        # ti1.set_state(State.SKIPPED)
+        # run_2nd_dagrun()
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SUCCESS)
+        #
+        # # and it also works if 1st instance is success
+        # ti2.set_state(State.NONE)
+        # ti1.set_state(State.SUCCESS)
+        # run_2nd_dagrun()
+        # ti2.refresh_from_db()
+        # self.assertEqual(ti2.state, State.SUCCESS)
+        dag_id = 'test_depends_on_past'
+
+        dag = self.dagbag.get_dag(dag_id)
+        task = dag.tasks[0]
+
+        self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 
0, 0, 0))
+        self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 
0, 0, 0))
+
+        uti1 = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0))
+        uti2 = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0))
+
+        for state in State.task_states:
+            uti1.set_state(state)
+            uti2.set_state(State.QUEUED)
+            uti2.run()
+            if state in (State.SUCCESS, State.SKIPPED):
+                self.assertEqual(uti2.state, State.SUCCESS)
+            else:
+                self.assertNotEqual(uti2.state, State.SUCCESS)
+
+    def test_wait_for_downstream(self):
+        # dag_id = 'test_wait_for_downstream'
+        # dag = models.DagBag().get_dag(dag_id)
+        #
+        # TODO: clarify with airflow maintainers -- is the snippet below still 
true and necessary?
+        # # ti.previous_ti requires a dagrun to exist
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
1, 0, 0, 0))
+        # self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 
2, 0, 0, 0))
+        # upstream, downstream = dag.tasks
+        #
+        # def run_2nd_dagrun():
+        #     dag.run(
+        #         start_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         end_date=timezone.datetime(2016, 1, 2, 0, 0, 0),
+        #         executor=MockExecutor()
+        #     )
+        #
+        # # doesn't run if downstream task for previous day has a null state
+        # uti_2 = TI(task=upstream, execution_date=timezone.datetime(2016, 1, 
2, 0, 0, 0))
+        # uti_2.previous_ti.set_state(State.SUCCESS)
+        # self.assertEqual(uti_2.previous_ti.state, State.SUCCESS)
+        # TODO: clarify with airflow maintainers -- I don't think this 
triggers an AirflowException, does it?
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
+        # uti_2.refresh_from_db()
+        # self.assertEqual(uti_2.state, State.SCHEDULED)
+        #
+        # # doesn't run if downstream task for previous day has a failed state
+        # dti_1 = TI(task=downstream, execution_date=timezone.datetime(2016, 
1, 1, 0, 0, 0))
+        # dti_1.set_state(State.FAILED)
+        # self.assertRaises(AirflowException, run_2nd_dagrun)
+        # uti_2.refresh_from_db()
+        # self.assertEqual(uti_2.state, State.SCHEDULED)
+        #
+        # # runs if downstream task for previous day has a skipped state
+        # dti_1 = TI(task=downstream, execution_date=timezone.datetime(2016, 
1, 1, 0, 0, 0))
+        # dti_1.set_state(State.SKIPPED)
+        # run_2nd_dagrun()
+        # uti_2.refresh_from_db()
+        # self.assertEqual(uti_2.state, State.SUCCESS)
+        #
+        # # runs if downstream task for previous day has a success state
+        # uti_2.set_state(State.NONE)
+        # dti_1.set_state(State.SUCCESS)
+        # run_2nd_dagrun()
+        # uti_2.refresh_from_db()
+        # self.assertEqual(uti_2.state, State.SUCCESS)
+        dag_id = 'test_wait_for_downstream'
+        dag = self.dagbag.get_dag(dag_id)
+        upstream, downstream = dag.tasks
+
+        # For ti.set_state() to work, the DagRun has to exist,
+        # Otherwise ti.previous_ti returns an unpersisted TI
+        self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 
0, 0, 0))
+        self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 
0, 0, 0))
+
+        dti1 = TI(task=downstream, execution_date=timezone.datetime(2016, 1, 
1, 0, 0, 0))
+        uti2 = TI(task=upstream, execution_date=timezone.datetime(2016, 1, 2, 
0, 0, 0))
+        uti2.previous_ti.set_state(State.SUCCESS)
+        self.assertEqual(uti2.previous_ti.state, State.SUCCESS)
+
+        for state in State.task_states:
+            dti1.set_state(state)
+            uti2.set_state(State.QUEUED)
+            uti2.run()
+            if state in (State.SUCCESS, State.SKIPPED):
+                self.assertEqual(uti2.state, State.SUCCESS)
+            else:
+                self.assertNotEqual(uti2.state, State.SUCCESS)
 
 Review comment:
   The one in `test_taskinstance.py` is really like a pure unit test. There is 
no `wait_for_downstream` flag being used anywhere there. It's really just a 
test on the behavior of `TaskInstance.are_dependents_done()`. 
   
   Meanwhile, the tests in `test_dagrun.py` has a wider scope in which the 
`wait_for_downstream=True` or `depends_on_past=True` are set when initialising 
the `Operator`'s. The state of the `previous_ti_downstream` and `previous_ti` 
are manipulated, `ti` is ran, and then we assert what we expect the state of 
`ti` should be. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to