EricGao888 edited a comment on issue #19286:
URL: https://github.com/apache/airflow/issues/19286#issuecomment-962999214


   ``` 
               # dagrun.schedule_delay
   def _update_state(dag: DAG, dag_run: DagRun):
               dag_run.state = State.RUNNING
               dag_run.start_date = timezone.utcnow()
               if dag.timetable.periodic:
                   # TODO: Logically, this should be DagRunInfo.run_after, but the
                   # information is not stored on a DagRun, only before the actual
                   # execution on DagModel.next_dagrun_create_after. We should add
                   # a field on DagRun for this instead of relying on the run
                   # always happening immediately after the data interval.
                   expected_start_date = dag.get_run_data_interval(dag_run).end
                   schedule_delay = dag_run.start_date - expected_start_date
                   Stats.timing(f'dagrun.schedule_delay.{dag.dag_id}', schedule_delay)
   
               # first_task_scheduling_delay
               if not self.dag.timetable.periodic:
                   # We can't emit this metric if there is no following schedule to calculate from!
                   return
   
               ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date]
               ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False)
               first_start_date = ordered_tis_by_start_date[0].start_date
               if first_start_date:
                   # TODO: Logically, this should be DagRunInfo.run_after, but the
                   # information is not stored on a DagRun, only before the actual
                   # execution on DagModel.next_dagrun_create_after. We should add
                   # a field on DagRun for this instead of relying on the run
                   # always happening immediately after the data interval.
                   data_interval_end = dag.get_run_data_interval(self).end
                   true_delay = first_start_date - data_interval_end
                   if true_delay.total_seconds() > 0:
                       Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
   ```
   I think the two code fragments above, copied from scheduler_job.py and
dagrun.py, calculate the metrics dagrun.schedule_delay.<dag_id> and
dagrun.<dag_id>.first_task_scheduling_delay respectively. It seems that if
`dag.timetable.periodic` is false, neither metric is calculated, so we cannot
obtain the delay for these kinds of DAGs.
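
To make the effect of that guard concrete, here is a minimal standalone sketch. It does not use Airflow itself: `FakeTimetable` and `schedule_delay` are hypothetical names made up for illustration, and the only behavior modeled is the `periodic` check that both fragments perform before computing a delay.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class FakeTimetable:
    """Hypothetical stand-in for a timetable; only `periodic` matters here."""

    periodic: bool


def schedule_delay(
    timetable: FakeTimetable,
    data_interval_end: datetime,
    start_date: datetime,
) -> Optional[timedelta]:
    """Return the delay that would be reported, or None if the metric is skipped."""
    if not timetable.periodic:
        # Same kind of guard as in both fragments: non-periodic means no metric.
        return None
    return start_date - data_interval_end


interval_end = datetime(2021, 11, 8, 0, 0, tzinfo=timezone.utc)
started = interval_end + timedelta(seconds=42)

print(schedule_delay(FakeTimetable(periodic=True), interval_end, started))   # 0:00:42
print(schedule_delay(FakeTimetable(periodic=False), interval_end, started))  # None
```

In this sketch the non-periodic case yields nothing to report, which matches what I see for these DAGs: no delay value is ever emitted.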


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

