[ 
https://issues.apache.org/jira/browse/AIRFLOW-4525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Brookes updated AIRFLOW-4525:
----------------------------------
    Description: 
When using the TriggerDagRunOperator there is a problem in the code which loops 
round subdags scheduling them.  You will not see this issue if you only have 
one level of sub dag, but if your sub dags have sub dags then you will see it.

The code pops an item off the list (non unique) and schedules it.  It then 
appends all sub dags of the dag it popped off the list to the current list.  It 
keeps doing this until the list is empty.

The problem is that <<level 1 dag>>.subdags returns _*all*_ subdags at all 
levels.  So when you process a level 2 sub dag and once again append all its 
subdags, you will get all its children added _*again*_.
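
To make the duplication concrete, here is a minimal toy model of the loop 
described above.  It is not the Airflow source: ToyDag, its subdags property 
and trigger_order_naive are hypothetical names, and the only assumption 
carried over from the report is that subdags returns every descendant at 
every level.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyDag:
    """Hypothetical stand-in for a DAG; not the real Airflow class."""
    dag_id: str
    children: List["ToyDag"] = field(default_factory=list)

    @property
    def subdags(self) -> List["ToyDag"]:
        # Assumption from the report: returns descendants at ALL levels.
        found = []
        for child in self.children:
            found.append(child)
            found.extend(child.subdags)
        return found


def trigger_order_naive(root: ToyDag) -> List[str]:
    """Pop a dag, 'trigger' it, then append all of its subdags."""
    pending = [root]
    triggered = []
    while pending:
        dag = pending.pop()
        triggered.append(dag.dag_id)
        pending.extend(dag.subdags)  # re-adds already queued descendants
    return triggered


level2 = ToyDag("top.level1.level2")
level1 = ToyDag("top.level1", [level2])
top = ToyDag("top", [level1])

order = trigger_order_naive(top)
print(order)
```

With two levels of nesting the level 2 dag is visited twice, which is where 
the duplicate key comes from.  With only one level of sub dag no id repeats.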

Up to and including 1.10.2 this is not a real problem.  You see it in the logs 
but it does not cause the operator to raise and cause the task to fail.  
However, in 1.10.3 this causes havoc.

Firstly, it is no longer silently swallowed.  It causes the operator to raise 
an error and so it fails.  Worse, since the statements it has executed to 
schedule the dags are NOT in the same transaction, those dags ARE triggered.  
But the task will subsequently be retried, if configured, and will schedule 
them again (subsequent invocations, because they use now() as part of the key, 
will generate a different unique entry at the top level and hence cause it to 
be scheduled multiple times).

I'm not sure what the best fix is (or if it's my place to suggest one) but from 
what I've seen the cleanest approach is either to use a set, to avoid duplicate 
entries, rather than the current list-based approach, OR to continue to use the 
list with its "pop" semantics but keep track of items already processed and 
avoid re-appending them.
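
The second option could look roughly like the sketch below.  Again this is a 
toy, not a patch: trigger_order_deduplicated and the all_subdags mapping 
(dag id to the ids of all its descendants) are hypothetical and only stand in 
for whatever the operator really iterates over.

```python
from typing import Dict, List


def trigger_order_deduplicated(
    root_id: str, all_subdags: Dict[str, List[str]]
) -> List[str]:
    """Keep the list and its pop semantics, but never queue an id twice."""
    pending = [root_id]
    seen = {root_id}  # ids that are already queued or processed
    triggered = []
    while pending:
        dag_id = pending.pop()
        triggered.append(dag_id)
        for sub_id in all_subdags.get(dag_id, []):
            if sub_id not in seen:
                seen.add(sub_id)
                pending.append(sub_id)
    return triggered


# Hypothetical layout: each entry lists descendants at every level.
all_subdags = {
    "top": ["top.level1", "top.level1.level2"],
    "top.level1": ["top.level1.level2"],
    "top.level1.level2": [],
}

order = trigger_order_deduplicated("top", all_subdags)
print(order)
```

Each id is triggered exactly once, however deep the nesting goes.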

This would fix the current problem, but to be honest it feels semantically 
*_incorrect_* to trigger the sub dags in this way.  The top level dag invokes 
the sub dags as task instances like any other and you're sort of going behind 
its back this way.  Moreover, the semantic contract of the 
TriggerDagRunOperator is that it takes a single dag ID as input, implicitly 
creating the expectation that this is the only dag which will be triggered.  
Scheduling the sub dags as part of doing this feels wrong and actually creates 
an error whilst doing nothing to help the operation of the platform (unless 
there is a different configuration set up that I am not thinking of, which is 
entirely possible).

But as far as I can discern, if you _*only*_ trigger the top level dag you've 
been _*asked*_ to trigger then actually, everything will work just fine.  The 
SubDagOperator which wraps the sub dags will trigger the sub dag anyway at the 
right time, based on whatever dependencies are in the top level dag (which 
might be none, in which case any sub dags will get scheduled automatically).

If there is some scenario which should still require the sub dags to be 
triggered, I think it's important that this sort of operator is atomic (or at 
the very least idempotent).  Otherwise you risk significant issues in a 
production environment with "over-triggering" dags.  Even if concurrent dag 
configuration prevents them from running concurrently, the list of scheduled 
dags can in theory grow forever (or to max_retries of the TriggerDagRunOperator 
task) and can cause some significant undesirable side effects.  From what I can 
see, using a transaction would perhaps be complex, but at the very least the 
dag entries should perhaps be added to the DB with _*no*_ state and then all 
be converted to RUNNING once you know they've all been successfully inserted 
and any primary key issues are resolved.  The state is not part of the primary 
key so this would not cause a problem.  The worst case outcome under this 
approach occurs only if some form of DB failure happens between the inserts 
and the state update.  This potentially means the dags never start, but I 
think that's a "better worst case" than the current situation where multiple 
unintended triggers can happen.
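
As a rough illustration of the "insert with no state, then flip to RUNNING" 
idea, here is a sketch against an in-memory SQLite table.  Everything in it 
is an assumption made for the example: the toy_dag_run table, its columns, 
the (dag_id, execution_date) primary key and the trigger_in_two_phases 
function are hypothetical and are not Airflow's schema or API.

```python
import sqlite3


def trigger_in_two_phases(conn, dag_ids, execution_date):
    """Phase 1: insert rows with no state.  Phase 2: mark them running."""
    inserted = []
    for dag_id in dag_ids:
        try:
            conn.execute(
                "INSERT INTO toy_dag_run (dag_id, execution_date, state) "
                "VALUES (?, ?, NULL)",
                (dag_id, execution_date),
            )
            inserted.append(dag_id)
        except sqlite3.IntegrityError:
            # Duplicate key: the row already exists, so skip it.
            continue
    conn.commit()  # rows exist, but none of them has a state yet

    # Only reached if every insert was handled without an unexpected error.
    conn.executemany(
        "UPDATE toy_dag_run SET state = 'running' "
        "WHERE dag_id = ? AND execution_date = ?",
        [(dag_id, execution_date) for dag_id in inserted],
    )
    conn.commit()
    return inserted


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE toy_dag_run ("
    "dag_id TEXT, execution_date TEXT, state TEXT, "
    "PRIMARY KEY (dag_id, execution_date))"
)

# The duplicated id mimics the doubled-up sub dag from the loop.
started = trigger_in_two_phases(
    conn, ["top", "top.level1", "top.level1"], "2019-05-15T00:00:00"
)
rows = conn.execute(
    "SELECT dag_id, state FROM toy_dag_run ORDER BY dag_id"
).fetchall()
print(started, rows)
```

If something fails between the two commits, the rows are left with no state 
and nothing starts, which is the "better worst case" described above.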

I have set this as a Blocker because I cannot find any way around it without 
modifying the core code myself and we, like many others I suspect, have dags 
which start with a sensor waiting for incoming data and then process it and 
trigger another instance of themselves to wait once again.

We are currently using 1.10.2 but this is a blocker for us upgrading to 1.10.3. 
I can't find any way to stop the duplicate key errors from happening, whatever 
I do, unless I completely re-work my entire dag layout, which would just make 
the dags look highly complex and would obliterate the rather nice modular 
approach we've managed to build.

Please let me know if you need anything else.  I've never contributed to an 
Apache project and would need a little guidance and support if I were to try to 
address it myself.  I'm willing to try though as I use Airflow a lot and would 
love to give something back.  Would just need a little early pointing in the 
right direction. :)

 

  was:
When using the TriggerDagRunOperator there is a problem in the code which loops 
round subdags scheduling them.  You will not see this issue if you only have 
one level of sub dag, but if your sub dags have sub dags then you will see it.

The code pops an item off the list (non unique) and schedules it.  It then 
appends all sub dags of the dag it popped off the list to the current list.  It 
keeps doing this until the list is empty.

The problem is that <<level 1 dag>>.subdags returns ALL subdags at all levels.  
So when you process a level 2 sub dag and once again append all its subdags, 
you will get all its children added AGAIN.

Up to and including 1.10.2 this is not a real problem.  You see it in the logs 
but it does not cause the operator to raise and cause the task to fail.  
However, in 1.10.3 this causes havoc.

Firstly, it is no longer silently swallowed.  It causes the operator to raise 
an error and so it fails.  Worse, since the statements it has executed to 
schedule the dags are NOT in the same transaction, those dags ARE triggered.  
But the task will subsequently be retried, if configured, and will schedule 
them again (subsequent invocations, because they use now() as part of the key, 
will generate a different unique entry at the top level and hence cause it to 
be scheduled multiple times).

I'm not sure what the best fix is (or if it's my place to suggest one) but from 
what I've seen the cleanest approach is to use a set, to avoid duplicate 
entries, rather than the current list-based approach.  This will fix most of 
the problem, but I think it's important that this sort of operator is truly 
atomic and that either all the rows are written to the DB or NONE are.  
Otherwise you can risk significant issues in a production environment with 
over-instantiating DAGS.  Even if concurrent DAG configuration prevents them 
from running concurrently the list of scheduled dags can in theory grow forever 
(or to max_retries of the TriggerDagRunOperator task) and can cause some 
significant undesirable side effects.

I have set this as a Blocker because I cannot find any way around it without 
modifying the core code myself and we, like many others I suspect, have DAGs 
which start with a sensor waiting for incoming data and then process it and 
trigger another instance of themselves.

We are currently using 1.10.2 but this is a blocker for us upgrading to 1.10.3.

Please let me know if you need anything else.  I've never contributed to an 
Apache project and would need a little guidance and support if I were to try to 
address it myself.  I'm willing to try though as I use Airflow a lot and would 
love to give something back.  Would just need a little early pointing in the 
right direction. :)

 


> Trigger Dag Operator causes duplicate key exceptions and can cause runaway 
> dag spawning as it is not atomic at the DB level (on Postgres at least.)
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4525
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4525
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, DagRun
>    Affects Versions: 1.10.2, 1.10.3
>         Environment: Mac 10.14.4.
>            Reporter: Tony Brookes
>            Priority: Blocker



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
