ashb commented on a change in pull request #17552:
URL: https://github.com/apache/airflow/pull/17552#discussion_r709084189
##########
File path: docs/apache-airflow/best-practices.rst
##########
@@ -270,77 +275,73 @@ Unit tests ensure that there is no incorrect code in your
DAG. You can write uni
.. code-block:: python
- from airflow.models import DagBag
- import unittest
+ import pytest
+ from airflow.models import DagBag
- class TestHelloWorldDAG(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- cls.dagbag = DagBag()
- def test_dag_loaded(self):
- dag = self.dagbag.get_dag(dag_id="hello_world")
- assert self.dagbag.import_errors == {}
- assert dag is not None
- assert len(dag.tasks) == 1
+ @pytest.fixture(scope="scope")
+ def dagbag(self):
+ return DagBag()
+
+
+ def test_dag_loaded(self, dagbag):
+ dag = dagbag.get_dag(dag_id="hello_world")
+ assert dagbag.import_errors == {}
+ assert dag is not None
+ assert len(dag.tasks) == 1
+
**Unit test a DAG structure:**
This is an example test that verifies the structure of a code-generated DAG
against a dict object
.. code-block:: python
- import unittest
+ def assert_dag_dict_equal(source, dag):
+ assert dag.task_dict.keys() == source.keys()
+ for task_id, downstream_list in source.items():
+ assert dag.has_task(task_id)
+ task = dag.get_task(task_id)
+ assert task.downstream_task_ids == set(downstream_list)
- class testClass(unittest.TestCase):
- def assertDagDictEqual(self, source, dag):
- assert dag.task_dict.keys() == source.keys()
- for task_id, downstream_list in source.items():
- assert dag.has_task(task_id)
- task = dag.get_task(task_id)
- assert task.downstream_task_ids == set(downstream_list)
+ def test_dag():
+ assert_dag_dict_equal(
+ {
+ "DummyInstruction_0": ["DummyInstruction_1"],
+ "DummyInstruction_1": ["DummyInstruction_2"],
+ "DummyInstruction_2": ["DummyInstruction_3"],
+ "DummyInstruction_3": [],
+ },
+ dag,
+ )
- def test_dag(self):
- self.assertDagDictEqual(
- {
- "DummyInstruction_0": ["DummyInstruction_1"],
- "DummyInstruction_1": ["DummyInstruction_2"],
- "DummyInstruction_2": ["DummyInstruction_3"],
- "DummyInstruction_3": [],
- },
- dag,
- )
**Unit test for custom operator:**
.. code-block:: python
- import unittest
- from airflow.utils.state import State
-
- DEFAULT_DATE = "2019-10-03"
- TEST_DAG_ID = "test_my_custom_operator"
-
-
- class MyCustomOperatorTest(unittest.TestCase):
- def setUp(self):
- self.dag = DAG(
- TEST_DAG_ID,
- schedule_interval="@daily",
- default_args={"start_date": DEFAULT_DATE},
- )
- self.op = MyCustomOperator(
- dag=self.dag,
- task_id="test",
- prefix="s3://bucket/some/prefix",
- )
- self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)
-
- def test_execute_no_trigger(self):
- self.ti.run(ignore_ti_state=True)
- assert self.ti.state == State.SUCCESS
- # Assert something related to tasks results
+ import pytest
+
+ from airflow.utils.state import State
+
+
+ def test_my_custom_oeprator_execute_no_trigger(dag_maker):
+ with dag_maker(
Review comment:
The idea behind this doc is for users to be able to test their own DAGs, but
the `dag_maker` fixture is only available inside Airflow's own test suite, so
this example won't work for them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]