serkef commented on a change in pull request #3584: [AIRFLOW-249] Refactor the SLA mechanism
URL: https://github.com/apache/airflow/pull/3584#discussion_r284265407
########## File path: airflow/utils/sla.py ##########
@@ -0,0 +1,484 @@
+# there's a sqlalchemy.py in this dir!
+from __future__ import absolute_import
+
+import airflow.models
+from airflow.utils import asciiart
+from airflow.utils.db import provide_session
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+from six import string_types
+
+from sqlalchemy import or_
+
+log = LoggingMixin().log
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:

Review comment:
I am a bit confused by the timestamps. Should this be the current timestamp?
What if `next_run_date` was in the future when this function was called but no longer is? What should happen then?

- Point x: task fails
- Point y: function is called
- Point z: `next_run_date`
- Point k: SLA is determined

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
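The timeline in the comment turns on *when* the cutoff is evaluated. In the diff, `ts` is a parameter, so its value is fixed when `yield_unscheduled_runs` is called, and because the function is a generator its body only runs once a caller starts iterating. The sketch below is hypothetical and does not reproduce the PR's code: `yield_due_runs_fixed`, `yield_due_runs`, and `FakeClock` are illustrative names, and a plain fixed `timedelta` step stands in for `dag.following_schedule`. It contrasts a cutoff frozen at call time with one re-read on every iteration.

```python
from datetime import datetime, timedelta


def yield_due_runs_fixed(next_run_date, interval, ts):
    """Yield run dates up to a cutoff `ts` frozen when the function is called.

    Hypothetical helper; mirrors the `next_run_date > ts` check in the diff.
    """
    while next_run_date <= ts:
        yield next_run_date
        next_run_date += interval


def yield_due_runs(next_run_date, interval, now):
    """Yield run dates up to a cutoff re-evaluated on every iteration.

    Hypothetical helper; `now` is a zero-argument callable returning the
    current time, so runs that became due after the call are included.
    """
    while next_run_date <= now():
        yield next_run_date
        next_run_date += interval


class FakeClock:
    """Hypothetical controllable clock standing in for the real current time."""

    def __init__(self, start):
        self.current = start

    def __call__(self):
        return self.current


clock = FakeClock(datetime(2019, 1, 1, 10, 30))
start = datetime(2019, 1, 1, 9, 0)
hour = timedelta(hours=1)

fixed = yield_due_runs_fixed(start, hour, clock())  # cutoff frozen at 10:30
live = yield_due_runs(start, hour, clock)           # cutoff read while iterating

# Time passes between creating the generators and consuming them.
clock.current = datetime(2019, 1, 1, 11, 30)

print([d.strftime("%H:%M") for d in fixed])  # ['09:00', '10:00']
print([d.strftime("%H:%M") for d in live])   # ['09:00', '10:00', '11:00']
```

With the frozen cutoff, the 11:00 run that became due between the call and the iteration is not reported. The live cutoff catches it, but the loop is then no longer bounded by a single instant, so which behaviour is right likely depends on when the SLA is meant to be determined (Point k).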
