GitHub user GlenboLake created a discussion: How to efficiently test my DAGs on
a per-task basis in Airflow 3?
I am in the process of upgrading my DAGs from Airflow 2.10 to 3.x. For each DAG, I tend to have one test per task that checks its output: a test for a `PythonOperator` may check that the correct XComs were set, while a test for a `SQLExecuteQueryOperator` will usually check the final contents of the output table. An example DAG and test might look something like this:
```python
# my_dag.py
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(dag_id="example") as dag:
    start = S3ListOperator(
        task_id="check_s3",
        bucket="data",
        prefix="incoming/{{ logical_date.year }}/{{ logical_date.month }}/",
        delimiter="/",
    )
    load = SQLExecuteQueryOperator(
        task_id="load",
        conn_id="redshift",
        sql=[
            "templates/create_target_table.sql",
            "COPY {{ params.target_table }} FROM '{{ ti.xcom_pull(task_ids='check_s3') }}'",
        ],
        params={"target_table": "temp.loaded_data"},
    )
    process = SQLExecuteQueryOperator(
        task_id="process",
        conn_id="redshift",
        sql=(
            "INSERT INTO {{ params.prod_table }} "
            "SELECT * FROM {{ params.new_table }} WHERE some_condition = TRUE"
        ),
        params={
            "new_table": "temp.loaded_data",
            "prod_table": "final.example",
        },
    )
    start >> load >> process
```
```python
# test_my_dag.py
import pytest
from pendulum import UTC, DateTime

from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState

from dags.my_dag import dag

logical_date = DateTime(2026, 3, 19, tzinfo=UTC)


@pytest.fixture
def setup_process(db_hook: PostgresHook):
    db_hook.run("CREATE TABLE temp.loaded_data ...")
    data = [
        # fake data as result of load task
    ]
    db_hook.insert_rows("temp.loaded_data", data)
    existing_data = [
        # data that may already exist in final.example
    ]
    db_hook.insert_rows("final.example", existing_data)
    yield
    db_hook.run("DROP TABLE IF EXISTS temp.loaded_data, final.example")


@pytest.mark.usefixtures("setup_process")
@provide_session
def test_process(db_hook, session):
    run = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=logical_date,
    )
    ti = TaskInstance(
        task=dag.get_task("process"),
        run_id=run.run_id,
    )
    session.merge(ti)
    ti.run(ignore_all_deps=True)

    result = db_hook.get_records("SELECT * FROM final.example")
    expected = [...]
    assert result == expected
```
I have several DAGs, amounting to over 1000 tests (including `pytest.mark.parametrize` expansions) with about 700 calls to `ti.run` as above. The suite takes about 7 minutes to run in Airflow 2.10.5, but after my attempts to migrate to Airflow 3 it now takes a full 50 minutes. I can only conclude that I have done something wrong.
I can share the helper functions I've written for Airflow 3 if requested, but what I'd really like to know is: is there a preferred or correct way to test individual tasks? If possible, I want to avoid fully mocking out functions like `ti.xcom_pull` (I want the test to fail if I pass an incorrect argument).
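To illustrate the degree of strictness I'm after: rather than a permissive `MagicMock`, I'd want something like the stand-in below, which still rejects a wrong `task_ids` argument. (`FakeXCom` is my own hypothetical helper here, not an Airflow API.)

```python
# Sketch of a strict XCom stand-in: pulling with an unknown task_id fails
# loudly instead of silently returning a mock. FakeXCom is hypothetical,
# not part of Airflow.
class FakeXCom:
    def __init__(self, values):
        self._values = values  # mapping of task_id -> pushed XCom value

    def xcom_pull(self, task_ids):
        if task_ids not in self._values:
            raise KeyError(f"no XCom recorded for task_id {task_ids!r}")
        return self._values[task_ids]


fake = FakeXCom({"check_s3": "incoming/2026/3/part-0001.csv"})
print(fake.xcom_pull(task_ids="check_s3"))  # incoming/2026/3/part-0001.csv
```

But I'd prefer the real `ti.xcom_pull` over maintaining stubs like this, which is why I'm asking about the intended testing approach.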
I am aware that the easiest thing would be to test my functions and SQL templates directly. However, a large part of that is rendering templates, which requires a template context; is there a test-compatible version of `Context` available, or would I have to spin up my own?
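For context, the sort of hand-rolled stand-in I'd otherwise spin up is just a plain dict rendered with Jinja2 directly, assuming plain Jinja2 is close enough to Airflow's rendering for my templates (it skips Airflow's filters, macros, and per-operator `template_fields` handling):

```python
# Minimal hand-rolled "template context" sketch: a plain dict with only the
# keys my templates use, rendered with Jinja2 directly rather than through
# Airflow. Assumes vanilla Jinja2 semantics are sufficient for these fields.
from datetime import datetime, timezone

import jinja2

logical_date = datetime(2026, 3, 19, tzinfo=timezone.utc)

# Stand-in for Airflow's Context, limited to the keys under test.
context = {
    "logical_date": logical_date,
    "params": {"target_table": "temp.loaded_data"},
}

# StrictUndefined makes a typo in a template raise instead of rendering "".
env = jinja2.Environment(undefined=jinja2.StrictUndefined)
template = env.from_string(
    "incoming/{{ logical_date.year }}/{{ logical_date.month }}/"
)
print(template.render(context))  # incoming/2026/3/
```

This works for simple fields, but it diverges from what Airflow actually renders as soon as macros or `ti.xcom_pull` enter the picture, hence the question.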
GitHub link: https://github.com/apache/airflow/discussions/63941