As part of my performance testing work I created a simple driver script; one of
its steps was to "reset" the DB (at least for a given dag) so that the code was
operating on the same rows in the same state each time -- I ran all my tests
about 10 times to get a mean and variance.
My "reset" function looks like this:
--
import airflow.models
from airflow.utils import state, timezone
from airflow.utils.db import create_session


# NUM_RUNS is a module-level constant defined elsewhere in the driver script.
def reset_dag(dag, num_runs=NUM_RUNS):
    DR = airflow.models.DagRun
    TI = airflow.models.TaskInstance
    TF = airflow.models.TaskFail
    dag_id = dag.dag_id

    with create_session() as session:
        dag.sync_to_db(session=session)
        # Wipe any existing runs/instances/failures for this dag so every test
        # starts from the same rows in the same state.
        session.query(DR).filter(DR.dag_id == dag_id).delete()
        session.query(TI).filter(TI.dag_id == dag_id).delete()
        session.query(TF).filter(TF.dag_id == dag_id).delete()

    airflow.models.DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)

    next_run_date = dag.normalize_schedule(dag.start_date or
                                           min(t.start_date for t in dag.tasks))

    # Pre-create N running DagRuns on the dag's schedule.
    for _ in range(num_runs):
        next_run = dag.create_dagrun(
            run_id=DR.ID_PREFIX + next_run_date.isoformat(),
            execution_date=next_run_date,
            start_date=timezone.utcnow(),
            state=state.State.RUNNING,
            external_trigger=False,
        )
        next_run_date = dag.following_schedule(next_run_date)

    return next_run
--
This works well enough (at least for the case where I only want N dag runs
created), but it has a couple of limitations:
- It's *slow*: about 54% of the test runtime is spent resetting between tests
(not included in the measurements, it just leaves me waiting).
- It's not very flexible; for instance, putting some task instances into
particular states isn't easy (see the sketch below).
Its main advantage is that it uses our model code, so it is less likely to get
out of date as the model evolves over time.
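To illustrate the flexibility point, here is a minimal sketch (assuming the
Airflow 1.10-era models and session helpers) of how task instances for an
already-created DagRun could be seeded directly into chosen states; the
seed_task_states function and the succeeded_task_ids parameter are hypothetical
names for this example, not part of my script:
--
from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.db import create_session
from airflow.utils.state import State


def seed_task_states(dag, execution_date, succeeded_task_ids):
    # Sketch only: mark some tasks of one dag run as already succeeded so a
    # test can start from a partially-completed run.
    with create_session() as session:
        for task in dag.tasks:
            ti = TaskInstance(task, execution_date)
            if task.task_id in succeeded_task_ids:
                ti.state = State.SUCCESS
                ti.start_date = ti.end_date = timezone.utcnow()
            else:
                ti.state = State.NONE
            session.merge(ti)
--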
Does anyone have any opinions about test fixtures (good or bad), especially
given the somewhat complex interaction we have between DAG<->DagRun<->TI<->TaskFail
etc. (without any explicit FKs), and recommendations for modules to use or avoid?
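Purely for illustration of what I mean by a fixture (not a recommendation), one
shape this could take with pytest, wrapping the reset_dag function above; the
example_dag fixture name is hypothetical:
--
import pytest


@pytest.fixture
def reset_example_dag(example_dag):
    # Reset the DB state for this dag before the test body runs and hand the
    # last created DagRun to the test.
    return reset_dag(example_dag, num_runs=NUM_RUNS)
--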
-ash