Hi folks, so it turns out that in the CeleryExecutor case the DAG deadlocks.
The last log from the dag run is: ``` Deadlock; marking run <DagRun example_dataops_weekly_reviews @ 2017-06-01 23:24:07.054844: manual__2017-06-01T23:24:07.054844, externally triggered: True> failed ``` which is over here: https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4250-L4253 and ultimately it's the no_dependencies_met variable which is true in this case for the celery executor. I am not clear why this would be an executor-specific issue. Best Regards, Ali On Sat, May 6, 2017 at 7:22 PM, Ali Naqvi <[email protected]> wrote: > Hi folks, > I have a dag that does propagate failures correctly in sequential executor: > > https://www.dropbox.com/s/zh0quoj99e44qxh/Screenshot%202017-05-06%2012.54.10.png?dl=0 > > but does not propagate failures when using celery executor: > > https://www.dropbox.com/s/mfxqhawwf0760gm/Screenshot%202017-05-06%2019.14.06.png?dl=0 > Below is a sample dag which I used to recreate the problem. I force the > failure in the dataops_weekly_update_reviews task by using a non-existent > keyword argument. 
> > ``` > import airflow > import datetime > from airflow.operators.python_operator import PythonOperator > from airflow.models import DAG > > args = { > 'owner': 'airflow', > 'start_date': datetime.datetime(2017, 5, 5), > 'queue': 'development' > } > > dag = DAG( > dag_id='example_dataops_weekly_reviews', default_args=args, > schedule_interval=None) > > > def instantiate_emr_cluster(*args, **kwargs): > return "instantiating emr cluster" > > task_instantiate_emr_cluster = PythonOperator( > task_id="instantiate_emr_cluster", > python_callable=instantiate_emr_cluster, > provide_context=True, > dag=dag) > > > def initialize_tables(*args, **kwargs): > return "initializing tables {}".format(kwargs["ds"]) > > > task_initialize_tables = PythonOperator( > task_id="initialize_tables", > python_callable=initialize_tables, > provide_context=True, > dag=dag) > > > def dataops_weekly_update_reviews(*args, **kwargs): > return "UPDATING weekly reviews {}".format(kwargs["dsasdfdsfa"]) > > > task_dataops_weekly_update_reviews = PythonOperator( > task_id="dataops_weekly_update_reviews", > python_callable=dataops_weekly_update_reviews, > provide_context=True, > dag=dag) > > > def load_dataops_reviews(*args, **kwargs): > return "loading dataops reviews" > > > task_load_dataops_reviews = PythonOperator( > task_id="load_dataops_reviews", > python_callable=load_dataops_reviews, > provide_context=True, > dag=dag) > > > def load_dataops_surveys(**kwargs): > return "Print out the running EMR cluster" > > > task_load_dataops_surveys = PythonOperator( > task_id="load_dataops_surveys", > provide_context=True, > python_callable=load_dataops_surveys, > dag=dag) > > > def load_cs_survey_answers(**kwargs): > return "load cs survey answers" > > > task_load_cs_survey_answers = PythonOperator( > task_id="load_cs_survey_answers", > provide_context=True, > python_callable=load_cs_survey_answers, > dag=dag) > > > def terminate_emr_cluster(*args, **kwargs): > return "terminate emr cluster" > > > 
task_terminate_emr_cluster = PythonOperator( > task_id="terminate_emr_cluster", > python_callable=terminate_emr_cluster, > provide_context=True, > trigger_rule="all_done", > dag=dag) > > > task_initialize_tables.set_upstream(task_instantiate_emr_cluster) > task_dataops_weekly_update_reviews.set_upstream(task_initialize_tables) > task_load_dataops_reviews.set_upstream(task_dataops_weekly_update_reviews) > task_terminate_emr_cluster.set_upstream(task_load_dataops_reviews) > task_load_dataops_surveys.set_upstream(task_dataops_weekly_update_reviews) > task_terminate_emr_cluster.set_upstream(task_load_dataops_surveys) > task_load_cs_survey_answers.set_upstream(task_dataops_weekly_update_reviews) > task_terminate_emr_cluster.set_upstream(task_load_cs_survey_answers) > > ``` >
