This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 01dcdc940ab23dd83dccd64b2e6dc8f5c521abf5
Author: svivier-orange <[email protected]>
AuthorDate: Mon Dec 9 10:38:12 2019 +0100

    [AIRFLOW-XXX] Add a structural dag validation example (#6727)

    * Add a structural dag validation example

    (cherry picked from commit 572151cbd851abfe273ac82dd26355ae34708d5b)
---
 docs/best-practices.rst | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/docs/best-practices.rst b/docs/best-practices.rst
index 4a75fb4..9b6b3ae 100644
--- a/docs/best-practices.rst
+++ b/docs/best-practices.rst
@@ -148,6 +148,28 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write a u
         self.assertIsNotNone(dag)
         self.assertEqual(len(dag.tasks), 1)
 
+**Unit test a DAG structure:**
+This is an example test want to verify the structure of a code-generated DAG against a dict object
+
+.. code::
+
+ import unittest
+ class testClass(unittest.TestCase):
+     def assertDagDictEqual(self,source,dag):
+         self.assertEqual(dag.task_dict.keys(),source.keys())
+         for task_id,downstream_list in source.items():
+             self.assertTrue(dag.has_task(task_id), msg="Missing task_id: {} in dag".format(task_id))
+             task = dag.get_task(task_id)
+             self.assertEqual(task.downstream_task_ids, set(downstream_list),
+                              msg="unexpected downstream link in {}".format(task_id))
+     def test_dag(self):
+         self.assertDagDictEqual({
+             "DummyInstruction_0": ["DummyInstruction_1"],
+             "DummyInstruction_1": ["DummyInstruction_2"],
+             "DummyInstruction_2": ["DummyInstruction_3"],
+             "DummyInstruction_3": []
+         },dag)
+
 **Unit test for custom operator:**
 
 .. code::
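
The example added by this commit passes a name ``dag`` that the snippet itself never defines, so it cannot be run as shown. The sketch below illustrates the same structural check in a form that runs without Airflow installed. ``FakeTask``, ``FakeDag``, ``structure_mismatches`` and ``EXPECTED`` are hypothetical names introduced here for illustration only; the stand-in classes expose just the attributes the committed example reads (``task_dict``, ``has_task``, ``get_task``, ``downstream_task_ids``). In a real test, a DAG loaded from a ``DagBag`` would presumably take the place of ``FakeDag``.

```python
import unittest


class FakeTask:
    """Hypothetical stand-in for an Airflow task (not part of Airflow)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_task_ids = set()


class FakeDag:
    """Hypothetical stand-in for an Airflow DAG (not part of Airflow).

    Built from a dict mapping task_id -> list of downstream task_ids.
    """

    def __init__(self, edges):
        self.task_dict = {task_id: FakeTask(task_id) for task_id in edges}
        for task_id, downstream in edges.items():
            self.task_dict[task_id].downstream_task_ids.update(downstream)

    def has_task(self, task_id):
        return task_id in self.task_dict

    def get_task(self, task_id):
        return self.task_dict[task_id]


def structure_mismatches(expected, dag):
    """Return a list of differences; the list is empty when the DAG matches."""
    problems = []
    # Compare task ids as sets so that ordering never matters.
    for task_id in sorted(set(expected) - set(dag.task_dict)):
        problems.append("missing task_id: {}".format(task_id))
    for task_id in sorted(set(dag.task_dict) - set(expected)):
        problems.append("unexpected task_id: {}".format(task_id))
    # Compare downstream links only for tasks present on both sides.
    for task_id, downstream_list in expected.items():
        if not dag.has_task(task_id):
            continue
        actual = dag.get_task(task_id).downstream_task_ids
        if actual != set(downstream_list):
            problems.append("unexpected downstream link in {}".format(task_id))
    return problems


# Expected structure, taken from the dict in the committed example.
EXPECTED = {
    "DummyInstruction_0": ["DummyInstruction_1"],
    "DummyInstruction_1": ["DummyInstruction_2"],
    "DummyInstruction_2": ["DummyInstruction_3"],
    "DummyInstruction_3": [],
}


class TestDagStructure(unittest.TestCase):
    def test_dag(self):
        dag = FakeDag(EXPECTED)  # assumption: replace with the real DAG under test
        self.assertEqual(structure_mismatches(EXPECTED, dag), [])
```

Collecting every difference into a list, rather than stopping at the first failed assertion, is a design choice made for this sketch: a single test run then reports all missing tasks and wrong links at once. The test case can be run with ``python -m unittest`` against the module that contains it.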
