This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit fa49c33c22eeaa65d31cc51bba6595ce3c50a9c3 Author: Ace Haidrey <[email protected]> AuthorDate: Fri Nov 13 14:03:42 2020 -0800 Add metric for scheduling delay between first run task & expected start time (#9544) Co-authored-by: Ace Haidrey <[email protected]> (cherry picked from commit aac3877ec374e5f376d8f95b50031c10625216a4) --- airflow/models/dagrun.py | 36 ++++++++++++++++++++++++++++++ docs/metrics.rst | 23 ++++++++++--------- tests/models/test_dagrun.py | 54 ++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 99 insertions(+), 14 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 9775c9f..8ba7f4e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -320,6 +320,7 @@ class DagRun(Base, LoggingMixin): else: self.set_state(State.RUNNING) + self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks) self._emit_duration_stats_for_finished_state() # todo: determine we want to use with_for_update to make sure to lock the run @@ -356,6 +357,41 @@ class DagRun(Base, LoggingMixin): session=session): return True + def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis): + """ + This is a helper method to emit the true scheduling delay stats, which is defined as + the time when the first task in DAG starts minus the expected DAG run datetime. + This method will be used in the update_state method when the state of the DagRun + is updated to a completed status (either success or failure). The method will find the first + started task within the DAG and calculate the expected DagRun start time (based on + dag.execution_date & dag.schedule_interval), and subtract these two values to get the delay. + The emitted data may contain outliers (e.g. when the first task was cleared, so + the second task's start_date will be used), but we can get rid of the outliers + on the stats side through the dashboards tooling built. 
+ Note, the stat will only be emitted if the DagRun is a scheduler triggered one + (i.e. external_trigger is False). + """ + try: + if self.state == State.RUNNING: + return + if self.external_trigger: + return + if not finished_tis: + return + dag = self.get_dag() + ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date] + ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False) + first_start_date = ordered_tis_by_start_date[0].start_date + if first_start_date: + # dag.following_schedule calculates the expected start datetime for a scheduled dagrun + # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss, + # and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison + true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds() + if true_delay >= 0: + Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay) + except Exception as e: + self.log.warning('Failed to record first_task_scheduling_delay metric:\n', e) + def _emit_duration_stats_for_finished_state(self): if self.state == State.RUNNING: return diff --git a/docs/metrics.rst b/docs/metrics.rst index afbd7c9..1e6e06b 100644 --- a/docs/metrics.rst +++ b/docs/metrics.rst @@ -90,14 +90,15 @@ Name Description Timers ------ -=========================================== ================================================= -Name Description -=========================================== ================================================= -``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies -``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to finish a task -``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file -``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state -``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state 
-``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun - start date and the actual DagRun start date -=========================================== ================================================= +================================================= ======================================================================= +Name Description +================================================= ======================================================================= +``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies +``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to finish a task +``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file +``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state +``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state +``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun + start date and the actual DagRun start date +``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start +================================================= ======================================================================= diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 6dcf49e..1f627c5 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -16,7 +16,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- import datetime import unittest @@ -24,14 +23,16 @@ from parameterized import parameterized from airflow import settings, models from airflow.jobs import BackfillJob -from airflow.models import DAG, DagRun, clear_task_instances +from airflow.models import DAG, DagRun, clear_task_instances, DagModel from airflow.models import TaskInstance as TI from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import ShortCircuitOperator +from airflow.settings import Stats from airflow.utils import timezone +from airflow.utils.dates import days_ago from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule -from tests.compat import mock +from tests.compat import mock, call from tests.models import DEFAULT_DATE @@ -608,3 +609,50 @@ class DagRunTest(unittest.TestCase): dagrun.verify_integrity() task = dagrun.get_task_instances()[0] assert task.queue == 'queue1' + + @mock.patch.object(Stats, 'timing') + def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock): + """ + Tests that dag scheduling delay stat is not called if the dagrun is not a scheduled run. + This case is manual run. Simple test for sanity check. + """ + dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1)) + dag_task = DummyOperator(task_id='dummy', dag=dag) + + initial_task_states = { + dag_task.task_id: State.SUCCESS, + } + + dag_run = self.create_dag_run(dag=dag, state=State.RUNNING, task_states=initial_task_states) + dag_run.update_state() + self.assertNotIn(call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)), + stats_mock.mock_calls) + + @mock.patch.object(Stats, 'timing') + def test_emit_scheduling_delay(self, stats_mock): + """ + Tests that dag scheduling delay stat is set properly once running scheduled dag. + dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method. 
+ """ + dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1)) + dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow') + + session = settings.Session() + orm_dag = DagModel(dag_id=dag.dag_id, is_active=True) + session.add(orm_dag) + session.flush() + dag_run = dag.create_dagrun( + run_id="test", + state=State.SUCCESS, + execution_date=dag.start_date, + start_date=dag.start_date, + session=session, + ) + ti = dag_run.get_task_instance(dag_task.task_id) + ti.set_state(State.SUCCESS, session) + session.commit() + session.close() + dag_run.update_state() + true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds() + sched_delay_stat_call = call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay) + self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
