Hi folks,

So it turns out that in the CeleryExecutor case the DAG run deadlocks.

The last log from the dag run is:

```
Deadlock; marking run <DagRun example_dataops_weekly_reviews @ 2017-06-01
23:24:07.054844: manual__2017-06-01T23:24:07.054844, externally triggered:
True> failed
```

which comes from here:

https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4250-L4253

and ultimately it is the `no_dependencies_met` variable that evaluates to
true in this case with the CeleryExecutor.
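For reference, the check at those lines boils down to something like the
following. This is a paraphrased sketch from memory, not the actual Airflow
source; `run_is_deadlocked` and `dependencies_met` are illustrative names:

```python
# Sketch of the deadlock check around models.py#L4250-L4253: a run is
# considered deadlocked when unfinished task instances remain but none of
# them can currently be scheduled (no_dependencies_met is true).

def run_is_deadlocked(unfinished_tis, dependencies_met):
    """Return True when tasks remain unfinished yet none of them has
    its dependencies met, so the scheduler can make no progress."""
    no_dependencies_met = not any(dependencies_met(ti) for ti in unfinished_tis)
    return bool(unfinished_tis) and no_dependencies_met

# Three unfinished tasks, none schedulable -> deadlock.
unfinished = ["load_dataops_reviews", "load_dataops_surveys",
              "terminate_emr_cluster"]
print(run_is_deadlocked(unfinished, lambda ti: False))  # True

# If even one task (e.g. the all_done terminate task) is schedulable,
# the run is not deadlocked.
print(run_is_deadlocked(unfinished,
                        lambda ti: ti == "terminate_emr_cluster"))  # False
```

So the question reduces to why, under the CeleryExecutor only, every
unfinished task instance reports its dependencies as unmet.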

I am not clear on why this would be an executor-specific issue.

Best Regards,
Ali

On Sat, May 6, 2017 at 7:22 PM, Ali Naqvi <[email protected]> wrote:

> Hi folks,
> I have a DAG that propagates failures correctly with the SequentialExecutor:
>
> https://www.dropbox.com/s/zh0quoj99e44qxh/Screenshot%202017-05-06%2012.54.10.png?dl=0
>
> but does not propagate failures when using the CeleryExecutor:
>
> https://www.dropbox.com/s/mfxqhawwf0760gm/Screenshot%202017-05-06%2019.14.06.png?dl=0
>
> Below is the sample DAG I used to reproduce the problem. I force a
> failure in the dataops_weekly_update_reviews task by referencing a
> non-existent keyword argument.
>
> ```
> import airflow
> import datetime
> from airflow.operators.python_operator import PythonOperator
> from airflow.models import DAG
>
> args = {
>     'owner': 'airflow',
>     'start_date': datetime.datetime(2017, 5, 5),
>     'queue': 'development'
> }
>
> dag = DAG(
>     dag_id='example_dataops_weekly_reviews', default_args=args,
>     schedule_interval=None)
>
>
> def instantiate_emr_cluster(*args, **kwargs):
>     return "instantiating emr cluster"
>
> task_instantiate_emr_cluster = PythonOperator(
>     task_id="instantiate_emr_cluster",
>     python_callable=instantiate_emr_cluster,
>     provide_context=True,
>     dag=dag)
>
>
> def initialize_tables(*args, **kwargs):
>     return "initializing tables {}".format(kwargs["ds"])
>
>
> task_initialize_tables = PythonOperator(
>     task_id="initialize_tables",
>     python_callable=initialize_tables,
>     provide_context=True,
>     dag=dag)
>
>
> def dataops_weekly_update_reviews(*args, **kwargs):
>     return "UPDATING weekly reviews {}".format(kwargs["dsasdfdsfa"])
>
>
> task_dataops_weekly_update_reviews = PythonOperator(
>     task_id="dataops_weekly_update_reviews",
>     python_callable=dataops_weekly_update_reviews,
>     provide_context=True,
>     dag=dag)
>
>
> def load_dataops_reviews(*args, **kwargs):
>     return "loading dataops reviews"
>
>
> task_load_dataops_reviews = PythonOperator(
>     task_id="load_dataops_reviews",
>     python_callable=load_dataops_reviews,
>     provide_context=True,
>     dag=dag)
>
>
> def load_dataops_surveys(**kwargs):
>     return "Print out the running EMR cluster"
>
>
> task_load_dataops_surveys = PythonOperator(
>     task_id="load_dataops_surveys",
>     provide_context=True,
>     python_callable=load_dataops_surveys,
>     dag=dag)
>
>
> def load_cs_survey_answers(**kwargs):
>     return "load cs survey answers"
>
>
> task_load_cs_survey_answers = PythonOperator(
>     task_id="load_cs_survey_answers",
>     provide_context=True,
>     python_callable=load_cs_survey_answers,
>     dag=dag)
>
>
> def terminate_emr_cluster(*args, **kwargs):
>     return "terminate emr cluster"
>
>
> task_terminate_emr_cluster = PythonOperator(
>     task_id="terminate_emr_cluster",
>     python_callable=terminate_emr_cluster,
>     provide_context=True,
>     trigger_rule="all_done",
>     dag=dag)
>
>
> task_initialize_tables.set_upstream(task_instantiate_emr_cluster)
> task_dataops_weekly_update_reviews.set_upstream(task_initialize_tables)
> task_load_dataops_reviews.set_upstream(task_dataops_weekly_update_reviews)
> task_terminate_emr_cluster.set_upstream(task_load_dataops_reviews)
> task_load_dataops_surveys.set_upstream(task_dataops_weekly_update_reviews)
> task_terminate_emr_cluster.set_upstream(task_load_dataops_surveys)
> task_load_cs_survey_answers.set_upstream(task_dataops_weekly_update_reviews)
> task_terminate_emr_cluster.set_upstream(task_load_cs_survey_answers)
>
> ```
>
