o-nikolas commented on code in PR #30612:
URL: https://github.com/apache/airflow/pull/30612#discussion_r1178489849
##########
airflow/models/taskinstance.py:
##########
@@ -1387,6 +1389,52 @@ def _log_state(self, lead_msg: str = "") -> None:
self._date_or_empty("end_date"),
)
+ def emit_state_change_metric(self, new_state: TaskInstanceState):
+ """
+ Sends a time metric logging how much time a given state transition
took.
Review Comment:
```suggestion
Sends a time metric representing how much time a given state
transition took.
```
##########
airflow/models/taskinstance.py:
##########
@@ -1387,6 +1389,52 @@ def _log_state(self, lead_msg: str = "") -> None:
self._date_or_empty("end_date"),
)
+ def emit_state_change_metric(self, new_state: TaskInstanceState):
+ """
+ Sends a time metric logging how much time a given state transition
took.
+ The previous state and metric name is deduced from the state the task
was put in.
+
+ :param new_state: The state that has just been set for this task.
+ We do not use `self.state`, because sometimes the state is updated
directly in the DB and not in
+ the local TaskInstance object.
+ Supported states: QUEUED and RUNNING
+ """
+ if self.end_date:
+ # if the task has an end date, it means that this is not its first
round.
+ # we send the state transition time metric only on the first try,
otherwise it gets more complex.
+ return
+
+ # switch on state and deduce which metric to send
+ if new_state == State.RUNNING:
+ metric_name = "queued_duration"
+ if self.queued_dttm is None:
+ # this should not really happen except in tests or rare cases,
+ # but we don't want to create errors just for a metric, so we
just skip it
+ self.log.warning(
+ "cannot record %s for task %s because previous state
change time has not been saved",
+ metric_name,
+ self.task_id,
+ )
+ return
+ timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
+ elif new_state == State.QUEUED:
+ metric_name = "scheduled_duration"
+ if self.start_date is None:
+ # same comment as above
+ self.log.warning(
+ "cannot record %s for task %s because previous state
change time has not been saved",
+ metric_name,
+ self.task_id,
+ )
+ return
+ timing = (timezone.utcnow() - self.start_date).total_seconds()
+ else:
+ raise NotImplementedError
Review Comment:
Maybe add a message to the error that provides more context, e.g. which state was provided and which states are supported.
##########
airflow/models/taskinstance.py:
##########
@@ -1387,6 +1389,52 @@ def _log_state(self, lead_msg: str = "") -> None:
self._date_or_empty("end_date"),
)
+ def emit_state_change_metric(self, new_state: TaskInstanceState):
+ """
+ Sends a time metric logging how much time a given state transition
took.
+ The previous state and metric name is deduced from the state the task
was put in.
+
+ :param new_state: The state that has just been set for this task.
+ We do not use `self.state`, because sometimes the state is updated
directly in the DB and not in
+ the local TaskInstance object.
+ Supported states: QUEUED and RUNNING
+ """
+ if self.end_date:
+ # if the task has an end date, it means that this is not its first
round.
+ # we send the state transition time metric only on the first try,
otherwise it gets more complex.
+ return
+
+ # switch on state and deduce which metric to send
+ if new_state == State.RUNNING:
+ metric_name = "queued_duration"
+ if self.queued_dttm is None:
+ # this should not really happen except in tests or rare cases,
+ # but we don't want to create errors just for a metric, so we
just skip it
+ self.log.warning(
+ "cannot record %s for task %s because previous state
change time has not been saved",
+ metric_name,
+ self.task_id,
+ )
+ return
+ timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
+ elif new_state == State.QUEUED:
+ metric_name = "scheduled_duration"
+ if self.start_date is None:
+ # same comment as above
+ self.log.warning(
+ "cannot record %s for task %s because previous state
change time has not been saved",
+ metric_name,
+ self.task_id,
+ )
+ return
+ timing = (timezone.utcnow() - self.start_date).total_seconds()
+ else:
+ raise NotImplementedError
+
+ # send metric twice, once (legacy) with tags in the name and once with
tags as tags
+ Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing)
+ Stats.timing(f"task.{metric_name}", timing, tags={"task_id":
self.task_id, "dag_id": self.dag_id})
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]