This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 92fd810e017 Extend airflow_ti_running metrics with scheduled, queued and 
deferred (#58819)
92fd810e017 is described below

commit 92fd810e01732b4670b63c8c6e339078cd4dc3af
Author: AutomationDev85 <[email protected]>
AuthorDate: Sun Dec 14 18:46:14 2025 +0100

    Extend airflow_ti_running metrics with scheduled, queued and deferred (#58819)
    
    * ti metric export of scheduled, queued and deferred
    
    * Added deprecation warning for new option
    
    * only add task_id to metric
    
    ---------
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 .../logging-monitoring/metrics.rst                 |  9 ++++
 .../administration-and-deployment/scheduler.rst    |  4 +-
 .../src/airflow/config_templates/config.yml        |  5 ++-
 .../src/airflow/jobs/scheduler_job_runner.py       | 48 +++++++++++++---------
 chart/files/statsd-mappings.yml                    | 21 ++++++++++
 .../src/airflow_shared/configuration/parser.py     |  1 +
 6 files changed, 65 insertions(+), 23 deletions(-)

diff --git 
a/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
 
b/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
index 0a38c1b0992..b7430c3afaa 100644
--- 
a/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
+++ 
b/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
@@ -263,9 +263,18 @@ Name                                                 
Description
 ``triggerer.capacity_left.<hostname>``               Capacity left on a 
triggerer to run triggers (described by hostname)
 ``triggerer.capacity_left``                          Capacity left on a 
triggerer to run triggers (described by hostname).
                                                      Metric with hostname 
tagging.
+``ti.scheduled.<queue>.<dag_id>.<task_id>``          Number of scheduled tasks 
in a given Dag.
+``ti.scheduled``                                     Number of scheduled tasks 
in a given Dag.
+                                                     Metric with queue, dag_id 
and task_id tagging.
+``ti.queued.<queue>.<dag_id>.<task_id>``             Number of queued tasks in 
a given Dag.
+``ti.queued``                                        Number of queued tasks in 
a given Dag.
+                                                     Metric with queue, dag_id 
and task_id tagging.
 ``ti.running.<queue>.<dag_id>.<task_id>``            Number of running tasks 
in a given Dag. As ti.start and ti.finish can run out of sync this metric shows 
all running tis.
 ``ti.running``                                       Number of running tasks 
in a given Dag. As ti.start and ti.finish can run out of sync this metric shows 
all running tis.
                                                      Metric with queue, dag_id 
and task_id tagging.
+``ti.deferred.<queue>.<dag_id>.<task_id>``           Number of deferred tasks 
in a given Dag.
+``ti.deferred``                                      Number of deferred tasks 
in a given Dag.
+                                                     Metric with queue, dag_id 
and task_id tagging.
 ==================================================== 
========================================================================
 
 Timers
diff --git a/airflow-core/docs/administration-and-deployment/scheduler.rst 
b/airflow-core/docs/administration-and-deployment/scheduler.rst
index 582a4ab36e8..2bb447c0774 100644
--- a/airflow-core/docs/administration-and-deployment/scheduler.rst
+++ b/airflow-core/docs/administration-and-deployment/scheduler.rst
@@ -261,9 +261,9 @@ However, you can also look at other non-performance-related 
scheduler configurat
   period.
 
 
-- :ref:`config:scheduler__running_metrics_interval`
+- :ref:`config:scheduler__ti_metrics_interval`
 
-  How often (in seconds) should running task instance stats be sent to StatsD
+  How often (in seconds) should task instance (scheduled, queued, running and 
deferred) stats be sent to StatsD
   (if statsd_on is enabled). This is a *relatively* expensive query to compute
   this, so this should be set to match the same period as your StatsD roll-up
   period.
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 56136680cfa..38b6fe85266 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2150,9 +2150,10 @@ scheduler:
       type: float
       example: ~
       default: "5.0"
-    running_metrics_interval:
+    ti_metrics_interval:
       description: |
-        How often (in seconds) should running task instance stats be sent to 
StatsD (if statsd_on is enabled)
+        How often (in seconds) should task instance (scheduled, queued, 
running and deferred) stats
+        be sent to StatsD (if statsd_on is enabled)
       version_added: 3.0.0
       type: float
       example: ~
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index ceff2ff5ca6..eb8e7b22770 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1452,8 +1452,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         if self._is_metrics_enabled():
             timers.call_regular_interval(
-                conf.getfloat("scheduler", "running_metrics_interval", 
fallback=30.0),
-                self._emit_running_ti_metrics,
+                conf.getfloat("scheduler", "ti_metrics_interval", 
fallback=30.0),
+                self._emit_ti_metrics,
             )
 
             timers.call_regular_interval(
@@ -2445,36 +2445,46 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         count_result: int | None = query.count()
         return count_result if count_result is not None else 0
 
-    previous_ti_running_metrics: dict[tuple[str, str, str], int] = {}
+    previous_ti_metrics: dict[TaskInstanceState, dict[tuple[str, str, str], 
int]] = {}
 
     @provide_session
-    def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
-        running = (
+    def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None:
+        metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, 
State.DEFERRED}
+        all_states_metric = (
             session.query(
+                TaskInstance.state,
                 TaskInstance.dag_id,
                 TaskInstance.task_id,
                 TaskInstance.queue,
-                func.count(TaskInstance.task_id).label("running_count"),
+                func.count(TaskInstance.task_id).label("count"),
             )
-            .filter(TaskInstance.state == State.RUNNING)
-            .group_by(TaskInstance.dag_id, TaskInstance.task_id, 
TaskInstance.queue)
+            .filter(TaskInstance.state.in_(metric_states))
+            .group_by(TaskInstance.state, TaskInstance.dag_id, 
TaskInstance.task_id, TaskInstance.queue)
             .all()
         )
 
-        ti_running_metrics = {(row.dag_id, row.task_id, row.queue): 
row.running_count for row in running}
+        for state in metric_states:
+            if state not in self.previous_ti_metrics:
+                self.previous_ti_metrics[state] = {}
 
-        for (dag_id, task_id, queue), count in ti_running_metrics.items():
-            Stats.gauge(f"ti.running.{queue}.{dag_id}.{task_id}", count)
-            Stats.gauge("ti.running", count, tags={"queue": queue, "dag_id": 
dag_id, "task_id": task_id})
+            ti_metrics = {
+                (row.dag_id, row.task_id, row.queue): row.count
+                for row in all_states_metric
+                if row.state == state
+            }
+
+            for (dag_id, task_id, queue), count in ti_metrics.items():
+                Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", count)
+                Stats.gauge(f"ti.{state}", count, tags={"queue": queue, 
"dag_id": dag_id, "task_id": task_id})
 
-        for prev_key in self.previous_ti_running_metrics:
-            # reset stats which are not running anymore
-            if prev_key not in ti_running_metrics:
-                dag_id, task_id, queue = prev_key
-                Stats.gauge(f"ti.running.{queue}.{dag_id}.{task_id}", 0)
-                Stats.gauge("ti.running", 0, tags={"queue": queue, "dag_id": 
dag_id, "task_id": task_id})
+            for prev_key in self.previous_ti_metrics[state]:
+                # Reset previously exported stats that are no longer present 
in current metrics to zero
+                if prev_key not in ti_metrics:
+                    dag_id, task_id, queue = prev_key
+                    Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", 0)
+                    Stats.gauge(f"ti.{state}", 0, tags={"queue": queue, 
"dag_id": dag_id, "task_id": task_id})
 
-        self.previous_ti_running_metrics = ti_running_metrics
+            self.previous_ti_metrics[state] = ti_metrics
 
     @provide_session
     def _emit_running_dags_metric(self, session: Session = NEW_SESSION) -> 
None:
diff --git a/chart/files/statsd-mappings.yml b/chart/files/statsd-mappings.yml
index 679b600a517..a6a216e9c32 100644
--- a/chart/files/statsd-mappings.yml
+++ b/chart/files/statsd-mappings.yml
@@ -119,3 +119,24 @@ mappings:
       queue: "$1"
       dag_id: "$2"
       task_id: "$3"
+
+  - match: airflow.ti.queued.*.*.*
+    name: "airflow_ti_queued"
+    labels:
+      queue: "$1"
+      dag_id: "$2"
+      task_id: "$3"
+
+  - match: airflow.ti.scheduled.*.*.*
+    name: "airflow_ti_scheduled"
+    labels:
+      queue: "$1"
+      dag_id: "$2"
+      task_id: "$3"
+
+  - match: airflow.ti.deferred.*.*.*
+    name: "airflow_ti_deferred"
+    labels:
+      queue: "$1"
+      dag_id: "$2"
+      task_id: "$3"
diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py 
b/shared/configuration/src/airflow_shared/configuration/parser.py
index ed935b4c958..9f4dbb26afa 100644
--- a/shared/configuration/src/airflow_shared/configuration/parser.py
+++ b/shared/configuration/src/airflow_shared/configuration/parser.py
@@ -156,6 +156,7 @@ class AirflowConfigParser(ConfigParser):
         ("api", "require_confirmation_dag_change"): ("webserver", 
"require_confirmation_dag_change", "3.1.0"),
         ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
         ("api", "log_config"): ("api", "access_logfile", "3.1.0"),
+        ("scheduler", "ti_metrics_interval"): ("scheduler", 
"running_metrics_interval", "3.2.0"),
     }
 
     # A mapping of new section -> (old section, since_version).

Reply via email to