serkef commented on a change in pull request #3584: [AIRFLOW-249] Refactor the SLA mechanism
URL: https://github.com/apache/airflow/pull/3584#discussion_r284237533
 
 

 ##########
 File path: airflow/utils/sla.py
 ##########
 @@ -0,0 +1,484 @@
+# there's a sqlalchemy.py in this dir!
+from __future__ import absolute_import
+
+import airflow.models
+from airflow.utils import asciiart
+from airflow.utils.db import provide_session
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+from six import string_types
+
+from sqlalchemy import or_
+
+log = LoggingMixin().log
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id=airflow.models.DagRun.ID_PREFIX + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts, session=None):
+    """
+    Given an unscheduled `DagRun`, yield any unscheduled TIs that will exist
+    for it in the future, respecting the end date of the DAG and task. See note
+    above for why this is important for SLA notifications.
+    """
+    for task in dag_run.dag.tasks:
+        end_dates = []
+        if dag_run.dag.end_date:
+            end_dates.append(dag_run.dag.end_date)
+        if task.end_date:
+            end_dates.append(task.end_date)
+
+        # Create TIs if there is no end date, or it hasn't happened yet.
+        if not end_dates or ts < min(end_dates):
+            yield airflow.models.TaskInstance(task, dag_run.execution_date)
+
+
+def get_sla_misses(ti, session):
+    """
+    Get all SLA misses that match a particular TaskInstance. There may be
+    several matches if the Task has several independent SLAs.
+    """
+    SM = airflow.models.SlaMiss
+    return session.query(SM).filter(
+        SM.dag_id == ti.dag_id,
+        SM.task_id == ti.task_id,
+        SM.execution_date == ti.execution_date
+    ).all()
+
+
+def create_sla_misses(ti, timestamp, session):
+    """
+    Determine whether a TaskInstance has missed any SLAs as of a provided
+    timestamp. If it has, create `SlaMiss` objects in the provided session.
+    Note that one TaskInstance can have multiple SLA miss objects: for example,
+    it can both start late and run longer than expected.
+    """
+    # Skipped task instances will never trigger SLAs because they
+    # were intentionally not scheduled. Though, it's still a valid and
+    # interesting SLA miss if a task that's *going* to be skipped today is
+    # late! That could mean that an upstream task is hanging.
+    if ti.state == State.SKIPPED:
+        return
+
+    log.debug("Calculating SLA misses for {} as of {}".format(ti, timestamp))
+
+    SM = airflow.models.SlaMiss
+
+    # Get existing SLA misses for this task instance.
+    ti_misses = {sm.sla_type: sm for sm in get_sla_misses(ti, session)}
+
+    # Calculate SLA misses that don't already exist. Wrapping exceptions here
+    # is important so that an exception in one type of SLA doesn't
+    # prevent other task SLAs from getting triggered.
+
+    # SLA Miss for Expected Duration
+    # n.b. - this one can't be calculated until the ti has started!
+    if SM.TASK_DURATION_EXCEEDED not in ti_misses \
+            and ti.task.expected_duration and ti.start_date:
+        try:
+            if ti.state in State.finished():
+                duration = ti.end_date - ti.start_date
+            else:
+                # Use the current time, if the task is still running.
+                duration = timestamp - ti.start_date
+
+            if duration > ti.task.expected_duration:
+                log.debug("Task instance {}'s duration of {} > its expected "
+                          "duration of {}. Creating duration exceeded SLA miss."
+                          .format(ti, duration, ti.task.expected_duration))
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_DURATION_EXCEEDED,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance {}'s duration of {} <= its expected "
+                          "duration of {}, SLA not yet missed."
+                          .format(ti, duration, ti.task.expected_duration))
+        except Exception:
+            log.exception(
+                "Failed to calculate expected duration SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+    # SLA Miss for Expected Start
+    if SM.TASK_LATE_START not in ti_misses and ti.task.expected_start:
+        try:
+            # If a TI's exc date is 01-01-2018, we expect it to start by the next
+            # execution date (01-02-2018) plus a delta of expected_start.
+            expected_start = ti.task.dag.following_schedule(ti.execution_date)
+            expected_start += ti.task.expected_start
+
+            # The case where we have started the ti, but late
+            if ti.start_date and ti.start_date > expected_start:
+                log.debug("Task instance {}'s actual start {} > its expected "
+                          "start of {}. Creating late start SLA miss."
+                          .format(ti, ti.start_date, expected_start))
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_START,
+                    timestamp=timestamp))
+
+            # The case where we haven't even started the ti yet
+            elif timestamp > expected_start:
+                log.debug("Task instance {} has not started by its expected "
+                          "start of {}. Creating late start SLA miss."
+                          .format(ti, expected_start))
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_START,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance {}'s expected start of {} hasn't "
+                          "happened yet, SLA not yet missed."
+                          .format(ti, expected_start))
+        except Exception:
+            log.exception(
+                "Failed to calculate expected start SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+    # SLA Miss for Expected Finish
+    if SM.TASK_LATE_FINISH not in ti_misses and ti.task.expected_finish:
+        try:
+            # If a TI's exc date is 01-01-2018, we expect it to finish by the next
+            # execution date (01-02-2018) plus a delta of expected_finish.
+            expected_finish = ti.task.dag.following_schedule(ti.execution_date)
+            expected_finish += ti.task.expected_finish
+
+            if ti.end_date and ti.end_date > expected_finish:
+                log.debug("Task instance {}'s actual finish {} > its expected "
+                          "finish of {}. Creating late finish SLA miss."
+                          .format(ti, ti.end_date, expected_finish))
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_FINISH,
+                    timestamp=timestamp))
+
+            elif timestamp > expected_finish:
+                log.debug("Task instance {} has not finished by its expected "
+                          "finish of {}. Creating late finish SLA miss."
+                          .format(ti, expected_finish))
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_FINISH,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance {}'s expected finish of {} hasn't "
+                          "happened yet, SLA not yet missed."
+                          .format(ti, expected_finish))
+        except Exception:
+            log.exception(
+                "Failed to calculate expected finish SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+
+def send_sla_miss_email(context):
+    """
+    Send an SLA miss email. This is the default SLA miss callback.
+    """
+    sla_miss = context["sla_miss"]
+
+    if sla_miss.sla_type == sla_miss.TASK_DURATION_EXCEEDED:
+        email_function = send_task_duration_exceeded_email
+    elif sla_miss.sla_type == sla_miss.TASK_LATE_START:
+        email_function = send_task_late_start_email
+    elif sla_miss.sla_type == sla_miss.TASK_LATE_FINISH:
+        email_function = send_task_late_finish_email
+    else:
+        log.warning("Received unexpected SLA Miss type: %s", sla_miss.sla_type)
+        return
+
 
 Review comment:
   Would it be cleaner if this was a dictionary?
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to