Repository: incubator-airflow
Updated Branches:
  refs/heads/master d1f94fe20 -> 9c0c4264c


[AIRFLOW-2178] Add handling on SLA miss errors

Closes #3173 from d3cay1/airflow2178-master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9c0c4264
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9c0c4264
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9c0c4264

Branch: refs/heads/master
Commit: 9c0c4264c3ecdee2d11c0be9d2a151ea423dd3d9
Parents: d1f94fe
Author: David Klosowski <dav...@thinknear.com>
Authored: Wed Apr 4 09:19:59 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Wed Apr 4 09:19:59 2018 +0200

----------------------------------------------------------------------
 airflow/jobs.py |  27 +++++++++-----
 tests/jobs.py   | 103 +++++++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 109 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c0c4264/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 13fc2a2..6241717 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -24,7 +24,6 @@ import os
 import psutil
 import signal
 import six
-import socket
 import sys
 import threading
 import time
@@ -43,7 +42,6 @@ from time import sleep
 from airflow import configuration as conf
 from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
-from airflow.logging_config import configure_logging
 from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task.task_runner import get_task_runner
@@ -672,8 +670,13 @@ class SchedulerJob(BaseJob):
             if dag.sla_miss_callback:
                 # Execute the alert callback
                 self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
-                dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
-                notification_sent = True
+                try:
+                    dag.sla_miss_callback(dag, task_list, blocking_task_list, slas,
+                                          blocking_tis)
+                    notification_sent = True
+                except Exception:
+                    self.log.exception("Could not call sla_miss_callback for DAG %s",
+                                       dag.dag_id)
             email_content = """\
             Here's a list of tasks that missed their SLAs:
             <pre><code>{task_list}\n<code></pre>
@@ -691,12 +694,16 @@ class SchedulerJob(BaseJob):
                         if email not in emails:
                             emails.append(email)
             if emails and len(slas):
-                send_email(
-                    emails,
-                    "[airflow] SLA miss on DAG=" + dag.dag_id,
-                    email_content)
-                email_sent = True
-                notification_sent = True
+                try:
+                    send_email(
+                        emails,
+                        "[airflow] SLA miss on DAG=" + dag.dag_id,
+                        email_content)
+                    email_sent = True
+                    notification_sent = True
+                except Exception:
+                    self.log.exception("Could not send SLA Miss email notification for"
+                                       " DAG %s", dag.dag_id)
             # If we sent any notification, update the sla_miss table
             if notification_sent:
                 for sla in slas:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c0c4264/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ace593a..1e411e2 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,7 +23,6 @@ import multiprocessing
 import os
 import shutil
 import six
-import socket
 import threading
 import time
 import unittest
@@ -46,7 +45,7 @@ from airflow.utils.timeout import timeout
 from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths
 from airflow.utils.net import get_hostname
 
-from mock import Mock, patch
+from mock import Mock, patch, MagicMock, PropertyMock
 from sqlalchemy.orm.session import make_transient
 from tests.executors.test_executor import TestExecutor
 
@@ -95,7 +94,7 @@ class BackfillJobTest(unittest.TestCase):
         target_dag.clear()
 
         scheduler = SchedulerJob()
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(target_dag, queue=queue)
         self.assertFalse(queue.append.called)
 
@@ -108,7 +107,7 @@ class BackfillJobTest(unittest.TestCase):
         job.run()
 
         scheduler = SchedulerJob()
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(target_dag, queue=queue)
 
         self.assertTrue(queue.append.called)
@@ -1944,7 +1943,7 @@ class SchedulerJobTest(unittest.TestCase):
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(dag, queue=queue)
 
         queue.append.assert_called_with(
@@ -1976,7 +1975,7 @@ class SchedulerJobTest(unittest.TestCase):
             dag_id='test_scheduler_do_not_schedule_removed_task',
             start_date=DEFAULT_DATE)
 
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(dag, queue=queue)
 
         queue.put.assert_not_called()
@@ -2002,7 +2001,7 @@ class SchedulerJobTest(unittest.TestCase):
         dr = scheduler.create_dag_run(dag)
         self.assertIsNone(dr)
 
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(dag, queue=queue)
 
         queue.put.assert_not_called()
@@ -2034,7 +2033,7 @@ class SchedulerJobTest(unittest.TestCase):
         session.commit()
         session.close()
 
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(dag, queue=queue)
 
         queue.put.assert_not_called()
@@ -2072,7 +2071,7 @@ class SchedulerJobTest(unittest.TestCase):
             dag=dag,
             owner='airflow')
 
-        queue = mock.Mock()
+        queue = Mock()
         scheduler._process_task_instances(dag, queue=queue)
 
         tis = dr.get_task_instances()
@@ -2210,7 +2209,7 @@ class SchedulerJobTest(unittest.TestCase):
         # Reduce max_active_runs to 1
         dag.max_active_runs = 1
 
-        queue = mock.Mock()
+        queue = Mock()
         # and schedule them in, so we can check how many
         # tasks are put on the queue (should be one, not 3)
         scheduler._process_task_instances(dag, queue=queue)
@@ -2384,7 +2383,7 @@ class SchedulerJobTest(unittest.TestCase):
         session = settings.Session()
 
         # Mock the callback function so we can verify that it was not called
-        sla_callback = mock.MagicMock()
+        sla_callback = MagicMock()
 
         # Create dag with a start of 2 days ago, but an sla of 1 day ago so we'll already have an sla_miss on the books
         test_start_date = days_ago(2)
@@ -2417,6 +2416,88 @@ class SchedulerJobTest(unittest.TestCase):
 
         sla_callback.assert_not_called()
 
+    def test_scheduler_sla_miss_callback_exception(self):
+        """
+        Test that the scheduler gracefully logs an exception if there is a problem
+         calling the sla_miss_callback
+        """
+        session = settings.Session()
+
+        sla_callback = MagicMock(side_effect=RuntimeError('Could not call function'))
+
+        test_start_date = days_ago(2)
+        dag = DAG(dag_id='test_sla_miss',
+                  sla_miss_callback=sla_callback,
+                  default_args={'start_date': test_start_date})
+
+        task = DummyOperator(task_id='dummy',
+                             dag=dag,
+                             owner='airflow',
+                             sla=datetime.timedelta(hours=1))
+
+        session.merge(models.TaskInstance(task=task,
+                                          execution_date=test_start_date,
+                                          state='Success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(models.SlaMiss(task_id='dummy',
+                                     dag_id='test_sla_miss',
+                                     execution_date=test_start_date))
+
+        # Now call manage_slas and see if the sla_miss callback gets called
+        scheduler = SchedulerJob(dag_id='test_sla_miss',
+                                 **self.default_scheduler_args)
+
+        with mock.patch('airflow.jobs.SchedulerJob.log',
+                        new_callable=PropertyMock) as mock_log:
+            scheduler.manage_slas(dag=dag, session=session)
+            sla_callback.assert_called()
+            mock_log().exception.assert_called_with(
+                'Could not call sla_miss_callback for DAG %s',
+                'test_sla_miss')
+
+    @mock.patch("airflow.utils.email.send_email")
+    def test_scheduler_sla_miss_email_exception(self, mock_send_email):
+        """
+        Test that the scheduler gracefully logs an exception if there is a problem
+          sending an email
+        """
+        session = settings.Session()
+
+        # Mock the callback function so we can verify that it was not called
+        mock_send_email.side_effect = RuntimeError('Could not send an email')
+
+        test_start_date = days_ago(2)
+        dag = DAG(dag_id='test_sla_miss',
+                  default_args={'start_date': test_start_date,
+                                'sla': datetime.timedelta(days=1)})
+
+        task = DummyOperator(task_id='dummy',
+                             dag=dag,
+                             owner='airflow',
+                             email='t...@test.com',
+                             sla=datetime.timedelta(hours=1))
+
+        session.merge(models.TaskInstance(task=task,
+                                          execution_date=test_start_date,
+                                          state='Success'))
+
+        # Create an SlaMiss where notification was sent, but email was not
+        session.merge(models.SlaMiss(task_id='dummy',
+                                     dag_id='test_sla_miss',
+                                     execution_date=test_start_date))
+
+        scheduler = SchedulerJob(dag_id='test_sla_miss',
+                                 num_runs=1,
+                                 **self.default_scheduler_args)
+
+        with mock.patch('airflow.jobs.SchedulerJob.log',
+                        new_callable=PropertyMock) as mock_log:
+            scheduler.manage_slas(dag=dag, session=session)
+            mock_log().exception.assert_called_with(
+                'Could not send SLA Miss email notification for DAG %s',
+                'test_sla_miss')
+
     def test_retry_still_in_executor(self):
         """
         Checks if the scheduler does not put a task in limbo, when a task is retried

Reply via email to