GitHub user GlenboLake created a discussion: How to efficiently test my DAGs on 
a per-task basis in Airflow 3?

I am in the process of upgrading DAGs from Airflow 2.10 to 3.x. For each of my 
DAGs, I tend to have one test for each task that checks the output. A test for 
a PythonOperator may check that the correct XComs were set, and a 
SQLExecuteQueryOperator will usually check the final contents of the output 
table. An example DAG and test might look something like this:

```python
# my_dag.py
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(dag_id='example') as dag:
    start = S3ListOperator(
        task_id="check_s3",
        bucket='data',
        prefix="incoming/{{ logical_date.year }}/{{ logical_date.month }}/",
        delimiter='/'
    )
    load = SQLExecuteQueryOperator(
        task_id='load',
        conn_id='redshift',
        sql=[
            "templates/create_target_table.sql",
            "COPY {{ params.target_table }} FROM '{{ ti.xcom_pull(task_ids='check_s3') }}'"
        ],
        params={"target_table": "temp.loaded_data"}
    )
    process = SQLExecuteQueryOperator(
        task_id="process",
        conn_id="redshift",
        sql="INSERT INTO {{ params.prod_table }} SELECT * FROM {{ params.new_table }} WHERE some_condition = TRUE",
        params={
            "new_table": "temp.loaded_data",
            "prod_table": "final.example",
        }
    )
    
    start >> load >> process
```

```python
# test_my_dag.py
import pytest
from pendulum import UTC, DateTime

from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState

from dags.my_dag import dag

logical_date = DateTime(2026, 3, 19, tzinfo=UTC)

@pytest.fixture
def setup_process(db_hook: PostgresHook):
    db_hook.run('CREATE TABLE temp.loaded_data ...')
    data = [
        # fake data as result of load task
    ]
    db_hook.insert_rows("temp.loaded_data", data)
    existing_data = [
        # data that may already exist in final.example
    ]
    db_hook.insert_rows("final.example", existing_data)
    yield
    db_hook.run("DROP TABLE IF EXISTS temp.loaded_data, final.example")
    
@pytest.mark.usefixtures("setup_process")
@provide_session
def test_process(db_hook, session):
    run = dag.create_dagrun(state=DagRunState.RUNNING, execution_date=logical_date)
    ti = TaskInstance(
        task=dag.get_task("process"),
        run_id=run.run_id,
    )
    session.merge(ti)
    ti.run(ignore_all_deps=True)
    
    result = db_hook.get_records("SELECT * FROM final.example")
    expected = [...]
    assert result == expected
```

I have several DAGs, amounting to over 1000 tests (counting 
`pytest.mark.parametrize` expansions), with about 700 call sites of `ti.run` as 
above. The suite takes about 7 minutes to run in Airflow 2.10.5, but after my 
attempts to migrate to Airflow 3, it now takes a full 50 minutes. I can only 
conclude that I have done something wrong.

I can share the helper functions that I've written for Airflow 3 if requested, 
but what I'd really like to know is: Is there a preferred or correct way to 
test individual tasks? If possible, I want to try to avoid fully mocking out 
functions like `ti.xcom_pull` (I want the test to fail if I pass an incorrect 
argument).

I am aware that the easiest thing would be for me to test my functions and SQL 
templates directly. However, a large part of that is rendering templates, which 
requires a template context; is there a test-compatible version of Context 
available, or would I have to spin up my own?
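
To illustrate what I mean by spinning up my own: if nothing test-friendly ships with Airflow, I'd presumably end up hand-rolling a stub like the one below. This is purely my own sketch, not Airflow API: `FakeTI` and `make_context` are names I invented, and the dict only contains the keys my templates happen to reference.

```python
# Hypothetical sketch of a hand-rolled template context.
# FakeTI and make_context are my own inventions, not Airflow API.
from datetime import datetime, timezone


class FakeTI:
    """Stands in for `ti` in templates; fails loudly on a wrong task_id."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # task_id -> value that task would have pushed

    def xcom_pull(self, task_ids=None):
        if task_ids not in self._xcoms:
            # This is the behavior I want: a bad argument breaks the test
            # instead of silently returning a mocked value.
            raise KeyError(f"no XCom registered for task {task_ids!r}")
        return self._xcoms[task_ids]


def make_context(logical_date, params=None, xcoms=None):
    """Build the minimal mapping my SQL/Jinja templates actually use."""
    return {
        "logical_date": logical_date,
        "params": params or {},
        "ti": FakeTI(xcoms or {}),
    }
```

I could then feed such a dict to `task.render_template_fields(context)` and assert on the rendered `sql`, but that is exactly the part I'd rather not reinvent if Airflow already provides a test-compatible Context.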

GitHub link: https://github.com/apache/airflow/discussions/63941
