This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 92fd810e017 Extend airflow_ti_running metrics by scheduled, queued and
deferred (#58819)
92fd810e017 is described below
commit 92fd810e01732b4670b63c8c6e339078cd4dc3af
Author: AutomationDev85 <[email protected]>
AuthorDate: Sun Dec 14 18:46:14 2025 +0100
Extend airflow_ti_running metrics by scheduled, queued and deferred (#58819)
* ti metric export of scheduled, queued and deferred
* Added deprecation warning for the renamed (old) option
* only add task_id to metric
---------
Co-authored-by: AutomationDev85 <AutomationDev85>
---
.../logging-monitoring/metrics.rst | 9 ++++
.../administration-and-deployment/scheduler.rst | 4 +-
.../src/airflow/config_templates/config.yml | 5 ++-
.../src/airflow/jobs/scheduler_job_runner.py | 48 +++++++++++++---------
chart/files/statsd-mappings.yml | 21 ++++++++++
.../src/airflow_shared/configuration/parser.py | 1 +
6 files changed, 65 insertions(+), 23 deletions(-)
diff --git
a/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
b/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
index 0a38c1b0992..b7430c3afaa 100644
---
a/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
+++
b/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
@@ -263,9 +263,18 @@ Name
Description
``triggerer.capacity_left.<hostname>`` Capacity left on a
triggerer to run triggers (described by hostname)
``triggerer.capacity_left`` Capacity left on a
triggerer to run triggers (described by hostname).
Metric with hostname
tagging.
+``ti.scheduled.<queue>.<dag_id>.<task_id>`` Number of scheduled tasks
in a given Dag.
+``ti.scheduled`` Number of scheduled tasks
in a given Dag.
+ Metric with queue, dag_id
and task_id tagging.
+``ti.queued.<queue>.<dag_id>.<task_id>`` Number of queued tasks in
a given Dag.
+``ti.queued`` Number of queued tasks in
a given Dag.
+ Metric with queue, dag_id
and task_id tagging.
``ti.running.<queue>.<dag_id>.<task_id>`` Number of running tasks
in a given Dag. As ti.start and ti.finish can run out of sync this metric shows
all running tis.
``ti.running`` Number of running tasks
in a given Dag. As ti.start and ti.finish can run out of sync this metric shows
all running tis.
Metric with queue, dag_id
and task_id tagging.
+``ti.deferred.<queue>.<dag_id>.<task_id>`` Number of deferred tasks
in a given Dag.
+``ti.deferred`` Number of deferred tasks
in a given Dag.
+ Metric with queue, dag_id
and task_id tagging.
====================================================
========================================================================
Timers
diff --git a/airflow-core/docs/administration-and-deployment/scheduler.rst
b/airflow-core/docs/administration-and-deployment/scheduler.rst
index 582a4ab36e8..2bb447c0774 100644
--- a/airflow-core/docs/administration-and-deployment/scheduler.rst
+++ b/airflow-core/docs/administration-and-deployment/scheduler.rst
@@ -261,9 +261,9 @@ However, you can also look at other non-performance-related
scheduler configurat
period.
-- :ref:`config:scheduler__running_metrics_interval`
+- :ref:`config:scheduler__ti_metrics_interval`
- How often (in seconds) should running task instance stats be sent to StatsD
+ How often (in seconds) should task instance (scheduled, queued, running and
deferred) stats be sent to StatsD
(if statsd_on is enabled). This is a *relatively* expensive query to compute
this, so this should be set to match the same period as your StatsD roll-up
period.
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 56136680cfa..38b6fe85266 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2150,9 +2150,10 @@ scheduler:
type: float
example: ~
default: "5.0"
- running_metrics_interval:
+ ti_metrics_interval:
description: |
- How often (in seconds) should running task instance stats be sent to
StatsD (if statsd_on is enabled)
+ How often (in seconds) should task instance (scheduled, queued,
running and deferred) stats
+ be sent to StatsD (if statsd_on is enabled)
version_added: 3.0.0
type: float
example: ~
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index ceff2ff5ca6..eb8e7b22770 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1452,8 +1452,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if self._is_metrics_enabled():
timers.call_regular_interval(
- conf.getfloat("scheduler", "running_metrics_interval",
fallback=30.0),
- self._emit_running_ti_metrics,
+ conf.getfloat("scheduler", "ti_metrics_interval",
fallback=30.0),
+ self._emit_ti_metrics,
)
timers.call_regular_interval(
@@ -2445,36 +2445,46 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
count_result: int | None = query.count()
return count_result if count_result is not None else 0
- previous_ti_running_metrics: dict[tuple[str, str, str], int] = {}
+ previous_ti_metrics: dict[TaskInstanceState, dict[tuple[str, str, str],
int]] = {}
@provide_session
- def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
- running = (
+ def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None:
+ metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING,
State.DEFERRED}
+ all_states_metric = (
session.query(
+ TaskInstance.state,
TaskInstance.dag_id,
TaskInstance.task_id,
TaskInstance.queue,
- func.count(TaskInstance.task_id).label("running_count"),
+ func.count(TaskInstance.task_id).label("count"),
)
- .filter(TaskInstance.state == State.RUNNING)
- .group_by(TaskInstance.dag_id, TaskInstance.task_id,
TaskInstance.queue)
+ .filter(TaskInstance.state.in_(metric_states))
+ .group_by(TaskInstance.state, TaskInstance.dag_id,
TaskInstance.task_id, TaskInstance.queue)
.all()
)
- ti_running_metrics = {(row.dag_id, row.task_id, row.queue):
row.running_count for row in running}
+ for state in metric_states:
+ if state not in self.previous_ti_metrics:
+ self.previous_ti_metrics[state] = {}
- for (dag_id, task_id, queue), count in ti_running_metrics.items():
- Stats.gauge(f"ti.running.{queue}.{dag_id}.{task_id}", count)
- Stats.gauge("ti.running", count, tags={"queue": queue, "dag_id":
dag_id, "task_id": task_id})
+ ti_metrics = {
+ (row.dag_id, row.task_id, row.queue): row.count
+ for row in all_states_metric
+ if row.state == state
+ }
+
+ for (dag_id, task_id, queue), count in ti_metrics.items():
+ Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", count)
+ Stats.gauge(f"ti.{state}", count, tags={"queue": queue,
"dag_id": dag_id, "task_id": task_id})
- for prev_key in self.previous_ti_running_metrics:
- # reset stats which are not running anymore
- if prev_key not in ti_running_metrics:
- dag_id, task_id, queue = prev_key
- Stats.gauge(f"ti.running.{queue}.{dag_id}.{task_id}", 0)
- Stats.gauge("ti.running", 0, tags={"queue": queue, "dag_id":
dag_id, "task_id": task_id})
+ for prev_key in self.previous_ti_metrics[state]:
+ # Reset previously exported stats that are no longer present
in current metrics to zero
+ if prev_key not in ti_metrics:
+ dag_id, task_id, queue = prev_key
+ Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", 0)
+ Stats.gauge(f"ti.{state}", 0, tags={"queue": queue,
"dag_id": dag_id, "task_id": task_id})
- self.previous_ti_running_metrics = ti_running_metrics
+ self.previous_ti_metrics[state] = ti_metrics
@provide_session
def _emit_running_dags_metric(self, session: Session = NEW_SESSION) ->
None:
diff --git a/chart/files/statsd-mappings.yml b/chart/files/statsd-mappings.yml
index 679b600a517..a6a216e9c32 100644
--- a/chart/files/statsd-mappings.yml
+++ b/chart/files/statsd-mappings.yml
@@ -119,3 +119,24 @@ mappings:
queue: "$1"
dag_id: "$2"
task_id: "$3"
+
+ - match: airflow.ti.queued.*.*.*
+ name: "airflow_ti_queued"
+ labels:
+ queue: "$1"
+ dag_id: "$2"
+ task_id: "$3"
+
+ - match: airflow.ti.scheduled.*.*.*
+ name: "airflow_ti_scheduled"
+ labels:
+ queue: "$1"
+ dag_id: "$2"
+ task_id: "$3"
+
+ - match: airflow.ti.deferred.*.*.*
+ name: "airflow_ti_deferred"
+ labels:
+ queue: "$1"
+ dag_id: "$2"
+ task_id: "$3"
diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py
b/shared/configuration/src/airflow_shared/configuration/parser.py
index ed935b4c958..9f4dbb26afa 100644
--- a/shared/configuration/src/airflow_shared/configuration/parser.py
+++ b/shared/configuration/src/airflow_shared/configuration/parser.py
@@ -156,6 +156,7 @@ class AirflowConfigParser(ConfigParser):
("api", "require_confirmation_dag_change"): ("webserver",
"require_confirmation_dag_change", "3.1.0"),
("api", "instance_name"): ("webserver", "instance_name", "3.1.0"),
("api", "log_config"): ("api", "access_logfile", "3.1.0"),
+ ("scheduler", "ti_metrics_interval"): ("scheduler",
"running_metrics_interval", "3.2.0"),
}
# A mapping of new section -> (old section, since_version).