Parallel execution comes to mind. What version of Airflow are you running 
(always report this), and please provide the full logs (processor, scheduler, 
worker).
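
For context, the check that produces the "Deadlock; marking run ... failed" 
message (linked further down in this thread) boils down to: the run still has 
unfinished task instances, but none of them currently has its dependencies 
met. Below is a minimal sketch of that idea; it is a simplification, not the 
actual models.py code, and `deps_met` is a hypothetical stand-in for Airflow's 
real dependency evaluation:

```python
def is_deadlocked(unfinished, deps_met):
    """A DAG run is considered deadlocked when task instances remain
    unfinished, yet none of them can be scheduled because no task has
    all of its dependencies met."""
    return bool(unfinished) and not any(deps_met(ti) for ti in unfinished)

# Two tasks remain and neither is runnable -> the run is deadlocked.
print(is_deadlocked(["load_dataops_reviews", "terminate_emr_cluster"],
                    lambda ti: False))  # True

# If at least one remaining task is runnable, there is no deadlock.
print(is_deadlocked(["terminate_emr_cluster"],
                    lambda ti: ti == "terminate_emr_cluster"))  # False
```

If parallel execution is the culprit, the race would be in how the dependency 
check sees sibling task states that are still changing, which would explain 
why the SequentialExecutor does not hit it.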

Thanks
Bolke
 
> On 7 Jun 2017, at 00:13, Ali Naqvi <[email protected]> wrote:
> 
> Hi folks,
> 
> So it turns out that in the CeleryExecutor case the DAG deadlocks.
> 
> The last log line from the DAG run is:
> 
> ```
> Deadlock; marking run <DagRun example_dataops_weekly_reviews @ 2017-06-01
> 23:24:07.054844: manual__2017-06-01T23:24:07.054844, externally triggered:
> True> failed
> ```
> 
> which is over here:
> 
> https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4250-L4253
> 
> and ultimately it's the no_dependencies_met variable that ends up true in
> this case under the Celery executor.
> 
> I am not clear on why this would be an executor-specific issue.
> 
> Best Regards,
> Ali
> 
> On Sat, May 6, 2017 at 7:22 PM, Ali Naqvi <[email protected]> wrote:
> 
>> Hi folks,
>> I have a dag that does propagate failures correctly in sequential executor:
>> 
>> https://www.dropbox.com/s/zh0quoj99e44qxh/Screenshot%202017-05-06%2012.54.10.png?dl=0
>> 
>> but does not propagate failures when using the CeleryExecutor:
>> 
>> https://www.dropbox.com/s/mfxqhawwf0760gm/Screenshot%202017-05-06%2019.14.06.png?dl=0
>> 
>> Below is the sample DAG I used to recreate the problem. I force the
>> failure in the dataops_weekly_update_reviews task by referencing a
>> non-existent keyword argument.
>> 
>> ```
>> import airflow
>> import datetime
>> from airflow.operators.python_operator import PythonOperator
>> from airflow.models import DAG
>> 
>> args = {
>>    'owner': 'airflow',
>>    'start_date': datetime.datetime(2017, 5, 5),
>>    'queue': 'development'
>> }
>> 
>> dag = DAG(
>>    dag_id='example_dataops_weekly_reviews', default_args=args,
>>    schedule_interval=None)
>> 
>> 
>> def instantiate_emr_cluster(*args, **kwargs):
>>    return "instantiating emr cluster"
>> 
>> task_instantiate_emr_cluster = PythonOperator(
>>    task_id="instantiate_emr_cluster",
>>    python_callable=instantiate_emr_cluster,
>>    provide_context=True,
>>    dag=dag)
>> 
>> 
>> def initialize_tables(*args, **kwargs):
>>    return "initializing tables {}".format(kwargs["ds"])
>> 
>> 
>> task_initialize_tables = PythonOperator(
>>    task_id="initialize_tables",
>>    python_callable=initialize_tables,
>>    provide_context=True,
>>    dag=dag)
>> 
>> 
>> def dataops_weekly_update_reviews(*args, **kwargs):
>>    return "UPDATING weekly reviews {}".format(kwargs["dsasdfdsfa"])
>> 
>> 
>> task_dataops_weekly_update_reviews = PythonOperator(
>>    task_id="dataops_weekly_update_reviews",
>>    python_callable=dataops_weekly_update_reviews,
>>    provide_context=True,
>>    dag=dag)
>> 
>> 
>> def load_dataops_reviews(*args, **kwargs):
>>    return "loading dataops reviews"
>> 
>> 
>> task_load_dataops_reviews = PythonOperator(
>>    task_id="load_dataops_reviews",
>>    python_callable=load_dataops_reviews,
>>    provide_context=True,
>>    dag=dag)
>> 
>> 
>> def load_dataops_surveys(**kwargs):
>>    return "Print out the running EMR cluster"
>> 
>> 
>> task_load_dataops_surveys = PythonOperator(
>>    task_id="load_dataops_surveys",
>>    provide_context=True,
>>    python_callable=load_dataops_surveys,
>>    dag=dag)
>> 
>> 
>> def load_cs_survey_answers(**kwargs):
>>    return "load cs survey answers"
>> 
>> 
>> task_load_cs_survey_answers = PythonOperator(
>>    task_id="load_cs_survey_answers",
>>    provide_context=True,
>>    python_callable=load_cs_survey_answers,
>>    dag=dag)
>> 
>> 
>> def terminate_emr_cluster(*args, **kwargs):
>>    return "terminate emr cluster"
>> 
>> 
>> task_terminate_emr_cluster = PythonOperator(
>>    task_id="terminate_emr_cluster",
>>    python_callable=terminate_emr_cluster,
>>    provide_context=True,
>>    trigger_rule="all_done",
>>    dag=dag)
>> 
>> 
>> task_initialize_tables.set_upstream(task_instantiate_emr_cluster)
>> task_dataops_weekly_update_reviews.set_upstream(task_initialize_tables)
>> task_load_dataops_reviews.set_upstream(task_dataops_weekly_update_reviews)
>> task_terminate_emr_cluster.set_upstream(task_load_dataops_reviews)
>> task_load_dataops_surveys.set_upstream(task_dataops_weekly_update_reviews)
>> task_terminate_emr_cluster.set_upstream(task_load_dataops_surveys)
>> task_load_cs_survey_answers.set_upstream(task_dataops_weekly_update_reviews)
>> task_terminate_emr_cluster.set_upstream(task_load_cs_survey_answers)
>> 
>> ```
>> 
