Parallel execution comes to mind. What version of Airflow are you running (please always report this), and can you provide full logs (processor, scheduler, worker)?
Thanks
Bolke

> On 7 Jun 2017, at 00:13, Ali Naqvi <[email protected]> wrote:
>
> Hi folks,
>
> So it turns out that in the CeleryExecutor case the DAG deadlocks.
>
> The last log line from the DAG run is:
>
> ```
> Deadlock; marking run <DagRun example_dataops_weekly_reviews @ 2017-06-01
> 23:24:07.054844: manual__2017-06-01T23:24:07.054844, externally triggered:
> True> failed
> ```
>
> which is over here:
>
> https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4250-L4253
>
> and ultimately it is the no_dependencies_met variable which is true in this
> case for the CeleryExecutor.
>
> I am not clear why this would be an executor-specific issue.
>
> Best Regards,
> Ali
>
> On Sat, May 6, 2017 at 7:22 PM, Ali Naqvi <[email protected]> wrote:
>
>> Hi folks,
>> I have a DAG that propagates failures correctly with the SequentialExecutor:
>>
>> https://www.dropbox.com/s/zh0quoj99e44qxh/Screenshot%202017-05-06%2012.54.10.png?dl=0
>>
>> but does not propagate failures when using the CeleryExecutor:
>>
>> https://www.dropbox.com/s/mfxqhawwf0760gm/Screenshot%202017-05-06%2019.14.06.png?dl=0
>>
>> Below is a sample DAG which I used to reproduce the problem. I force the
>> failure in the dataops_weekly_update_reviews task by using a non-existent
>> keyword argument.
```
import airflow
import datetime
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2017, 5, 5),
    'queue': 'development'
}

dag = DAG(
    dag_id='example_dataops_weekly_reviews', default_args=args,
    schedule_interval=None)


def instantiate_emr_cluster(*args, **kwargs):
    return "instantiating emr cluster"


task_instantiate_emr_cluster = PythonOperator(
    task_id="instantiate_emr_cluster",
    python_callable=instantiate_emr_cluster,
    provide_context=True,
    dag=dag)


def initialize_tables(*args, **kwargs):
    return "initializing tables {}".format(kwargs["ds"])


task_initialize_tables = PythonOperator(
    task_id="initialize_tables",
    python_callable=initialize_tables,
    provide_context=True,
    dag=dag)


def dataops_weekly_update_reviews(*args, **kwargs):
    # Intentional failure: "dsasdfdsfa" is not a real context key.
    return "UPDATING weekly reviews {}".format(kwargs["dsasdfdsfa"])


task_dataops_weekly_update_reviews = PythonOperator(
    task_id="dataops_weekly_update_reviews",
    python_callable=dataops_weekly_update_reviews,
    provide_context=True,
    dag=dag)


def load_dataops_reviews(*args, **kwargs):
    return "loading dataops reviews"


task_load_dataops_reviews = PythonOperator(
    task_id="load_dataops_reviews",
    python_callable=load_dataops_reviews,
    provide_context=True,
    dag=dag)


def load_dataops_surveys(**kwargs):
    return "Print out the running EMR cluster"


task_load_dataops_surveys = PythonOperator(
    task_id="load_dataops_surveys",
    provide_context=True,
    python_callable=load_dataops_surveys,
    dag=dag)


def load_cs_survey_answers(**kwargs):
    return "load cs survey answers"


task_load_cs_survey_answers = PythonOperator(
    task_id="load_cs_survey_answers",
    provide_context=True,
    python_callable=load_cs_survey_answers,
    dag=dag)


def terminate_emr_cluster(*args, **kwargs):
    return "terminate emr cluster"


task_terminate_emr_cluster = PythonOperator(
    task_id="terminate_emr_cluster",
    python_callable=terminate_emr_cluster,
    provide_context=True,
    trigger_rule="all_done",
    dag=dag)


task_initialize_tables.set_upstream(task_instantiate_emr_cluster)
task_dataops_weekly_update_reviews.set_upstream(task_initialize_tables)
task_load_dataops_reviews.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_dataops_reviews)
task_load_dataops_surveys.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_dataops_surveys)
task_load_cs_survey_answers.set_upstream(task_dataops_weekly_update_reviews)
task_terminate_emr_cluster.set_upstream(task_load_cs_survey_answers)
```
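For readers following the thread: the expected behaviour hinges on trigger rules. With the default "all_success" rule, a downstream task should be marked upstream_failed when an upstream task fails; terminate_emr_cluster uses trigger_rule="all_done", so it should still run once all upstreams are in a terminal state. A minimal plain-Python sketch of that decision logic (an illustrative model and a hypothetical `resolve` helper, not Airflow's actual implementation) looks like this:

```python
# Simplified model of trigger-rule resolution for one task, given the
# states of its upstream tasks. NOT Airflow's real code; just the rules
# the sample DAG in this thread relies on.

FINISHED = {"success", "failed", "upstream_failed", "skipped"}


def resolve(trigger_rule, upstream_states):
    """Return 'run', 'upstream_failed', or 'wait' for a single task."""
    if trigger_rule == "all_success":
        if any(s in ("failed", "upstream_failed") for s in upstream_states):
            return "upstream_failed"  # failure propagates downstream
        if all(s == "success" for s in upstream_states):
            return "run"
        return "wait"
    if trigger_rule == "all_done":
        # Runs regardless of upstream outcome, once every upstream finished.
        if all(s in FINISHED for s in upstream_states):
            return "run"
        return "wait"
    raise ValueError("unsupported trigger rule: {}".format(trigger_rule))
```

Under this model, once dataops_weekly_update_reviews fails, the three load tasks (all_success) become upstream_failed, and terminate_emr_cluster (all_done) still runs — which matches the SequentialExecutor screenshot.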

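For anyone puzzling over the no_dependencies_met condition referenced in models.py above: the deadlock check fires when unfinished task instances remain but none of them can have their dependencies met. A simplified sketch of that condition (my own model for illustration, not the code behind the linked lines) shows how a run deadlocks if failure is never propagated:

```python
# Simplified model of the "deadlock" condition: unfinished tasks exist,
# but no unfinished task has its dependencies met (assuming the default
# "all_success" rule for every task). NOT Airflow's actual implementation.

FINISHED = {"success", "failed", "upstream_failed", "skipped"}


def is_deadlocked(states, upstream):
    """states: task_id -> state (None = not yet run);
    upstream: task_id -> set of upstream task_ids."""
    unfinished = [t for t, s in states.items() if s not in FINISHED]
    if not unfinished:
        return False  # the run is simply done

    def deps_met(t):
        # Default "all_success": every upstream must have succeeded.
        return all(states[u] == "success" for u in upstream.get(t, set()))

    # Deadlock: work remains, but nothing can ever become runnable.
    return not any(deps_met(t) for t in unfinished)
```

In this model, if a task fails and its downstream is correctly marked upstream_failed, everything reaches a terminal state and no deadlock is reported; but if the downstream is left unfinished with its dependencies permanently unmet — which is what the CeleryExecutor run above appears to hit — the run is declared deadlocked and marked failed.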