Hi folks,
I have a dag that does propagate failures correctly in sequential executor:

https://www.dropbox.com/s/zh0quoj99e44qxh/Screenshot%202017-05-06%2012.54.10.png?dl=0

but does not propagate failures when using celery executor:

https://www.dropbox.com/s/mfxqhawwf0760gm/Screenshot%202017-05-06%2019.14.06.png?dl=0
Below is sample dag which I used to recreate the problem. I force the
failure in the dataops_weekly_update_reviews task by using a non-existent
keyword argument.

```
import airflow
import datetime
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2017, 5, 5),
    'queue': 'development'
}

dag = DAG(
    dag_id='example_dataops_weekly_reviews', default_args=args,
    schedule_interval=None)


def instantiate_emr_cluster(*args, **kwargs):
    return "instantiating emr cluster"

task_instantiate_emr_cluster = PythonOperator(
    task_id="instantiate_emr_cluster",
    python_callable=instantiate_emr_cluster,
    provide_context=True,
    dag=dag)


def initialize_tables(*args, **kwargs):
    return "initializing tables {}".format(kwargs["ds"])


task_initialize_tables = PythonOperator(
    task_id="initialize_tables",
    python_callable=initialize_tables,
    provide_context=True,
    dag=dag)


def dataops_weekly_update_reviews(*args, **kwargs):
    return "UPDATING weekly reviews {}".format(kwargs["dsasdfdsfa"])


task_dataops_weekly_update_reviews = PythonOperator(
    task_id="dataops_weekly_update_reviews",
    python_callable=dataops_weekly_update_reviews,
    provide_context=True,
    dag=dag)


def load_dataops_reviews(*args, **kwargs):
    return "loading dataops reviews"


task_load_dataops_reviews = PythonOperator(
    task_id="load_dataops_reviews",
    python_callable=load_dataops_reviews,
    provide_context=True,
    dag=dag)


def load_dataops_surveys(**kwargs):
    return "Print out the running EMR cluster"


task_load_dataops_surveys = PythonOperator(
    task_id="load_dataops_surveys",
    provide_context=True,
    python_callable=load_dataops_surveys,
    dag=dag)


def load_cs_survey_answers(**kwargs):
    return "load cs survey answers"


task_load_cs_survey_answers = PythonOperator(
    task_id="load_cs_survey_answers",
    provide_context=True,
    python_callable=load_cs_survey_answers,
    dag=dag)


def terminate_emr_cluster(*args, **kwargs):
    return "terminate emr cluster"


task_terminate_emr_cluster = PythonOperator(
    task_id="terminate_emr_cluster",
    python_callable=terminate_emr_cluster,
    provide_context=True,
    trigger_rule="all_done",
    dag=dag)


task_initialize_tables.set_upstream(task_instantiate_emr_cluster)
task_dataops_weekly_update_reviews.set_upstream(task_initialize_tables)
task_load_dataops_reviews.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_dataops_reviews)
task_load_dataops_surveys.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_dataops_surveys)
task_load_cs_survey_answers.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_cs_survey_answers)

```

Reply via email to