ashb closed pull request #2338: [AIRFLOW-1262] Allow configuration of email 
alert subject and body
URL: https://github.com/apache/incubator-airflow/pull/2338
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
As GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index a428c8abb7..0765738517 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1978,21 +1978,40 @@ def render_templates(self):
                 setattr(task, attr, rendered_content)
 
     def email_alert(self, exception):
-        task = self.task
-        title = "Airflow alert: {self}".format(**locals())
-        exception = str(exception).replace('\n', '<br>')
+        exception_html = str(exception).replace('\n', '<br>')
+        jinja_context = self.get_template_context()
+        jinja_context.update(dict(
+            exception=exception,
+            exception_html=exception_html,
+            try_number=self.try_number,
+            max_tries=self.max_tries))
+
+        jinja_env = self.task.get_template_env()
+
+        default_subject = 'Airflow alert: {{ti}}'
         # For reporting purposes, we report based on 1-indexed,
         # not 0-indexed lists (i.e. Try 1 instead of
         # Try 0 for the first attempt).
-        body = (
-            "Try {try_number} out of {max_tries}<br>"
-            "Exception:<br>{exception}<br>"
-            "Log: <a href='{self.log_url}'>Link</a><br>"
-            "Host: {self.hostname}<br>"
-            "Log file: {self.log_filepath}<br>"
-            "Mark success: <a href='{self.mark_success_url}'>Link</a><br>"
-        ).format(try_number=self.try_number, max_tries=self.max_tries + 1, **locals())
-        send_email(task.email, title, body)
+        default_html_content = (
+            'Try {{try_number}} out of {{max_tries + 1}}<br>'
+            'Exception:<br>{{exception_html}}<br>'
+            'Log: <a href="{{ti.log_url}}">Link</a><br>'
+            'Host: {{ti.hostname}}<br>'
+            'Log file: {{ti.log_filepath}}<br>'
+            'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
+        )
+
+        def render(key, content):
+            if configuration.has_option('email', key):
+                path = configuration.get('email', key)
+                with open(path) as f:
+                    content = f.read()
+
+            return jinja_env.from_string(content).render(**jinja_context)
+
+        subject = render('subject_template', default_subject)
+        html_content = render('html_content_template', default_html_content)
+        send_email(self.task.email, subject, html_content)
 
     def set_duration(self):
         if self.end_date and self.start_date:
@@ -2880,9 +2899,7 @@ def render_template(self, attr, content, context):
         Renders a template either from a file or directly in a field, and returns
         the rendered result.
         """
-        jinja_env = self.dag.get_template_env() \
-            if hasattr(self, 'dag') \
-            else jinja2.Environment(cache_size=0)
+        jinja_env = self.get_template_env()
 
         exts = self.__class__.template_ext
         if (
@@ -2892,6 +2909,11 @@ def render_template(self, attr, content, context):
         else:
             return self.render_template_from_field(attr, content, context, jinja_env)
 
+    def get_template_env(self):
+        return self.dag.get_template_env() \
+            if hasattr(self, 'dag') \
+            else jinja2.Environment(cache_size=0)
+
     def prepare_template(self):
         """
         Hook that is triggered after the templated fields get replaced
@@ -2909,7 +2931,7 @@ def resolve_template_files(self):
                 continue
             elif isinstance(content, six.string_types) and \
                     any([content.endswith(ext) for ext in self.template_ext]):
-                env = self.dag.get_template_env()
+                env = self.get_template_env()
                 try:
                     setattr(self, attr, env.loader.get_source(env, content)[0])
                 except Exception as e:
diff --git a/tests/models.py b/tests/models.py
index 9f439605e1..6439e08a7c 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -35,7 +35,7 @@
 
 import pendulum
 import six
-from mock import ANY, Mock, patch
+from mock import ANY, Mock, mock_open, patch
 from parameterized import parameterized
 
 from airflow import AirflowException, configuration, models, settings
@@ -2576,15 +2576,53 @@ def test_overwrite_params_with_dag_run_conf_none(self):
 
     @patch('airflow.models.send_email')
     def test_email_alert(self, mock_send_email):
-        task = DummyOperator(task_id='op', email='t...@test.test')
+        dag = models.DAG(dag_id='test_failure_email')
+        task = BashOperator(
+            task_id='test_email_alert',
+            dag=dag,
+            bash_command='exit 1',
+            start_date=DEFAULT_DATE,
+            email='to')
+
         ti = TI(task=task, execution_date=datetime.datetime.now())
-        ti.email_alert(RuntimeError('it broke'))
 
-        self.assertTrue(mock_send_email.called)
+        try:
+            ti.run()
+        except AirflowException:
+            pass
+
+        (email, title, body), _ = mock_send_email.call_args
+        self.assertEqual(email, 'to')
+        self.assertIn('test_email_alert', title)
+        self.assertIn('test_email_alert', body)
+
+    @patch('airflow.models.send_email')
+    def test_email_alert_with_config(self, mock_send_email):
+        dag = models.DAG(dag_id='test_failure_email')
+        task = BashOperator(
+            task_id='test_email_alert_with_config',
+            dag=dag,
+            bash_command='exit 1',
+            start_date=DEFAULT_DATE,
+            email='to')
+
+        ti = TI(
+            task=task, execution_date=datetime.datetime.now())
+
+        configuration.set('email', 'SUBJECT_TEMPLATE', '/subject/path')
+        configuration.set('email', 'HTML_CONTENT_TEMPLATE', '/html_content/path')
+
+        opener = mock_open(read_data='template: {{ti.task_id}}')
+        with patch('airflow.models.open', opener, create=True):
+            try:
+                ti.run()
+            except AirflowException:
+                pass
+
         (email, title, body), _ = mock_send_email.call_args
-        self.assertEqual(email, 't...@test.test')
-        self.assertIn(repr(ti), title)
-        self.assertIn('it broke', body)
+        self.assertEqual(email, 'to')
+        self.assertEqual('template: test_email_alert_with_config', title)
+        self.assertEqual('template: test_email_alert_with_config', body)
 
     def test_set_duration(self):
         task = DummyOperator(task_id='op', email='t...@test.test')


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to