dud created AIRFLOW-106: --------------------------- Summary: email_on_failure doesn't being triggered Key: AIRFLOW-106 URL: https://issues.apache.org/jira/browse/AIRFLOW-106 Project: Apache Airflow Issue Type: Bug Environment: Latest version from Git Reporter: dud
Hello. I created the following workflow : {code} from airflow import DAG from airflow.operators import PythonOperator from datetime import datetime, timedelta from airflow.models import Variable from time import sleep default_args = { 'depends_on_past': False, 'start_date': datetime(2016, 5, 11, 15, 20), 'email': <my email> 'email_on_failure': True, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=2), 'end_date': datetime(2016, 5, 11, 16, 00), } PARENT_DAG_NAME = 'test' dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10)) def sleep1_function(**kwargs): sleep(90) return Variable.get('test_var') sleep1 = PythonOperator( task_id='sleep1', python_callable=sleep1_function, dag=dag) {code} I forgot to declare test_var so when this DAG launched it failed quickly. However no failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger any email. Here is the logs : {code} [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py [2016-05-11 15:53:32,313] {models.py:1216} INFO - -------------------------------------------------------------------------------- Starting attempt 1 of 2 -------------------------------------------------------------------------------- [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00 [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run result = task_copy.execute(context=context) File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute return_value = self.python_callable(*self.op_args, **self.op_kwargs) File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function return Variable.get('test_var') File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper result = func(*args, **kwargs) File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get raise ValueError('Variable {} does not exist'.format(key)) ValueError: Variable test_var does not exist [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist {code} In the DAG Runs page, the workflow is set as failed. In hte taks instance page, it is set as up_for_retry but no new run is ever scheduled. I tried incrementing the retires parameter, but nothing different happens, Airflow ever retries after the first run. dud -- This message was sent by Atlassian JIRA (v6.3.4#6332)