yangguoaws opened a new issue, #29013: URL: https://github.com/apache/airflow/issues/29013
### Apache Airflow version

2.5.0

### What happened

When a DAG is configured with the `dagrun_timeout` [parameter](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG) and a DAG run fails because it timed out, the metric `dagrun.duration.failed.<dag_id>` is not emitted.

### What you think should happen instead

_No response_

### How to reproduce

_No response_

### Operating System

Ubuntu 22.04.1 LTS

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.1

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else

According to the [doc](https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html#timers), the metric `dagrun.duration.failed.<dag_id>` should capture `Milliseconds taken for a DagRun to reach failed state`. However, when a DAG run fails because of the DAG-run-level timeout, the metric does not capture it.

I dug into the Airflow code and found the reason. The timer `dagrun.duration.failed.{self.dag_id}` is emitted in the method `_emit_duration_stats_for_finished_state`.
[code](https://github.com/apache/airflow/blob/2.5.0/airflow/models/dagrun.py#L880-L894)

```
def _emit_duration_stats_for_finished_state(self):
    if self.state == State.RUNNING:
        return
    if self.start_date is None:
        self.log.warning("Failed to record duration of %s: start_date is not set.", self)
        return
    if self.end_date is None:
        self.log.warning("Failed to record duration of %s: end_date is not set.", self)
        return

    duration = self.end_date - self.start_date
    if self.state == State.SUCCESS:
        Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
    elif self.state == State.FAILED:
        Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
```

The method `_emit_duration_stats_for_finished_state` is called only from the `update_state()` method of the `DagRun` class. [code](https://github.com/apache/airflow/blob/2.5.0/airflow/models/dagrun.py#L650-L677) If `update_state()` is not called, `_emit_duration_stats_for_finished_state` is never invoked.

```
if self._state == DagRunState.FAILED or self._state == DagRunState.SUCCESS:
    msg = (
        "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
        "run_start_date=%s, run_end_date=%s, run_duration=%s, "
        "state=%s, external_trigger=%s, run_type=%s, "
        "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
    )
    self.log.info(
        msg,
        self.dag_id,
        self.execution_date,
        self.run_id,
        self.start_date,
        self.end_date,
        (self.end_date - self.start_date).total_seconds()
        if self.start_date and self.end_date
        else None,
        self._state,
        self.external_trigger,
        self.run_type,
        self.data_interval_start,
        self.data_interval_end,
        self.dag_hash,
    )
    session.flush()

self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
self._emit_duration_stats_for_finished_state()
```

When a DAG run times out, the scheduler job only calls `set_state()`.
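The difference between the two call paths can be modelled in isolation. The following is a simplified stand-in, not Airflow code: `RecordingStats` and `ToyDagRun` are hypothetical names invented for this sketch, states are plain strings, and the only assumption carried over from the real class is that `set_state()` changes the state without emitting a timer while `update_state()` goes on to emit it.

```python
from datetime import datetime, timedelta, timezone


class RecordingStats:
    """Hypothetical stand-in for a StatsD client; it only records timer calls."""

    def __init__(self):
        self.timings = []

    def timing(self, name, delta):
        self.timings.append((name, delta))


class ToyDagRun:
    """Simplified model of the two code paths; not the real DagRun class."""

    def __init__(self, dag_id, start_date, stats):
        self.dag_id = dag_id
        self.start_date = start_date
        self.end_date = None
        self.state = "running"
        self.stats = stats

    def set_state(self, state):
        # Changes the state (and end date) but emits no metric.
        self.state = state
        if state in ("success", "failed"):
            self.end_date = datetime.now(timezone.utc)

    def _emit_duration_stats_for_finished_state(self):
        if self.state == "running" or self.start_date is None or self.end_date is None:
            return
        duration = self.end_date - self.start_date
        self.stats.timing(f"dagrun.duration.{self.state}.{self.dag_id}", duration)

    def update_state(self, new_state):
        # In this model, the only path that reaches the timer.
        self.set_state(new_state)
        self._emit_duration_stats_for_finished_state()


stats = RecordingStats()
start = datetime.now(timezone.utc) - timedelta(minutes=10)

# Timeout path: only set_state() is called, so no timer is recorded.
timed_out = ToyDagRun("example_dag", start, stats)
timed_out.set_state("failed")
timers_after_timeout = list(stats.timings)

# Regular failure path: update_state() also emits the duration timer.
task_failed = ToyDagRun("example_dag", start, stats)
task_failed.update_state("failed")
timers_after_update = [name for name, _ in stats.timings]

print(timers_after_timeout)
print(timers_after_update)
```

In this model the run that fails through `set_state()` ends up in the failed state with an end date, yet leaves no timer behind, which matches the behaviour I observed for timed-out runs.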
Scheduler timeout branch: [code](https://github.com/apache/airflow/blob/2.5.0/airflow/jobs/scheduler_job.py#L1280-L1312)

```
if (
    dag_run.start_date
    and dag.dagrun_timeout
    and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
):
    dag_run.set_state(DagRunState.FAILED)
    unfinished_task_instances = (
        session.query(TI)
        .filter(TI.dag_id == dag_run.dag_id)
        .filter(TI.run_id == dag_run.run_id)
        .filter(TI.state.in_(State.unfinished))
    )
    for task_instance in unfinished_task_instances:
        task_instance.state = TaskInstanceState.SKIPPED
        session.merge(task_instance)
    session.flush()
    self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)

    active_runs = dag.get_num_active_runs(only_running=False, session=session)
    # Work out if we should allow creating a new DagRun now?
    if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
        dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))

    callback_to_execute = DagCallbackRequest(
        full_filepath=dag.fileloc,
        dag_id=dag.dag_id,
        run_id=dag_run.run_id,
        is_failure_callback=True,
        processor_subdir=dag_model.processor_subdir,
        msg="timed_out",
    )

    dag_run.notify_dagrun_state_changed()
    return callback_to_execute
```

From the code above, we can see that when a DAG run times out, only the `set_state()` method is called. `update_state()` is not called, which is why the metric `dagrun.duration.failed.{self.dag_id}` is not emitted.

Please fix this bug so that the timer `dagrun.duration.failed.<dag_id>` also captures DAG runs that fail because of the DAG-level timeout.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
