[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281583#comment-15281583 ]
Siddharth Anand commented on AIRFLOW-106:
-----------------------------------------
This shows the bug behavior if the task that fails is not the first one. !bug_works_if_not_first_task_that_fails.png!

> Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
> ------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>         Attachments: bug_works_if_not_first_task_that_fails.png, screenshot-1.png
>
> Hello.
> I created the following workflow:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
>
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email': <my email>,
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
>
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args,
>           schedule_interval=timedelta(minutes=10))
>
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
>
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var, so when this DAG launched it failed quickly. However, no failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger any email.
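As an aside, the immediate ValueError can be avoided by guarding the lookup. A minimal sketch of the pattern in plain Python — the `get_variable_or_default` wrapper and the stand-in store are hypothetical, used here instead of Airflow's real `Variable.get` (whether that method accepts a default value differs by Airflow version):

```python
def get_variable_or_default(get_func, key, default):
    """Return get_func(key), falling back to `default` if the lookup
    raises ValueError (as Airflow's Variable.get does for a missing key)."""
    try:
        return get_func(key)
    except ValueError:
        return default

# Stand-in for Variable.get: a store with no 'test_var' defined.
store = {}

def fake_get(key):
    if key not in store:
        raise ValueError('Variable {} does not exist'.format(key))
    return store[key]

print(get_variable_or_default(fake_get, 'test_var', 'fallback'))  # prints "fallback"
```

With this guard, a missing variable yields a fallback value instead of failing the task, which sidesteps the question of whether the failure handling fires at all.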
> Here are the logs:
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO -
> --------------------------------------------------------------------------------
> Starting attempt 1 of 2
> --------------------------------------------------------------------------------
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> On the DAG Runs page, the workflow is set as failed. On the task instance page, it is set as up_for_retry, but no new run is ever scheduled.
> I tried incrementing the retries parameter, but nothing different happens; Airflow never retries after the first run.
> dud

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
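The retry behavior the reporter expected can be modeled in plain Python. This is an illustrative sketch only — `TaskState` and `ready_for_retry` are hypothetical names, not Airflow's actual scheduler code — showing the conditions under which an UP_FOR_RETRY task should be rescheduled:

```python
from datetime import datetime, timedelta

# Hypothetical, simplified model of a task instance's retry eligibility.
class TaskState(object):
    def __init__(self, retries, retry_delay, try_number=1,
                 state='up_for_retry', end_date=None):
        self.retries = retries          # max retries allowed
        self.retry_delay = retry_delay  # wait between attempts
        self.try_number = try_number    # attempts made so far
        self.state = state
        self.end_date = end_date        # last time the task may run

def ready_for_retry(task, failed_at, now):
    """A task should be retried when it is UP_FOR_RETRY, has attempts
    left, the retry delay has elapsed, and end_date has not passed."""
    if task.state != 'up_for_retry':
        return False
    if task.try_number > task.retries:
        return False  # retries exhausted: should transition to FAILED
    if now < failed_at + task.retry_delay:
        return False  # still inside the retry_delay window
    if task.end_date is not None and now > task.end_date:
        return False
    return True

task = TaskState(retries=1, retry_delay=timedelta(minutes=2))
failed_at = datetime(2016, 5, 11, 15, 55, 3)

# Immediately after the failure: too early to retry.
print(ready_for_retry(task, failed_at, failed_at + timedelta(seconds=30)))  # prints False
# Once retry_delay has elapsed, the scheduler should pick it up again.
print(ready_for_retry(task, failed_at, failed_at + timedelta(minutes=3)))   # prints True
```

Under this model, the task in the report (UP_FOR_RETRY, 1 retry remaining, retry_delay elapsed, end_date not yet reached) should have been rescheduled, which is exactly what the bug shows not happening when the failing task is the first in the DAG.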