yangguoaws opened a new issue, #29013:
URL: https://github.com/apache/airflow/issues/29013

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   When a DAG is configured with the `dagrun_timeout` 
[parameter](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG)
 and a DAG run fails because it exceeds that timeout, the metric 
`dagrun.duration.failed.<dag_id>` is not emitted.
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   Ubuntu 22.04.1 LTS
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==7.1.0
   apache-airflow-providers-common-sql==1.3.3
   apache-airflow-providers-ftp==3.3.0
   apache-airflow-providers-http==4.1.1
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-sqlite==3.3.1
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   According to the 
[doc](https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html#timers),
 the metric `dagrun.duration.failed.<dag_id>` should capture `Milliseconds 
taken for a DagRun to reach failed state`. However, if the DAG run failed 
because of the DAG-run-level timeout, the metric does not capture the failed DAG run.
   
   I dug into the Airflow code and found the cause.
   
   The timer `dagrun.duration.failed.{self.dag_id}` is emitted in the method 
`_emit_duration_stats_for_finished_state`. 
[code](https://github.com/apache/airflow/blob/2.5.0/airflow/models/dagrun.py#L880-L894)
    ```python
       def _emit_duration_stats_for_finished_state(self):
           if self.state == State.RUNNING:
               return
           if self.start_date is None:
               self.log.warning("Failed to record duration of %s: start_date is 
not set.", self)
               return
           if self.end_date is None:
               self.log.warning("Failed to record duration of %s: end_date is 
not set.", self)
               return
   
           duration = self.end_date - self.start_date
           if self.state == State.SUCCESS:
               Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
           elif self.state == State.FAILED:
               Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
   ```
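To illustrate the guard conditions above, here is a minimal, self-contained sketch (the `FakeStats` recorder and `DagRunLike` class are hypothetical stand-ins, not Airflow code): the timer only fires once the run has reached a terminal state and both timestamps are set.

```python
from datetime import datetime, timedelta

class FakeStats:
    """Hypothetical stand-in for airflow.stats.Stats that records timings."""
    def __init__(self):
        self.timings = {}

    def timing(self, name, duration):
        self.timings[name] = duration

class DagRunLike:
    """Simplified model of the guard logic in _emit_duration_stats_for_finished_state."""
    def __init__(self, dag_id, state, start_date, end_date, stats):
        self.dag_id = dag_id
        self.state = state
        self.start_date = start_date
        self.end_date = end_date
        self.stats = stats

    def emit_duration_stats(self):
        if self.state == "running":
            return  # run not finished yet, nothing to emit
        if self.start_date is None or self.end_date is None:
            return  # duration cannot be computed
        duration = self.end_date - self.start_date
        if self.state == "success":
            self.stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
        elif self.state == "failed":
            self.stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)

stats = FakeStats()
start = datetime(2023, 1, 1, 0, 0, 0)
end = start + timedelta(minutes=5)

# A failed run with both dates set emits the timer ...
DagRunLike("my_dag", "failed", start, end, stats).emit_duration_stats()
print("dagrun.duration.failed.my_dag" in stats.timings)  # True

# ... but a still-running run emits nothing.
DagRunLike("my_dag2", "running", start, None, stats).emit_duration_stats()
print(any(k.endswith("my_dag2") for k in stats.timings))  # False
```

The key point is that the emission only ever happens inside this one method, so any code path that finishes a run without calling it will silently skip the metric.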
   
   The function `_emit_duration_stats_for_finished_state` is only called from 
the `update_state()` method of the `DagRun` class. 
[code](https://github.com/apache/airflow/blob/2.5.0/airflow/models/dagrun.py#L650-L677)
 If `update_state()` is not called, then 
`_emit_duration_stats_for_finished_state` is never invoked.
    ```python
           if self._state == DagRunState.FAILED or self._state == 
DagRunState.SUCCESS:
               msg = (
                   "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
                   "run_start_date=%s, run_end_date=%s, run_duration=%s, "
                   "state=%s, external_trigger=%s, run_type=%s, "
                   "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
               )
               self.log.info(
                   msg,
                   self.dag_id,
                   self.execution_date,
                   self.run_id,
                   self.start_date,
                   self.end_date,
                   (self.end_date - self.start_date).total_seconds()
                   if self.start_date and self.end_date
                   else None,
                   self._state,
                   self.external_trigger,
                   self.run_type,
                   self.data_interval_start,
                   self.data_interval_end,
                   self.dag_hash,
               )
               session.flush()
   
           
self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
           self._emit_duration_stats_for_finished_state()
   ```
   
   When a DAG run times out, the scheduler job only calls 
`set_state()`. 
[code](https://github.com/apache/airflow/blob/2.5.0/airflow/jobs/scheduler_job.py#L1280-L1312)
    ```python
           if (
               dag_run.start_date
               and dag.dagrun_timeout
               and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
           ):
               dag_run.set_state(DagRunState.FAILED)
               unfinished_task_instances = (
                   session.query(TI)
                   .filter(TI.dag_id == dag_run.dag_id)
                   .filter(TI.run_id == dag_run.run_id)
                   .filter(TI.state.in_(State.unfinished))
               )
               for task_instance in unfinished_task_instances:
                   task_instance.state = TaskInstanceState.SKIPPED
                   session.merge(task_instance)
               session.flush()
               self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
               active_runs = dag.get_num_active_runs(only_running=False, 
session=session)
               # Work out if we should allow creating a new DagRun now?
               if self._should_update_dag_next_dagruns(dag, dag_model, 
active_runs):
                   dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
   
               callback_to_execute = DagCallbackRequest(
                   full_filepath=dag.fileloc,
                   dag_id=dag.dag_id,
                   run_id=dag_run.run_id,
                   is_failure_callback=True,
                   processor_subdir=dag_model.processor_subdir,
                   msg="timed_out",
               )
   
               dag_run.notify_dagrun_state_changed()
               return callback_to_execute
   ```
   
   From the code above, we can see that when the DAG run times out, only the 
`set_state()` method is called. `update_state()` is never invoked, which is why 
the metric `dagrun.duration.failed.{self.dag_id}` is not emitted.
   
   Please fix this bug so that the timer `dagrun.duration.failed.<dag_id>` also 
captures DAG runs that failed due to the DAG-level timeout.
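A minimal sketch of the idea behind a possible fix: the timeout branch could call the duration emitter directly after moving the run to FAILED, since `update_state()` is not invoked on that path. The `TimedOutRun` class and `handle_timeout` function below are hypothetical stand-ins modeling only the control flow discussed in this issue; they are not actual Airflow code or a tested patch.

```python
from datetime import datetime, timedelta

class TimedOutRun:
    """Hypothetical stand-in for DagRun, modeling only the pieces discussed here."""
    def __init__(self, dag_id, start_date):
        self.dag_id = dag_id
        self.state = "running"
        self.start_date = start_date
        self.end_date = None
        self.emitted = []  # simulates Stats.timing calls

    def set_state(self, state):
        # set_state only flips the state; it does NOT emit duration metrics.
        self.state = state
        self.end_date = datetime.utcnow()

    def _emit_duration_stats_for_finished_state(self):
        if self.state in ("success", "failed") and self.start_date and self.end_date:
            metric = f"dagrun.duration.{self.state}.{self.dag_id}"
            self.emitted.append((metric, self.end_date - self.start_date))

def handle_timeout(dag_run):
    """Sketch of the scheduler's timeout branch with the proposed extra call."""
    dag_run.set_state("failed")
    # Proposed fix: emit the duration timer here, because update_state()
    # (the only existing caller of the emitter) is skipped on this path.
    dag_run._emit_duration_stats_for_finished_state()

# A run that started two hours ago and is now timed out:
run = TimedOutRun("my_dag", datetime.utcnow() - timedelta(hours=2))
handle_timeout(run)
print(run.emitted[0][0])  # dagrun.duration.failed.my_dag
```

An alternative would be to route the timeout path through `update_state()` itself, but the single extra call above is the smallest change that closes the gap described here.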
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

