Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test 8b4a5039d -> 6e74d49ad
[AIRFLOW-988] Fix repeating SLA miss callbacks When a callback is passed to `sla_miss_callback` but an email address is not specified, the callback will be continuously called. This is due to the logic used when pulling the SLAs in `SchedulerJob.manage_slas`. By filtering on `notification_sent` only, we will still handle the cases where email is used, but it will prevent the continuous callbacks. Closes #2415 from cjonesy/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6e74d49a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6e74d49a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6e74d49a Branch: refs/heads/v1-9-test Commit: 6e74d49add28679bbcd1f4cf56e833e1c1ca48c2 Parents: 8b4a503 Author: Charlie Jones <[email protected]> Authored: Mon Oct 2 13:51:40 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Oct 2 13:53:48 2017 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 5 ++--- tests/jobs.py | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e74d49a/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 6f4cf97..d697f2d 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -655,8 +655,7 @@ class SchedulerJob(BaseJob): slas = ( session .query(SlaMiss) - .filter(or_(SlaMiss.email_sent == False, - SlaMiss.notification_sent == False)) + .filter(SlaMiss.notification_sent == False) .filter(SlaMiss.dag_id == dag.dag_id) .all() ) @@ -696,7 +695,7 @@ class SchedulerJob(BaseJob): dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True email_content = """\ - Here's 
a list of tasks thas missed their SLAs: + Here's a list of tasks that missed their SLAs: <pre><code>{task_list}\n<code></pre> Blocking tasks: <pre><code>{blocking_task_list}\n{bug}<code></pre> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e74d49a/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 3039e38..fdbbc50 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -37,6 +37,7 @@ from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.task_runner.base_task_runner import BaseTaskRunner +from airflow.utils.dates import days_ago from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout @@ -2111,6 +2112,46 @@ class SchedulerJobTest(unittest.TestCase): do_schedule() self.assertEquals(2, len(executor.queued_tasks)) + def test_scheduler_sla_miss_callback(self): + """ + Test that the scheduler does not call the sla_miss_callback when a notification has already been sent + """ + session = settings.Session() + + # Mock the callback function so we can verify that it was not called + sla_callback = mock.MagicMock() + + # Create dag with a start of 2 days ago, but an sla of 1 day ago so we'll already have an sla_miss on the books + test_start_date = days_ago(2) + dag = DAG(dag_id='test_sla_miss', + sla_miss_callback=sla_callback, + default_args={'start_date': test_start_date, + 'sla': datetime.timedelta(days=1)}) + + task = DummyOperator(task_id='dummy', + dag=dag, + owner='airflow') + + # Create a TaskInstance for two days ago + session.merge(models.TaskInstance(task=task, + execution_date=test_start_date, + state='success')) + + # Create an SlaMiss where notification was sent, but email was not + session.merge(models.SlaMiss(task_id='dummy', + 
dag_id='test_sla_miss', + execution_date=test_start_date, + email_sent=False, + notification_sent=True)) + + # Now call manage_slas and see if the sla_miss callback gets called + scheduler = SchedulerJob(dag_id='test_sla_miss', + num_runs=1, + **self.default_scheduler_args) + scheduler.manage_slas(dag=dag, session=session) + + sla_callback.assert_not_called() + def test_retry_still_in_executor(self): """ Checks if the scheduler does not put a task in limbo, when a task is retried
