[ 
https://issues.apache.org/jira/browse/AIRFLOW-4525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RScodes reassigned AIRFLOW-4525:
--------------------------------

    Assignee: RScodes

> Trigger Dag Operator causes duplicate key exceptions and can cause runaway 
> dag spawning as it is not atomic at the DB level (on Postgres at least.)
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4525
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4525
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun
>    Affects Versions: 1.10.3
>            Reporter: Tony Brookes
>            Assignee: RScodes
>            Priority: Blocker
>
> When using the TriggerDagRunOperator there is a problem in the code which 
> loops over subdags, scheduling them.  You will not see this issue if you only 
> have one level of sub dag, but if your sub dags themselves have sub dags then 
> you will see it.
> The code pops an item off the list (which is not de-duplicated) and schedules 
> it.  It then appends all sub dags of the dag it popped off the list to the 
> current list, and keeps doing this until the list is empty.
> The problem is that <<top level dag>>.subdags returns _*all*_ subdags at 
> _*all*_ levels.  So when you process a <<top level dag.first level subdag>> 
> it calls <<first level subdag>>.subdags, and once again this appends all of 
> its subdags, _*which are already in the list*_.  Thus you are now certain you 
> will get a duplicate key exception, as the same dag ID and run ID are present 
> twice.
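A minimal, self-contained model of the traversal described above (hypothetical names; this is not the actual Airflow source) makes the duplication easy to see — `.subdags` returning subdags at every level means each pop re-appends dags that are already queued:

```python
# Hypothetical model of the buggy traversal; dag IDs are made up.
class Dag:
    def __init__(self, dag_id, children=None):
        self.dag_id = dag_id
        self.children = children or []  # direct subdags only

    @property
    def subdags(self):
        # Mirrors the behaviour described above: returns subdags
        # at ALL nesting levels, not just direct children.
        result = []
        for child in self.children:
            result.append(child)
            result.extend(child.subdags)
        return result

leaf = Dag("top.sub.leaf")
mid = Dag("top.sub", [leaf])
top = Dag("top", [mid])

scheduled = []
queue = [top]
while queue:
    dag = queue.pop()
    scheduled.append(dag.dag_id)  # one dag_run row per entry
    queue.extend(dag.subdags)     # re-appends already-queued subdags

print(scheduled)  # "top.sub.leaf" appears twice -> duplicate key
```

With two levels of nesting the leaf dag is scheduled twice, which is exactly the duplicate (dag ID, run ID) pair that triggers the exception.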
> Up to and including 1.10.2 this is not a significant problem most of the 
> time.  You see the duplicate key errors in the logs but it does not cause the 
> operator to raise and hence the task actually succeeds.  That said, you do 
> get a load of "running" sub dags in the console which never really do 
> anything as they aren't invoked from the parent dag when it wants them to run 
> and hence have no "task instance" connection to that dag.
> *+However, in 1.10.3 this causes havoc.+*
> Firstly, it no longer exits cleanly.  It causes the operator to raise an 
> error and so it fails.  Worse, since the statements it has executed to 
> schedule its dags are _*not*_ in the same transaction, all the dags before 
> the first duplicate _*are triggered*_.  But since the task will subsequently 
> be retried (if configured) _*they will be triggered again.*_  Because the 
> logic to generate the run ID uses now() as part of the key it generates, 
> subsequent invocations will have a different run ID and hence will cause all 
> the dags before the first duplicate exception to be scheduled repeatedly, up 
> to the maximum retry limit.  You still get all the orphaned sub dag entries I 
> mentioned from 1.10.2, but you get many, many copies of them.
> I'm not sure what the best fix is (or if it's my place to suggest one) but 
> from what I've seen the cleanest approach is either to use a set, to avoid 
> duplicate entries, rather than the current list based approach, OR to 
> continue to use the list with its "pop" semantics but keep track of items 
> already processed and avoid re-appending them.
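The second option above can be sketched in a few lines (again with hypothetical names — `all_subdags` here stands in for the all-levels `.subdags` behaviour, and is not an Airflow API):

```python
# all_subdags maps a dag id to its subdags at EVERY nesting level,
# mirroring the .subdags behaviour described above (made-up ids).
all_subdags = {
    "top": ["top.sub", "top.sub.leaf"],
    "top.sub": ["top.sub.leaf"],
    "top.sub.leaf": [],
}

scheduled, seen = [], set()
queue = ["top"]
while queue:
    dag_id = queue.pop()
    if dag_id in seen:        # already scheduled: skip the re-appended copy
        continue
    seen.add(dag_id)
    scheduled.append(dag_id)  # would create exactly one dag_run
    queue.extend(all_subdags[dag_id])

print(scheduled)  # each dag id appears exactly once
```

Tracking `seen` means the re-appended copies are skipped instead of producing a second (dag ID, run ID) insert, so the duplicate key exception cannot occur.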
> This would fix the current problem, but to be honest it feels semantically 
> *_incorrect_* to trigger the sub dags in this way.  The top level dag invokes 
> the sub dags as task instances like any other and you're going behind its 
> back invoking them this way.  Moreover, the semantic contract of the 
> TriggerDagRunOperator is that it takes a single dag ID as input, implicitly 
> creating the expectation that this is the _*only dag which will be 
> triggered.*_  Scheduling the sub dags as part of doing this feels wrong and 
> actually creates an error whilst doing nothing to help the operation of the 
> platform (unless there is a different configuration set up I am not thinking 
> of which is entirely possible.)
> But as far as I can discern, if you _*only*_ trigger the top level dag you've 
> been _*asked*_ to trigger then actually, everything will work just fine.  The 
> SubDagOperator which wraps the sub dags will trigger the sub dag anyway at 
> the right time, based on whatever dependencies are in the top level dag 
> (which might be none, in which case any sub dags will get scheduled 
> automatically).  The reason I know this, of course, is that the first time 
> you trigger the top level DAG in the UI, only one row is written to the 
> dag_run table, only the top level dag is triggered, and yet, it works just 
> fine...
> If there is some scenario which should still require the sub dags to be 
> triggered, I think it's important that this sort of operator is atomic (or at 
> the very least idempotent.)  Otherwise you risk significant issues in a 
> production environment with "over-triggering" dags.  Even if concurrent dag 
> configuration prevents them from running concurrently, the list of scheduled 
> dags can in theory grow forever (or to max_retries of the 
> TriggerDagRunOperator task) and can cause some significant undesirable side 
> effects.  From what I can see, using a transaction would perhaps be complex 
> (and not cross platform friendly), but at the very least the dag entries 
> should perhaps be added to the DB with _*no*_ state and then all converted 
> to RUNNING once you know they've all successfully inserted and any primary 
> key issues are resolved.  The state is not part of the primary key, so this 
> would not cause a problem.  The worst case outcome under this approach occurs 
> only if some form of DB failure between the inserts and the state update 
> occurs.  That potentially means the dags never start, but I think that's a 
> "better worst case" than the current situation, where multiple unintended 
> triggers can happen.
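The two-phase idea above can be illustrated with an in-memory SQLite table (illustrative only — Airflow's real dag_run schema and session handling differ):

```python
# Sketch of "insert with no state, then flip to RUNNING" using
# stdlib sqlite3; table and run ids are simplified stand-ins.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    "  dag_id TEXT, run_id TEXT, state TEXT,"
    "  PRIMARY KEY (dag_id, run_id))"
)

runs = [("top", "trig_1"), ("top.sub", "trig_1"), ("top.sub.leaf", "trig_1")]

# Phase 1: insert every run with NO state.  A duplicate key here
# aborts before anything has been marked runnable, so nothing is
# half-triggered.
with conn:
    for dag_id, run_id in runs:
        conn.execute(
            "INSERT INTO dag_run (dag_id, run_id, state) VALUES (?, ?, NULL)",
            (dag_id, run_id),
        )

# Phase 2: only once every insert succeeded, flip them to RUNNING.
with conn:
    conn.execute("UPDATE dag_run SET state = 'RUNNING' WHERE state IS NULL")

states = [row[0] for row in conn.execute("SELECT state FROM dag_run")]
print(states)  # ['RUNNING', 'RUNNING', 'RUNNING']
```

Because state is not part of the primary key, the NULL-state rows insert cleanly; a failure between the two phases leaves runs that never start, which is the "better worst case" argued for above.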
> I have set this as a Blocker because I cannot find any way around it without 
> modifying the core code myself and we, like many others I suspect, have dags 
> which start with a sensor waiting for incoming data and then process it and 
> trigger another instance of themselves to wait once again.
> We are currently using 1.10.2 but this is a blocker for us upgrading to 
> 1.10.3.  I can't find any way to stop the duplicate key errors from happening 
> whatever I do unless I completely re-work my entire dag layout, which just 
> makes them look highly complex and would obliterate the nice modular approach 
> we've managed to build in our platform (and indeed which the Airflow platform 
> encourages.)
> Please let me know if you need anything else.  I've never contributed to an 
> Apache project and would need a little guidance and support if I were to try 
> to address it myself.  I'm willing to try though as I use Airflow a lot and 
> would love to give something back.  Would just need a little early pointing 
> in the right direction. :)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)