[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Conrad Lee updated AIRFLOW-1428:
--------------------------------
    Description: 
In controlling which tasks are executed in a DagRun, it's common for tasks to 
skip themselves, e.g., by raising an `AirflowSkipException`. Trigger rules 
control how skips propagate. It is currently unclear to me how to propagate 
skipped states without causing the DagRun to deadlock.

Consider the following simple example:

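{code}
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Hypothetical DAG definition; the original snippet assumed an existing `dag`
dag = DAG('skip_deadlock_example', start_date=datetime(2017, 7, 1))

def raise_skip():
    # Marks this task instance as skipped
    raise AirflowSkipException

skip_op = PythonOperator(
    task_id='skip_op',
    python_callable=raise_skip,
    dag=dag)

# Uses the default trigger rule, 'all_success'
skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}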

When I run the code above, the DagRun deadlocks. I have dug into why:
* The deadlock is detected by `DagRun.update_state` 
[here](https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293).
* The deadlock is raised because `no_dependencies_met` is `True` when it should 
be `False`.
* `no_dependencies_met` is `True` because calling `get_failed_dep_statuses()` on 
the task instance of `skipped_child_op` returns: 
`[TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")]`
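The chain of checks above can be illustrated with a simplified, Airflow-free model. This is only a sketch under stated assumptions: trigger-rule evaluation is reduced to comparing upstream states, the deadlock test is reduced to "there are unfinished tasks and none of them can run", and `all_success_met` and `is_deadlocked` are hypothetical helpers, not Airflow APIs. It shows why a single skipped parent leaves `all_success` permanently unmet, so the run is flagged as deadlocked.

```python
from collections import Counter

# Hypothetical, simplified model of the checks described above; not Airflow code.
SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED = "success", "skipped", "failed", "upstream_failed"
DONE_STATES = {SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED}


def all_success_met(upstream_states):
    """Model of the 'all_success' trigger rule: every upstream task must have succeeded."""
    counts = Counter(upstream_states)
    return counts[SUCCESS] == len(upstream_states)


def is_deadlocked(task_states, upstream):
    """Model of the deadlock test: unfinished tasks exist and none can have its deps met."""
    unfinished = [t for t, s in task_states.items() if s not in DONE_STATES]
    if not unfinished:
        return False
    return all(
        not all_success_met([task_states[u] for u in upstream[t]])
        for t in unfinished
    )


# The DAG from the report: skip_op -> skipped_child_op (None = not yet run)
upstream = {"skip_op": [], "skipped_child_op": ["skip_op"]}
states = {"skip_op": SKIPPED, "skipped_child_op": None}
print(is_deadlocked(states, upstream))  # True
```

If `skip_op` had succeeded instead, `all_success_met` would pass for the child and `is_deadlocked` would return `False`.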

So, because `skipped_child_op`'s parent is skipped, its trigger-rule dependency 
is reported as failed and can never pass. I have looked for a trigger rule 
that treats skipped parents as satisfied, but none seems to exist. I don't want 
to use `ALL_DONE` because I want `skipped_child_op` to be skipped, not run. 
Thus, there seems to be no way to properly implement this very simple DAG.

It seems that the Airflow community has gone back and forth on how to handle 
skipped tasks; see 
[AIRFLOW-983](https://issues.apache.org/jira/browse/AIRFLOW-983), 
[AIRFLOW-992](https://issues.apache.org/jira/browse/AIRFLOW-992), and 
[AIRFLOW-719](https://issues.apache.org/jira/browse/AIRFLOW-719). There seems 
to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but 
nobody has implemented it yet.
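One possible reading of the proposed `ALL_SUCCESS_OR_SKIPPED` rule is sketched below as a pure function over upstream states. The rule name comes from the discussion above, but the semantics are an assumption: any failed or upstream-failed parent marks the task `upstream_failed`, the task waits while any parent is unfinished, and it runs once every parent has either succeeded or been skipped. Because the report wants `skipped_child_op` to be skipped rather than run, the sketch adds a hypothetical `propagate_skip` option that skips the task when all of its parents were skipped. `evaluate_all_success_or_skipped` is a hypothetical helper, not an Airflow API.

```python
from collections import Counter

# Hypothetical sketch of a proposed ALL_SUCCESS_OR_SKIPPED trigger rule; not an Airflow API.
SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED = "success", "skipped", "failed", "upstream_failed"


def evaluate_all_success_or_skipped(upstream_states, propagate_skip=False):
    """Return 'wait', 'run', 'skip' or 'upstream_failed' for the given upstream states.

    propagate_skip (hypothetical): when True, a task whose upstream tasks were
    all skipped is itself skipped instead of run.
    """
    counts = Counter(upstream_states)
    if counts[FAILED] or counts[UPSTREAM_FAILED]:
        return "upstream_failed"
    if counts[SUCCESS] + counts[SKIPPED] < len(upstream_states):
        return "wait"  # at least one upstream task has not finished yet
    if propagate_skip and upstream_states and counts[SKIPPED] == len(upstream_states):
        return "skip"
    return "run"


print(evaluate_all_success_or_skipped([SKIPPED]))                       # run
print(evaluate_all_success_or_skipped([SKIPPED], propagate_skip=True))  # skip
```

Under either setting, a task whose only parent was skipped reaches a terminal decision instead of waiting forever, which is what the deadlock check in the report needs.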

  was:
In controlling which tasks are executed in a DagRun, it's common for tasks to 
skip themselves, e.g., by raising an `AirflowSkipException`. Trigger rules 
control how skips propagate. It is currently unclear to me how to propagate 
skipped states without causing the DagRun to deadlock.

Consider the following simple example:

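{code}
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Hypothetical DAG definition; the original snippet assumed an existing `dag`
dag = DAG('skip_deadlock_example', start_date=datetime(2017, 7, 1))

def raise_skip():
    # Marks this task instance as skipped
    raise AirflowSkipException

skip_op = PythonOperator(
    task_id='skip_op',
    python_callable=raise_skip,
    dag=dag)

# Uses the default trigger rule, 'all_success'
skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}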

When I run the code above, the DagRun deadlocks. I have dug into why:
* The deadlock is detected by `DagRun.update_state` 
[here](https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293).
* That's raised because `no_dependencies_met` is `True` when it should be 
`False`.
* `no_dependencies_met` is `True` because calling `get_failed_dep_statuses()` on 
the task instance of `skipped_child_op` returns: 
`[TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")]`

So, because `skipped_child_op`'s parent is skipped, its trigger-rule dependency 
is reported as failed and can never pass. I have looked for a trigger rule 
that treats skipped parents as satisfied, but none seems to exist. I don't want 
to use `ALL_DONE` because I want `skipped_child_op` to be skipped, not run. 
Thus, there seems to be no way to properly implement this very simple DAG.

It seems that the Airflow community has gone back and forth on how to handle 
skipped tasks; see 
[AIRFLOW-983](https://issues.apache.org/jira/browse/AIRFLOW-983), 
[AIRFLOW-992](https://issues.apache.org/jira/browse/AIRFLOW-992), and 
[AIRFLOW-719](https://issues.apache.org/jira/browse/AIRFLOW-719). There seems 
to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but 
nobody has implemented it yet.


> DagRun deadlocks when all tasks' dependencies have skipped state
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-1428
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1428
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun, dependencies
>    Affects Versions: 1.8.2
>         Environment: LocalExecutor with postgres
>            Reporter: Conrad Lee
>            Assignee: Bolke de Bruin
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
