Hi folks, I have a DAG that propagates failures correctly with the sequential executor:
https://www.dropbox.com/s/zh0quoj99e44qxh/Screenshot%202017-05-06%2012.54.10.png?dl=0 but does not propagate failures when using the Celery executor: https://www.dropbox.com/s/mfxqhawwf0760gm/Screenshot%202017-05-06%2019.14.06.png?dl=0

Below is a sample DAG which I used to recreate the problem. I force the failure in the dataops_weekly_update_reviews task by using a non-existent keyword argument.

```
import airflow
import datetime
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2017, 5, 5),
    'queue': 'development'
}

dag = DAG(
    dag_id='example_dataops_weekly_reviews',
    default_args=args,
    schedule_interval=None)


def instantiate_emr_cluster(*args, **kwargs):
    return "instantiating emr cluster"

task_instantiate_emr_cluster = PythonOperator(
    task_id="instantiate_emr_cluster",
    python_callable=instantiate_emr_cluster,
    provide_context=True,
    dag=dag)


def initialize_tables(*args, **kwargs):
    return "initializing tables {}".format(kwargs["ds"])

task_initialize_tables = PythonOperator(
    task_id="initialize_tables",
    python_callable=initialize_tables,
    provide_context=True,
    dag=dag)


def dataops_weekly_update_reviews(*args, **kwargs):
    return "UPDATING weekly reviews {}".format(kwargs["dsasdfdsfa"])

task_dataops_weekly_update_reviews = PythonOperator(
    task_id="dataops_weekly_update_reviews",
    python_callable=dataops_weekly_update_reviews,
    provide_context=True,
    dag=dag)


def load_dataops_reviews(*args, **kwargs):
    return "loading dataops reviews"

task_load_dataops_reviews = PythonOperator(
    task_id="load_dataops_reviews",
    python_callable=load_dataops_reviews,
    provide_context=True,
    dag=dag)


def load_dataops_surveys(**kwargs):
    return "Print out the running EMR cluster"

task_load_dataops_surveys = PythonOperator(
    task_id="load_dataops_surveys",
    provide_context=True,
    python_callable=load_dataops_surveys,
    dag=dag)


def load_cs_survey_answers(**kwargs):
    return "load cs survey answers"

task_load_cs_survey_answers = PythonOperator(
    task_id="load_cs_survey_answers",
    provide_context=True,
    python_callable=load_cs_survey_answers,
    dag=dag)


def terminate_emr_cluster(*args, **kwargs):
    return "terminate emr cluster"

task_terminate_emr_cluster = PythonOperator(
    task_id="terminate_emr_cluster",
    python_callable=terminate_emr_cluster,
    provide_context=True,
    trigger_rule="all_done",
    dag=dag)

task_initialize_tables.set_upstream(task_instantiate_emr_cluster)
task_dataops_weekly_update_reviews.set_upstream(task_initialize_tables)
task_load_dataops_reviews.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_dataops_reviews)
task_load_dataops_surveys.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_dataops_surveys)
task_load_cs_survey_answers.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_cs_survey_answers)
```
