Feng Mao created AIRFLOW-4415:
---------------------------------

             Summary: skip status stops propagation randomly.
                 Key: AIRFLOW-4415
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4415
             Project: Apache Airflow
          Issue Type: Improvement
          Components: scheduler
    Affects Versions: 1.8.1
            Reporter: Feng Mao
             Fix For: 1.10.4


Issue: the skip status stops propagating to downstream tasks at a random point, 
and the DAG run is then marked as failed.

The issue is present in version 1.8.1.

Version 1.8.0 contained a temporary fix, but it was removed after that version.

https://github.com/apache/airflow/commit/4077c6de297566a4c598065867a9a27324ae6eb1

https://github.com/apache/airflow/commit/92965e8275c6f2ec2282ad46c09950bab10c1cb2

 

Root cause:

  In a loop, the scheduler evaluates each DAG and all of its task dependencies 
round by round.

  Each round evaluates the dependencies twice: once in the context of 
flag_upstream_failed = false and once in the context of flag_upstream_failed = 
true.

 

  The DAG run update method marks the DAG run as deadlocked, which stops the DAG 
and all of its tasks from being processed further.

  https://github.com/apache/airflow/blob/1.8.1/airflow/models.py#L4184

  This is caused by the no_dependencies_met check. The all_success trigger rule 
does not check for the skipped status, so it reports the task as failing its 
dependencies when the upstream contains only skipped tasks.

  https://github.com/apache/airflow/blob/1.8.1/airflow/models.py#L4152

  
https://github.com/apache/airflow/blob/1.8.1/airflow/ti_deps/deps/trigger_rule_dep.py#L165
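
  The interaction between the all_success check and the deadlock check can be 
sketched as follows. This is a simplified model and not the Airflow code: the 
function names, the plain string states and the dictionaries are assumptions 
made for illustration, and only tasks that have no state yet are treated as 
unfinished.

```python
def all_success_met(upstream_states):
    """Model of all_success with flag_upstream_failed=false:
    every upstream must be 'success'; 'skipped' is not counted as success."""
    return all(state == "success" for state in upstream_states)


def is_deadlocked(states, upstream):
    """Model of the deadlock check: there are unfinished tasks and
    none of them has its dependencies met."""
    unfinished = [task for task, state in states.items() if state is None]
    if not unfinished:
        return False
    return not any(
        all_success_met([states[u] for u in upstream.get(task, [])])
        for task in unfinished
    )


# Hypothetical chain a -> b -> c, where a has already been skipped.
upstream = {"b": ["a"], "c": ["b"]}
skipped_run = {"a": "skipped", "b": None, "c": None}
normal_run = {"a": "success", "b": None, "c": None}

print(is_deadlocked(skipped_run, upstream))  # b and c can never become ready
print(is_deadlocked(normal_run, upstream))   # b is ready, so no deadlock
```

  In this model, a skipped upstream whose status has not yet been propagated 
leaves every remaining task with unmet dependencies, which is the situation the 
deadlock check interprets as a failed run.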

 

  Each DAG run update checks all of its task dependencies and sends ready tasks 
to run in the context of flag_upstream_failed=false (the default),

  https://github.com/apache/airflow/blob/1.8.1/airflow/models.py#L4156

  which does not handle skip status propagation.

 

  After the DAG run update, the DAG's task dependencies are checked again and 
ready tasks are sent to run in the context of flag_upstream_failed=true

  https://github.com/apache/airflow/blob/1.8.1/airflow/jobs.py#L904

  which handles skip status propagation.

  
https://github.com/apache/airflow/blob/1.8.1/airflow/ti_deps/deps/trigger_rule_dep.py#L138
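
  As a rough illustration of what the flag_upstream_failed=true pass does for 
the all_success rule, the sketch below returns the state a waiting task would be 
moved to. It is a simplified model under stated assumptions: the function name 
and the string states are hypothetical, and the real dependency class works on 
task instances and a DB session rather than on a list of states.

```python
def propagated_state(upstream_states):
    """Model of all_success with flag_upstream_failed=true.

    Returns the state to propagate to the downstream task,
    or None if nothing should be propagated.
    """
    # A failed (or already upstream_failed) upstream takes precedence.
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "upstream_failed"
    # Otherwise a skipped upstream makes the downstream task skipped too.
    if any(s == "skipped" for s in upstream_states):
        return "skipped"
    return None


print(propagated_state(["skipped"]))
print(propagated_state(["success", "failed"]))
print(propagated_state(["success"]))
```

  Only this second pass moves a waiting task to skipped, so a task whose 
upstream was skipped stays unfinished until this pass has seen the skipped 
upstream.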

 

  Two potential causes can lead the DAG run update to detect a deadlock.

  Skip status propagation relies on detecting skipped upstream tasks (whose 
status is written to the DB asynchronously by other nodes).

  If the tasks being evaluated are not fetched in topological order (they come 
in effectively random order, by priority_weight), it takes multiple loop rounds 
to propagate the skip status to all downstream tasks.

  Depending on how close the fetch order is to the topological order, the 
propagation may reach further or stop sooner within a round.
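
  The effect of the evaluation order can be shown with a small simulation. It 
is only a sketch: the function, the task names and the rule "a task becomes 
skipped as soon as one of its upstreams is seen as skipped" are assumptions 
chosen to model the behaviour described above, not Airflow code.

```python
def rounds_to_propagate(order, upstream, initially_skipped):
    """Count the evaluation rounds in which the skip status still spreads,
    when the tasks are visited in the given order in every round."""
    skipped = set(initially_skipped)
    rounds = 0
    while True:
        changed = False
        for task in order:
            if task in skipped:
                continue
            if any(u in skipped for u in upstream.get(task, [])):
                skipped.add(task)
                changed = True
        if not changed:
            return rounds
        rounds += 1


# Hypothetical chain a -> b -> c -> d, where a has been skipped.
upstream = {"b": ["a"], "c": ["b"], "d": ["c"]}

print(rounds_to_propagate(["a", "b", "c", "d"], upstream, {"a"}))  # topological
print(rounds_to_propagate(["d", "c", "b", "a"], upstream, {"a"}))  # reversed
```

  With the topological order the skip status reaches d within a single round, 
while the reversed order advances by only one task per round, so the deadlock 
check in a later round can run while some downstream tasks are still waiting.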

 

  Deadlock detection is avoided only if the following conditions hold at the 
same time:

  1. The skipping task (the short-circuit operator, running as an asynchronous 
process) updates the DB with the skipped task status right after the DAG run 
update (flag_upstream_failed=false) and before the DAG task checks 
(flag_upstream_failed=true) in the scheduler process.

  2. The DAG task checks (flag_upstream_failed=true) fetch and evaluate all 
tasks in topological order, so that the skip status can propagate in a single 
evaluation round.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
