jedcunningham commented on code in PR #58819:
URL: https://github.com/apache/airflow/pull/58819#discussion_r2582161763


##########
airflow-ctl/src/airflowctl/ctl/commands/config_command.py:
##########
@@ -643,6 +643,10 @@ def _get_option_value(self, config_resp: Config) -> str | None:
         renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
         breaking=True,
     ),
+    ConfigChange(

Review Comment:
   Not sure we need this one. This option is brand new in 3 anyway, so we'd 
have no occurrences of it in 2. And we already emit warnings when we fall back 
to the old value.
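
For illustration, the fallback behaviour mentioned above can be sketched roughly as follows. This is a minimal sketch, not Airflow's actual implementation: `get_option`, the dict-based config, and the section/option names are all hypothetical.

```python
from __future__ import annotations

import warnings

# Hypothetical key type: (section, option).
ConfigKey = tuple[str, str]


def get_option(config: dict[ConfigKey, str], new_key: ConfigKey, old_key: ConfigKey) -> str | None:
    """Return the value for new_key, falling back to old_key with a warning."""
    if new_key in config:
        return config[new_key]
    if old_key in config:
        # The user is already told about the rename at runtime here.
        warnings.warn(
            f"[{old_key[0]}] {old_key[1]} is deprecated; use [{new_key[0]}] {new_key[1]} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return config[old_key]
    return None
```

If a runtime path like this already warns on the old name, a separate lint rule for the same option adds little.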



##########
chart/files/statsd-mappings.yml:
##########
@@ -119,3 +119,24 @@ mappings:
       queue: "$1"
       dag_id: "$2"
       task_id: "$3"
+
+  - match: airflow.ti.queued.*.*.*

Review Comment:
   This is okay in this case, because these are all net-new mappings.
   
   But let's imagine the running equivalent wasn't here yet, and we added it. 
That we couldn't do.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2371,36 +2371,48 @@ def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NE
         count_result: int | None = query.count()
         return count_result if count_result is not None else 0
 
-    previous_ti_running_metrics: dict[tuple[str, str, str], int] = {}
+    previous_ti_metrics: dict[TaskInstanceState, dict[tuple[str, str, str], int]] = {}
 
     @provide_session
-    def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
-        running = (
+    def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None:
+        metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, State.DEFERRED}
+        all_states_metric = (
             session.query(
+                TaskInstance.state,
                 TaskInstance.dag_id,
                 TaskInstance.task_id,
                 TaskInstance.queue,
-                func.count(TaskInstance.task_id).label("running_count"),
+                func.count(TaskInstance.task_id).label("count"),
             )
-            .filter(TaskInstance.state == State.RUNNING)
-            .group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.queue)
+            .filter(TaskInstance.state.in_(metric_states))
+            .group_by(TaskInstance.state, TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.queue)
             .all()
         )
 
-        ti_running_metrics = {(row.dag_id, row.task_id, row.queue): row.running_count for row in running}
+        for state in metric_states:
+            if state not in self.previous_ti_metrics:
+                self.previous_ti_metrics[state] = {}
 
-        for (dag_id, task_id, queue), count in ti_running_metrics.items():
-            Stats.gauge(f"ti.running.{queue}.{dag_id}.{task_id}", count)
-            Stats.gauge("ti.running", count, tags={"queue": queue, "dag_id": dag_id, "task_id": task_id})
+            ti_metrics = {
+                (row.dag_id, row.task_id, row.queue): row.count
+                for row in all_states_metric
+                if row.state == state
+            }
+
+            for (dag_id, task_id, queue), count in ti_metrics.items():
+                # Replace '.' with '__' in task_id to avoid StatsD metric hierarchy conflicts from task group separators
+                Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id.replace('.', '__')}", count)

Review Comment:
   My 2c, leave the task id alone in this PR. If you want to tackle this, it 
should be done more comprehensively, as it impacts every single metric we emit.



##########
chart/files/statsd-mappings.yml:
##########
@@ -119,3 +119,24 @@ mappings:
       queue: "$1"
       dag_id: "$2"
       task_id: "$3"
+
+  - match: airflow.ti.queued.*.*.*

Review Comment:
   Also, for more context: all of these mappings are pretty fragile. They go 
wrong in all sorts of ways if you have a dot in any of your strings :)
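
To make the fragility concrete, here is a small sketch that mimics a glob mapping in plain Python. It assumes `*` matches exactly one dot-free component of the metric name, which is my understanding of statsd_exporter's glob matching; `glob_to_regex` and `match_labels` are hypothetical helpers, and the DAG and task names are made up.

```python
from __future__ import annotations

import re


def glob_to_regex(pattern: str) -> re.Pattern:
    # Assumption: each "*" stands for exactly one component without dots.
    parts = [r"([^.]*)" if part == "*" else re.escape(part) for part in pattern.split(".")]
    return re.compile(r"\.".join(parts))


def match_labels(pattern: str, metric_name: str) -> dict[str, str] | None:
    """Return the queue/dag_id/task_id labels, or None if the mapping does not apply."""
    match = glob_to_regex(pattern).fullmatch(metric_name)
    if match is None:
        return None
    queue, dag_id, task_id = match.groups()
    return {"queue": queue, "dag_id": dag_id, "task_id": task_id}


PATTERN = "airflow.ti.queued.*.*.*"

# Plain task_id: three components after "queued", so the labels line up.
print(match_labels(PATTERN, "airflow.ti.queued.default.my_dag.my_task"))

# task_id "group.my_task" (task inside a task group): one component too many.
print(match_labels(PATTERN, "airflow.ti.queued.default.my_dag.group.my_task"))
```

Under that assumption, the second name does not match the mapping at all, so the metric would lose its queue, dag_id, and task_id labels.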



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
