This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 5279f2626c9 [v3-1-test] docs: Improve unit-testing docs with runnable
pytest examples for Operators (#58969) (#59410)
5279f2626c9 is described below
commit 5279f2626c99357146cdc343eb0a368ad57c5500
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Dec 14 13:24:54 2025 +0100
[v3-1-test] docs: Improve unit-testing docs with runnable pytest examples
for Operators (#58969) (#59410)
* docs: Add runnable pytest examples for operator unit tests
Includes:
* DummySuccessOperator for testing
* TaskInstance.run() example
* dag.create_dagrun() example
* Updated best-practices docs
* chore(tests): fix ruff lint issues in example tests (unused vars, S101)
* chore: add options file
* chore: ignore local options file
* docs: replace placeholders with full runnable pytest examples
* docs: replace placeholders with full runnable pytest examples
* docs: move pytest examples into docs; remove root-level tests directory
* docs: add runnable pytest examples directly in docs
* chore: remove stray options file
* chore: remove stray options file and revert .gitignore changes
* docs: move pytest operator testing examples into best-practices
* docs: fix Sphinx underline length for Example 1 title
* docs: update pytest examples and fix heading formatting
* docs: use EmptyOperator in pytest examples and remove XCom assertions
(maintainer feedback)
* ci: trigger static checks re-run
* ci: trigger static checks re-run
* ci: retry docs build after Sphinx fix
* chore(docs): fix trailing whitespace
* Update airflow-core/docs/best-practices.rst
* Update airflow-core/docs/best-practices.rst
* Update airflow-core/docs/best-practices.rst
* Update airflow-core/docs/best-practices.rst
* Update airflow-core/docs/best-practices.rst
---------
(cherry picked from commit 652d4ee983ff0cb0007c054f1ead43f329e12905)
Co-authored-by: Shruti Singh <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow-core/docs/best-practices.rst | 98 ++++++++++++++++++++++++++++++++++++
1 file changed, 98 insertions(+)
diff --git a/airflow-core/docs/best-practices.rst b/airflow-core/docs/best-practices.rst
index 169f538b4cb..f46c231ee52 100644
--- a/airflow-core/docs/best-practices.rst
+++ b/airflow-core/docs/best-practices.rst
@@ -650,6 +650,104 @@ want to optimize your Dags there are the following actions you can take:
Testing a Dag
^^^^^^^^^^^^^
+Testing Operators with pytest
+-----------------------------
+
+Below are two recommended, runnable patterns for unit-testing custom operators
+with pytest. Both examples work with Airflow 3.x.
+
+1. Using ``TaskInstance.run()``
+2. Using ``dag.create_dagrun()``
+
+Example: Using ``EmptyOperator``
+
+    from airflow.operators.empty import EmptyOperator
+
+    task = EmptyOperator(task_id="empty_task")
+
+
+Example 1: Testing using ``TaskInstance.run()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern constructs a Dag, creates a ``TaskInstance`` manually,
+and runs it directly.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_ti_run():
+        with DAG(
+            dag_id="test_empty_operator_ti_run",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        ti = TaskInstance(task=task, run_id="test_run")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Example 2: Testing using ``dag.create_dagrun()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern creates a full Dag run and then runs the TaskInstance
+associated with that Dag run.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_dagrun():
+        with DAG(
+            dag_id="test_empty_operator_dagrun",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        dagrun = dag.create_dagrun(
+            run_id="test_run",
+            state="success",
+            execution_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            data_interval=(
+                pendulum.datetime(2024, 1, 1, tz="UTC"),
+                pendulum.datetime(2024, 1, 1, tz="UTC"),
+            ),
+            logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+        )
+
+        ti = dagrun.get_task_instance("empty_task")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Notes
+~~~~~
+
+* ``dag.test()`` should not be used inside pytest for Airflow 3.x because Dag
+  serialization is inactive.
+* Both examples above run completely inside pytest without requiring
+  any Dag serialization or scheduler.
+* Use mocking for external services when needed.
+
+
+
Airflow users should treat Dags as production level code, and Dags should have various associated tests to
ensure that they produce expected results. You can write a wide variety of tests for a Dag.
Let's take a look at some of them.
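
The note in the added section about mocking external services could look roughly like the sketch below. It is not part of the commit and deliberately does not import Airflow. `WarehouseClient` and `RowCountTask` are hypothetical names invented for illustration, and `RowCountTask` only mirrors the `execute(context)` shape of a custom operator; a real test would exercise a `BaseOperator` subclass instead.

```python
from unittest import mock


class WarehouseClient:
    """Hypothetical client for an external service (not an Airflow class)."""

    def count_rows(self, table: str) -> int:
        # A real client would open a network connection here.
        raise RuntimeError("would contact a real service")


class RowCountTask:
    """Hypothetical stand-in for a custom operator with an execute(context) method."""

    def __init__(self, table: str, client: WarehouseClient):
        self.table = table
        self.client = client

    def execute(self, context: dict) -> int:
        # Delegate to the injected client so tests can swap in a mock.
        return self.client.count_rows(self.table)


def test_row_count_task_uses_mocked_client():
    # create_autospec builds a mock that enforces WarehouseClient's method signatures.
    client = mock.create_autospec(WarehouseClient, instance=True)
    client.count_rows.return_value = 42

    task = RowCountTask(table="events", client=client)
    result = task.execute(context={})

    assert result == 42
    client.count_rows.assert_called_once_with("events")
```

Injecting the client through the constructor is one design choice among several; patching the client where the operator module imports it, with `mock.patch`, would serve the same purpose when the constructor cannot be changed.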