potiuk commented on code in PR #58969:
URL: https://github.com/apache/airflow/pull/58969#discussion_r2616930425


##########
airflow-core/docs/best-practices.rst:
##########
@@ -650,6 +650,104 @@ want to optimize your Dags there are the following 
actions you can take:
 Testing a Dag
 ^^^^^^^^^^^^^
 
+Testing Operators with pytest
+-----------------------------
+
+Below are two recommended, runnable patterns for unit-testing custom operators
+with pytest. Both examples work with Airflow 3.x.
+
+1. Using ``TaskInstance.run()``
+2. Using ``dag.create_dagrun()``
+
+Example: Using ``EmptyOperator``
+
+    from airflow.operators.empty import EmptyOperator
+
+    task = EmptyOperator(task_id="empty_task")
+
+
+Example 1: Testing using ``TaskInstance.run()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern constructs a DAG, creates a ``TaskInstance`` manually,
+and runs it directly.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_ti_run():
+        with DAG(
+            dag_id="test_empty_operator_ti_run",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        ti = TaskInstance(task=task, run_id="test_run")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Example 2: Testing using ``dag.create_dagrun()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern creates a full DAG run and then runs the TaskInstance

Review Comment:
   ```suggestion
   This pattern creates a full Dag run and then runs the TaskInstance
   ```



##########
airflow-core/docs/best-practices.rst:
##########
@@ -650,6 +650,104 @@ want to optimize your Dags there are the following 
actions you can take:
 Testing a Dag
 ^^^^^^^^^^^^^
 
+Testing Operators with pytest
+-----------------------------
+
+Below are two recommended, runnable patterns for unit-testing custom operators
+with pytest. Both examples work with Airflow 3.x.
+
+1. Using ``TaskInstance.run()``
+2. Using ``dag.create_dagrun()``
+
+Example: Using ``EmptyOperator``
+
+    from airflow.operators.empty import EmptyOperator
+
+    task = EmptyOperator(task_id="empty_task")
+
+
+Example 1: Testing using ``TaskInstance.run()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern constructs a DAG, creates a ``TaskInstance`` manually,
+and runs it directly.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_ti_run():
+        with DAG(
+            dag_id="test_empty_operator_ti_run",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        ti = TaskInstance(task=task, run_id="test_run")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Example 2: Testing using ``dag.create_dagrun()``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This pattern creates a full DAG run and then runs the TaskInstance
+associated with that DAG run.
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow.models.dag import DAG
+    from airflow.utils.state import TaskInstanceState
+    from airflow.operators.empty import EmptyOperator
+
+
+    def test_empty_operator_with_dagrun():
+        with DAG(
+            dag_id="test_empty_operator_dagrun",
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ) as dag:
+            task = EmptyOperator(task_id="empty_task")
+
+        dagrun = dag.create_dagrun(
+            run_id="test_run",
+            state="success",
+            execution_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            data_interval=(
+                pendulum.datetime(2024, 1, 1, tz="UTC"),
+                pendulum.datetime(2024, 1, 1, tz="UTC"),
+            ),
+            logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+        )
+
+        ti = dagrun.get_task_instance("empty_task")
+        ti.run(ignore_ti_state=True)
+
+        assert ti.state == TaskInstanceState.SUCCESS
+
+
+Notes
+~~~~~
+
+* ``dag.test()`` should not be used inside pytest for Airflow 3.x because DAG
+  serialization is inactive.
+* Both examples above run completely inside pytest without requiring
+  any DAG serialization or scheduler.

Review Comment:
   ```suggestion
     any Dag serialization or scheduler.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to