[
https://issues.apache.org/jira/browse/AIRFLOW-4525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
RScodes reassigned AIRFLOW-4525:
--------------------------------
Assignee: RScodes
> Trigger Dag Operator causes duplicate key exceptions and can cause runaway
> dag spawning as it is not atomic at the DB level (on Postgres at least.)
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-4525
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4525
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG, DagRun
> Affects Versions: 1.10.3
> Reporter: Tony Brookes
> Assignee: RScodes
> Priority: Blocker
>
> When using the TriggerDagRunOperator there is a problem in the code which
> loops over a dag's subdags, scheduling each of them. You will not see this
> issue if you only have one level of sub dag, but if your sub dags themselves
> have sub dags then you will.
> The code pops an item off a list (which permits duplicates) and schedules it.
> It then appends all sub dags of the dag it popped to the current list, and
> keeps doing this until the list is empty.
> The problem is that <<top level dag>>.subdags returns _*all*_ subdags at
> _*all*_ levels, not just direct children. So when you process a <<top level
> dag.first level subdag>> it calls <<first level subdag>>.subdags and once
> again this appends all of its subdags, _*which are already in the list*_.
> You are therefore guaranteed a duplicate key exception, as the same dag ID
> and run ID are present twice.
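A minimal sketch of the walk described above (using a hypothetical Dag class, not the real Airflow model) shows how a two-level subdag ends up in the list twice:

```python
# Hypothetical stand-in for Airflow's DAG: `subdags` returns descendants
# at *every* level, like the real property does.
class Dag:
    def __init__(self, dag_id, children=None):
        self.dag_id = dag_id
        self.children = children or []

    @property
    def subdags(self):
        result = []
        for child in self.children:
            result.append(child)
            result.extend(child.subdags)  # recurse: all levels, not just direct
        return result

leaf = Dag("top.mid.leaf")
mid = Dag("top.mid", [leaf])
top = Dag("top", [mid])

# The operator's list-based walk, simplified: pop a dag, "schedule" it,
# then append all of its subdags.
scheduled = []
queue = [top]
while queue:
    dag = queue.pop()
    scheduled.append(dag.dag_id)   # creates a dag_run row for this dag
    queue.extend(dag.subdags)      # re-appends descendants already in the list

print(scheduled)  # "top.mid.leaf" appears twice -> duplicate key on insert
```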
> Up to and including 1.10.2 this is not a significant problem most of the
> time. You see the duplicate key errors in the logs but it does not cause the
> operator to raise and hence the task actually succeeds. That said, you do
> get a load of "running" sub dags in the console which never really do
> anything as they aren't invoked from the parent dag when it wants them to run
> and hence have no "task instance" connection to that dag.
> *+However, in 1.10.3 this causes havoc.+*
> Firstly, it no longer exits cleanly. It causes the operator to raise an
> error and so the task fails. Worse, since the statements it has executed to
> schedule its dags are _*not*_ in the same transaction, all the dags before the
> first duplicate _*are triggered*_. But since the task will subsequently be
> retried (if configured) _*they will be triggered again.*_ Because the logic
> that generates the run ID uses now() as part of the key, subsequent
> invocations will have a different run ID and hence will cause all the dags
> before the first duplicate exception to be scheduled repeatedly, up to the
> maximum retry limit. You still get all the orphaned sub dag entries I
> mentioned from 1.10.2, but you get many, many copies of them.
> I'm not sure what the best fix is (or if it's my place to suggest one) but
> from what I've seen, the cleanest approach is either to use a set, to avoid
> duplicate entries, rather than the current list based approach OR to continue
> using the list with its "pop" semantics but keep track of items already
> processed and avoid re-appending them.
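The second variant could look something like this sketch (same hypothetical Dag class as above; the real fix would live in the operator's trigger code). The list and its pop semantics are kept, with a seen-set guarding against re-appended descendants:

```python
# Hypothetical Dag class, as before: `subdags` returns descendants at all levels.
class Dag:
    def __init__(self, dag_id, children=None):
        self.dag_id = dag_id
        self.children = children or []

    @property
    def subdags(self):
        result = []
        for child in self.children:
            result.append(child)
            result.extend(child.subdags)
        return result

leaf = Dag("top.mid.leaf")
mid = Dag("top.mid", [leaf])
top = Dag("top", [mid])

scheduled = []
seen = set()
queue = [top]
while queue:
    dag = queue.pop()
    if dag.dag_id in seen:
        continue  # already scheduled once; skip instead of hitting a duplicate key
    seen.add(dag.dag_id)
    scheduled.append(dag.dag_id)
    queue.extend(dag.subdags)

print(scheduled)  # each dag ID exactly once
```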
> This would fix the current problem, but to be honest it feels semantically
> *_incorrect_* to trigger the sub dags in this way. The top level dag invokes
> the sub dags as task instances like any other and you're going behind its
> back invoking them this way. Moreover, the semantic contract of the
> TriggerDagRunOperator is that it takes a single dag ID as input, implicitly
> creating the expectation that this is the _*only dag which will be
> triggered.*_ Scheduling the sub dags as part of doing this feels wrong and
> actually creates an error whilst doing nothing to help the operation of the
> platform (unless there is a different configuration setup I am not thinking
> of, which is entirely possible.)
> But as far as I can discern, if you _*only*_ trigger the top level dag you've
> been _*asked*_ to trigger, then everything works just fine. The
> SubDagOperator which wraps each sub dag will trigger it anyway at the right
> time, based on whatever dependencies are in the top level dag (which might be
> none, in which case any sub dags will get scheduled automatically). The
> reason I know this, of course, is that the first time you trigger the top
> level DAG in the UI, only one row is written to the dag_run table, only the
> top level dag is triggered, and yet it works just fine...
> If there is some scenario which should still require the sub dags to be
> triggered, I think it's important that this sort of operator is atomic (or at
> the very least idempotent.) Otherwise you risk significant issues in a
> production environment with "over-triggering" Dags. Even if concurrent dag
> configuration prevents them from running concurrently, the list of scheduled
> dags can in theory grow forever (or to max_retries of the
> TriggerDagRunOperator task) and can cause some significant undesirable side
> effects. From what I can see, using a transaction would perhaps be complex
> (and not cross-platform friendly), but at the very least the dag entries
> should perhaps be added to the DB with _*no*_ state and then all converted
> to RUNNING once you know they have all successfully inserted and any primary
> key issues are resolved. The state is not part of the primary key, so this
> would not cause a problem. The worst case under this approach is a DB failure
> between the inserts and the state update, which would mean the dags never
> started; but I think that's a "better worst case" than the current situation,
> where multiple unintended triggers can happen.
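The two-phase idea could be sketched like this (illustrative only: sqlite3 stands in for the real metadata DB, and trigger_all is a made-up helper, not an Airflow API). Rows are inserted with no state, duplicate keys are swallowed, and only once all inserts have settled does anything become RUNNING:

```python
import sqlite3

# In-memory stand-in for the dag_run table with its composite primary key.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, state TEXT, "
    "PRIMARY KEY (dag_id, run_id))"
)

def trigger_all(runs):
    # Phase 1: insert every run with NULL state; a duplicate key here is
    # harmless because the row is not yet visible as a runnable dag run.
    for dag_id, run_id in runs:
        try:
            conn.execute(
                "INSERT INTO dag_run (dag_id, run_id, state) VALUES (?, ?, NULL)",
                (dag_id, run_id),
            )
        except sqlite3.IntegrityError:
            pass  # already inserted (e.g. a repeated subdag); skip, don't fail
    # Phase 2: only after all inserts have settled, flip everything to RUNNING.
    conn.execute("UPDATE dag_run SET state = 'RUNNING' WHERE state IS NULL")
    conn.commit()

# The repeated ("top.mid", "r1") mimics the duplicate the buggy walk produces.
trigger_all([("top", "r1"), ("top.mid", "r1"), ("top.mid", "r1")])
```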
> I have set this as a Blocker because I cannot find any way around it without
> modifying the core code myself and we, like many others I suspect, have dags
> which start with a sensor waiting for incoming data and then process it and
> trigger another instance of themselves to wait once again.
> We are currently using 1.10.2 but this is a blocker for us upgrading to
> 1.10.3. I can't find any way to stop the duplicate key errors from happening
> whatever I do unless I completely re-work my entire dag layout, which just
> makes them look highly complex and would obliterate the nice modular approach
> we've managed to build in our platform (and indeed which the Airflow platform
> encourages.)
> Please let me know if you need anything else. I've never contributed to an
> Apache project and would need a little guidance and support if I were to try
> to address it myself. I'm willing to try though as I use Airflow a lot and
> would love to give something back. Would just need a little early pointing
> in the right direction. :)
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)