AutomationDev85 commented on code in PR #58819:
URL: https://github.com/apache/airflow/pull/58819#discussion_r2589048668


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2371,36 +2371,48 @@ def _get_num_times_stuck_in_queued(self, ti: 
TaskInstance, session: Session = NE
         count_result: int | None = query.count()
         return count_result if count_result is not None else 0
 
-    previous_ti_running_metrics: dict[tuple[str, str, str], int] = {}
+    previous_ti_metrics: dict[TaskInstanceState, dict[tuple[str, str, str], 
int]] = {}
 
     @provide_session
-    def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
-        running = (
+    def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None:
+        metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, 
State.DEFERRED}
+        all_states_metric = (
             session.query(
+                TaskInstance.state,
                 TaskInstance.dag_id,
                 TaskInstance.task_id,
                 TaskInstance.queue,
-                func.count(TaskInstance.task_id).label("running_count"),
+                func.count(TaskInstance.task_id).label("count"),
             )
-            .filter(TaskInstance.state == State.RUNNING)
-            .group_by(TaskInstance.dag_id, TaskInstance.task_id, 
TaskInstance.queue)
+            .filter(TaskInstance.state.in_(metric_states))
+            .group_by(TaskInstance.state, TaskInstance.dag_id, 
TaskInstance.task_id, TaskInstance.queue)
             .all()
         )
 
-        ti_running_metrics = {(row.dag_id, row.task_id, row.queue): 
row.running_count for row in running}
+        for state in metric_states:
+            if state not in self.previous_ti_metrics:
+                self.previous_ti_metrics[state] = {}
 
-        for (dag_id, task_id, queue), count in ti_running_metrics.items():
-            Stats.gauge(f"ti.running.{queue}.{dag_id}.{task_id}", count)
-            Stats.gauge("ti.running", count, tags={"queue": queue, "dag_id": 
dag_id, "task_id": task_id})
+            ti_metrics = {
+                (row.dag_id, row.task_id, row.queue): row.count
+                for row in all_states_metric
+                if row.state == state
+            }
+
+            for (dag_id, task_id, queue), count in ti_metrics.items():
+                # Replace '.' with '__' in task_id to avoid StatsD metric 
hierarchy conflicts from task group separators
+                
Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id.replace('.', '__')}", count)

Review Comment:
   Thanks for the feedback. I’ve reverted to using the plain task_id. I hope to 
revisit this later with a more comprehensive solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to