This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cbc1f72bc0bbb74624b1d3d6f6a8079354473e71 Author: Tzu-ping Chung <[email protected]> AuthorDate: Fri Jan 2 15:40:13 2026 +0800 Revert "docs: Improve unit-testing docs with runnable pytest examples for Operators (#58969)" (#60035) This reverts commit 652d4ee983ff0cb0007c054f1ead43f329e12905. (cherry picked from commit 31b698fe381cb782fb65edea800c77baa9572fd7) --- airflow-core/docs/best-practices.rst | 98 ------------------------------------ 1 file changed, 98 deletions(-) diff --git a/airflow-core/docs/best-practices.rst b/airflow-core/docs/best-practices.rst index b7f2b69f3c1..b818e174f32 100644 --- a/airflow-core/docs/best-practices.rst +++ b/airflow-core/docs/best-practices.rst @@ -650,104 +650,6 @@ want to optimize your Dags there are the following actions you can take: Testing a Dag ^^^^^^^^^^^^^ -Testing Operators with pytest ------------------------------ - -Below are two recommended, runnable patterns for unit-testing custom operators -with pytest. Both examples work with Airflow 3.x. - -1. Using ``TaskInstance.run()`` -2. Using ``dag.create_dagrun()`` - -Example: Using ``EmptyOperator`` - - from airflow.operators.empty import EmptyOperator - - task = EmptyOperator(task_id="empty_task") - - -Example 1: Testing using ``TaskInstance.run()`` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -This pattern constructs a Dag, creates a ``TaskInstance`` manually, -and runs it directly. - -.. code-block:: python - - import pendulum - - from airflow.models.dag import DAG - from airflow.models.taskinstance import TaskInstance - from airflow.utils.state import TaskInstanceState - from airflow.operators.empty import EmptyOperator - - - def test_empty_operator_with_ti_run(): - with DAG( - dag_id="test_empty_operator_ti_run", - start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), - schedule=None, - ) as dag: - task = EmptyOperator(task_id="empty_task") - - ti = TaskInstance(task=task, run_id="test_run") - ti.run(ignore_ti_state=True) - - assert ti.state == TaskInstanceState.SUCCESS - - -Example 2: Testing using ``dag.create_dagrun()`` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -This pattern creates a full Dag run and then runs the TaskInstance -associated with that Dag run. - -.. code-block:: python - - import pendulum - - from airflow.models.dag import DAG - from airflow.utils.state import TaskInstanceState - from airflow.operators.empty import EmptyOperator - - - def test_empty_operator_with_dagrun(): - with DAG( - dag_id="test_empty_operator_dagrun", - start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), - schedule=None, - ) as dag: - task = EmptyOperator(task_id="empty_task") - - dagrun = dag.create_dagrun( - run_id="test_run", - state="success", - execution_date=pendulum.datetime(2024, 1, 1, tz="UTC"), - start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), - data_interval=( - pendulum.datetime(2024, 1, 1, tz="UTC"), - pendulum.datetime(2024, 1, 1, tz="UTC"), - ), - logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"), - ) - - ti = dagrun.get_task_instance("empty_task") - ti.run(ignore_ti_state=True) - - assert ti.state == TaskInstanceState.SUCCESS - - -Notes -~~~~~ - -* ``dag.test()`` should not be used inside pytest for Airflow 3.x because Dag - serialization is inactive. -* Both examples above run completely inside pytest without requiring - any Dag serialization or scheduler. -* Use mocking for external services when needed. - - - Airflow users should treat Dags as production level code, and Dags should have various associated tests to ensure that they produce expected results. You can write a wide variety of tests for a Dag. Let's take a look at some of them.
