This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 5279f2626c9 [v3-1-test] docs: Improve unit-testing docs with runnable 
pytest examples for Operators (#58969) (#59410)
5279f2626c9 is described below

commit 5279f2626c99357146cdc343eb0a368ad57c5500
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Dec 14 13:24:54 2025 +0100

    [v3-1-test] docs: Improve unit-testing docs with runnable pytest examples 
for Operators (#58969) (#59410)
    
    * docs: Add runnable pytest examples for operator unit tests
    
    Includes:
    * DummySuccessOperator for testing
    * TaskInstance.run() example
    * dag.create_dagrun() example
    * Updated best-practices docs
    
    * chore(tests): fix ruff lint issues in example tests (unused vars, S101)
    
    * chore: add options file
    
    * chore: ignore local options file
    
    * docs: replace placeholders with full runnable pytest examples
    
    * docs: replace placeholders with full runnable pytest examples
    
    * docs: move pytest examples into docs; remove root-level tests directory
    
    * docs: add runnable pytest examples directly in docs
    
    * chore: remove stray options file
    
    * chore: remove stray options file and revert .gitignore changes
    
    * docs: move pytest operator testing examples into best-practices
    
    * docs: fix Sphinx underline length for Example 1 title
    
    * docs: update pytest examples and fix heading formatting
    
    * docs: use EmptyOperator in pytest examples and remove XCom assertions 
(maintainer feedback)
    
    * ci: trigger static checks re-run
    
    * ci: trigger static checks re-run
    
    * ci: retry docs build after Sphinx fix
    
    * chore(docs): fix trailing whitespace
    
    * Update airflow-core/docs/best-practices.rst
    
    * Update airflow-core/docs/best-practices.rst
    
    * Update airflow-core/docs/best-practices.rst
    
    * Update airflow-core/docs/best-practices.rst
    
    * Update airflow-core/docs/best-practices.rst
    
    ---------
    (cherry picked from commit 652d4ee983ff0cb0007c054f1ead43f329e12905)
    
    Co-authored-by: Shruti Singh <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow-core/docs/best-practices.rst | 98 ++++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)

diff --git a/airflow-core/docs/best-practices.rst 
b/airflow-core/docs/best-practices.rst
index 169f538b4cb..f46c231ee52 100644
--- a/airflow-core/docs/best-practices.rst
+++ b/airflow-core/docs/best-practices.rst
@@ -650,6 +650,104 @@ want to optimize your Dags there are the following 
actions you can take:
 Testing a Dag
 ^^^^^^^^^^^^^
 
+Testing Operators with pytest
+-----------------------------
+
+Below are two recommended, runnable patterns for unit-testing custom operators
+with pytest. Both examples work with Airflow 3.x.
+
+1. Using ``TaskInstance.run()``
+2. Using ``dag.create_dagrun()``
+
+Example: Using ``EmptyOperator``
+
+    from airflow.operators.empty import EmptyOperator
+
+    task = EmptyOperator(task_id="empty_task")
+
+
+Example 1: Testing using ``TaskInstance.run()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern constructs a Dag, creates a ``TaskInstance`` manually,
+and runs it directly.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_ti_run():
+        with DAG(
+            dag_id="test_empty_operator_ti_run",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        ti = TaskInstance(task=task, run_id="test_run")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Example 2: Testing using ``dag.create_dagrun()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern creates a full Dag run and then runs the TaskInstance
+associated with that Dag run.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_dagrun():
+        with DAG(
+            dag_id="test_empty_operator_dagrun",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        dagrun = dag.create_dagrun(
+            run_id="test_run",
+            state="success",
+            execution_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            data_interval=(
+                pendulum.datetime(2024, 1, 1, tz="UTC"),
+                pendulum.datetime(2024, 1, 1, tz="UTC"),
+            ),
+            logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+        )
+
+        ti = dagrun.get_task_instance("empty_task")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Notes
+~~~~~
+
+* ``dag.test()`` should not be used inside pytest for Airflow 3.x because Dag
+  serialization is inactive.
+* Both examples above run completely inside pytest without requiring
+  any Dag serialization or scheduler.
+* Use mocking for external services when needed.
+
+
+
 Airflow users should treat Dags as production level code, and Dags should have 
various associated tests to
 ensure that they produce expected results. You can write a wide variety of 
tests for a Dag.
 Let's take a look at some of them.

Reply via email to