Tony Brookes created AIRFLOW-4525:
-------------------------------------
Summary: Trigger Dag Operator causes duplicate key exceptions and
can cause runaway dag spawning as it is not atomic at the DB level (on Postgres
at least.)
Key: AIRFLOW-4525
URL: https://issues.apache.org/jira/browse/AIRFLOW-4525
Project: Apache Airflow
Issue Type: Bug
Components: DAG, DagRun
Affects Versions: 1.10.3, 1.10.2
Environment: Mac 10.14.4.
Reporter: Tony Brookes
When using the TriggerDagRunOperator there is a problem in the code that loops
over subdags to schedule them. You will not see this issue if you only have one
level of subdag, but if your subdags themselves have subdags, you will.
The code pops an item off the list (which is not deduplicated) and schedules
it, then appends all subdags of the popped DAG to the current list. It keeps
doing this until the list is empty.
The problem is that <<level 1 dag>>.subdags returns ALL subdags at all levels.
So when you process a level 2 subdag and once again append all of its subdags,
all of its children get added AGAIN.
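The duplicate accumulation can be sketched with a minimal, self-contained model (the `Dag` class and names here are hypothetical stand-ins, not Airflow's actual classes; only the recursive behaviour of `.subdags` is taken from the report):

```python
# Minimal model of the scheduling loop described above. The key point
# is that .subdags is RECURSIVE (all levels), so re-appending it for
# every popped DAG re-queues descendants that are already in the list.

class Dag:
    def __init__(self, dag_id, children=None):
        self.dag_id = dag_id
        self.children = children or []

    @property
    def subdags(self):
        # Mirrors the reported behaviour: returns ALL nested subdags,
        # not just the direct children.
        result = []
        for child in self.children:
            result.append(child)
            result.extend(child.subdags)
        return result

# Two levels of nesting: parent -> level1 -> level2
level2 = Dag("level2")
level1 = Dag("level1", [level2])
parent = Dag("parent", [level1])

scheduled = []
queue = [parent]
while queue:
    dag = queue.pop()
    scheduled.append(dag.dag_id)   # "schedule" a DagRun for it
    queue.extend(dag.subdags)      # re-appends already-queued descendants

print(scheduled)  # "level2" appears twice -> duplicate DagRun key on insert
```

With only one level of subdag, `parent.subdags` and `level1.subdags` never overlap, which is why the problem only surfaces at two or more levels of nesting.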
Up to and including 1.10.2 this is not a real problem. You see it in the logs,
but it does not cause the operator to raise or the task to fail.
In 1.10.3, however, it causes havoc.
Firstly, the error is no longer silently swallowed. It causes the operator to
raise, and so the task fails. Worse, since the statements it has already
executed to schedule its DAGs are NOT in the same transaction, those DAGs ARE
triggered. The task will subsequently be retried if retries are configured and
will schedule them again (subsequent invocations, because they use now() as
part of the key, generate a different unique entry at the top level and hence
cause it to be scheduled multiple times).
I'm not sure what the best fix is (or if it's my place to suggest one), but
from what I've seen the cleanest approach is to use a set rather than the
current list, to avoid duplicate entries. That fixes most of the problem, but I
also think it's important that this sort of operator be truly atomic: either
all the rows are written to the DB or NONE are. Otherwise you risk significant
issues in a production environment with over-instantiating DAGs. Even if
concurrency configuration prevents them from running concurrently, the list of
scheduled DAGs can in theory grow forever (or up to max_retries of the
TriggerDagRunOperator task), with significant undesirable side effects.
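A minimal sketch of the set-based approach suggested above (again using a hypothetical `Dag` model, not Airflow's real classes): deduplicate by dag_id so the recursive `.subdags` listing cannot queue the same DAG twice.

```python
# Hypothetical sketch: collect each DAG exactly once before writing any
# DagRun rows. Deduplicating up front also makes it easier to wrap all
# the inserts in a single DB transaction afterwards.

class Dag:
    def __init__(self, dag_id, children=None):
        self.dag_id = dag_id
        self.children = children or []

    @property
    def subdags(self):
        out = []
        for child in self.children:
            out.append(child)
            out.extend(child.subdags)  # recursive, all levels
        return out

def dags_to_trigger(root):
    """Return each DAG exactly once, even with nested subdags."""
    seen = set()
    ordered = []
    queue = [root]
    while queue:
        dag = queue.pop()
        if dag.dag_id in seen:
            continue               # already queued via a recursive listing
        seen.add(dag.dag_id)
        ordered.append(dag)
        queue.extend(dag.subdags)
    return ordered

level2 = Dag("level2")
level1 = Dag("level1", [level2])
parent = Dag("parent", [level1])

ids = [d.dag_id for d in dags_to_trigger(parent)]
print(ids)  # each dag_id exactly once
```

For the atomicity half, the idea would be to add all DagRun rows to one session and commit once at the end, so a failure rolls back every insert rather than leaving some DAGs triggered.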
I have set this as a Blocker because I cannot find any way around it without
modifying the core code myself, and we, like many others I suspect, have DAGs
which start with a sensor waiting for incoming data, process it, and then
trigger another instance of themselves.
We are currently using 1.10.2 but this is a blocker for us upgrading to 1.10.3.
Please let me know if you need anything else. I've never contributed to an
Apache project and would need a little guidance and support if I were to try to
address it myself. I'm willing to try, though, as I use Airflow a lot and would
love to give something back. I would just need a little early pointing in the
right direction. :)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)