This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f6bd8b48d29 Add team_name tag to scheduler metrics for multi-team deployments (#68594)
f6bd8b48d29 is described below
commit f6bd8b48d296f95d44f819714514cfdb0c967c44
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 16 11:31:42 2026 -0700
Add team_name tag to scheduler metrics for multi-team deployments (#68594)
---
.../src/airflow/jobs/scheduler_job_runner.py | 63 +++++++++++--
airflow-core/tests/unit/jobs/test_scheduler_job.py | 100 +++++++++++++++++++++
2 files changed, 157 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 813f782437a..ec5b3248178 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -117,6 +117,7 @@ from airflow.timetables.base import Timetable, compute_rollup_fingerprint
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.triggers.base import TriggerEvent
from airflow.utils.event_scheduler import EventScheduler
+from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction,
run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
@@ -1437,9 +1438,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
if ti_queued and not ti_requeued:
+ team_name = (
+ DagModel.get_team_name(ti.dag_id, session=session)
+ if conf.getboolean("core", "multi_team")
+ else None
+ )
stats.incr(
"scheduler.tasks.killed_externally",
- tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+ tags=prune_dict({"dag_id": ti.dag_id, "task_id":
ti.task_id, "team_name": team_name}),
)
msg = (
"Executor %s reported that the task instance %s finished
with state %s, but the task instance's state attribute is %s. " # noqa:
RUF100, UP031, flynt
@@ -2671,7 +2677,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
stats.timing(
"dagrun.schedule_delay",
schedule_delay,
- tags={"dag_id": dag.dag_id},
+ tags=prune_dict(
+ {
+ "dag_id": dag.dag_id,
+ "team_name":
self._get_team_names_for_dag_ids([dag.dag_id], session).get(
+ dag.dag_id
+ )
+ if self._multi_team
+ else None,
+ }
+ ),
)
# cache saves time during scheduling of many dag_runs for same dag
@@ -2822,7 +2837,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
stats.timing(
"dagrun.duration.failed",
duration,
- tags={"dag_id": dag_run.dag_id},
+ tags=prune_dict(
+ {
+ "dag_id": dag_run.dag_id,
+ "team_name":
self._get_team_names_for_dag_ids([dag_run.dag_id], session).get(
+ dag_run.dag_id
+ )
+ if self._multi_team
+ else None,
+ }
+ ),
)
return callback_to_execute
@@ -3106,6 +3130,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
all_states_metric = session.execute(stmt).all()
+ if self._multi_team:
+ unique_dag_ids = {row[1] for row in all_states_metric}
+ dag_id_to_team_name =
self._get_team_names_for_dag_ids(unique_dag_ids, session)
+ else:
+ dag_id_to_team_name = {}
+
for state in metric_states:
if state not in self.previous_ti_metrics:
self.previous_ti_metrics[state] = {}
@@ -3120,7 +3150,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
stats.gauge(
f"ti.{state}",
float(count),
- tags={"queue": queue, "dag_id": dag_id, "task_id":
task_id},
+ tags=prune_dict(
+ {
+ "queue": queue,
+ "dag_id": dag_id,
+ "task_id": task_id,
+ "team_name": dag_id_to_team_name.get(dag_id),
+ }
+ ),
)
for prev_key in self.previous_ti_metrics[state]:
@@ -3130,7 +3167,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
stats.gauge(
f"ti.{state}",
0,
- tags={"queue": queue, "dag_id": dag_id, "task_id":
task_id},
+ tags=prune_dict(
+ {
+ "queue": queue,
+ "dag_id": dag_id,
+ "task_id": task_id,
+ "team_name": dag_id_to_team_name.get(dag_id),
+ }
+ ),
)
self.previous_ti_metrics[state] = ti_metrics
@@ -3508,7 +3552,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
continue
executor.change_state(ti.key, TaskInstanceState.FAILED,
remove_running=True)
stats.incr(
- "task_instances_without_heartbeats_killed", tags={"dag_id":
ti.dag_id, "task_id": ti.task_id}
+ "task_instances_without_heartbeats_killed",
+ tags=prune_dict(
+ {
+ "dag_id": ti.dag_id,
+ "task_id": ti.task_id,
+ "team_name": dag_id_to_team_name.get(ti.dag_id),
+ }
+ ),
)
# [END find_and_purge_task_instances_without_heartbeats]
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 1a089780633..540bdc46054 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2873,6 +2873,55 @@ class TestSchedulerJob:
mock_stats.gauge.assert_any_call("pool.queued_slots", mock.ANY,
tags=expected_tags)
mock_stats.gauge.assert_any_call("pool.running_slots", mock.ANY,
tags=expected_tags)
+ @pytest.mark.parametrize(
+ ("multi_team", "expected_tags"),
+ [
+ pytest.param(
+ "true",
+ {"queue": "default", "dag_id": "ti_gauge_dag", "task_id":
"task1", "team_name": "ti_team"},
+ id="with_team",
+ ),
+ pytest.param(
+ "false",
+ {"queue": "default", "dag_id": "ti_gauge_dag", "task_id":
"task1"},
+ id="without_team",
+ ),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+ def test_emit_ti_metrics_team_name(self, mock_get_backend, multi_team,
expected_tags, dag_maker, session):
+ """TI gauge metrics include team_name only when multi_team is
enabled."""
+ mock_stats = mock.MagicMock(spec=StatsLogger)
+ mock_get_backend.return_value = mock_stats
+
+ clear_db_teams()
+
+ team = Team(name="ti_team")
+ session.add(team)
+ session.flush()
+
+ clear_db_dag_bundles()
+
+ bundle = DagBundleModel(name="ti_bundle")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ with dag_maker(dag_id="ti_gauge_dag", bundle_name="ti_bundle",
session=session):
+ EmptyOperator(task_id="task1")
+
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instances(session=session)[0]
+ ti.state = State.RUNNING
+ session.flush()
+
+ with conf_vars({("core", "multi_team"): multi_team}):
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+ self.job_runner._emit_ti_metrics(session=session)
+
+ mock_stats.gauge.assert_any_call("ti.running", mock.ANY,
tags=expected_tags)
+
def test_enqueue_task_instances_with_queued_state(self, dag_maker,
session):
dag_id =
"SchedulerJobTest.test_enqueue_task_instances_with_queued_state"
task_id_1 = "dummy"
@@ -3537,6 +3586,57 @@ class TestSchedulerJob:
assert any("Backfilled dag_version_id" in rec.message for rec in
caplog.records)
mock_executor.send_callback.assert_called_once()
+ @pytest.mark.parametrize(
+ ("multi_team", "expected_tags"),
+ [
+ pytest.param(
+ "true",
+ {"dag_id": "heartbeat_dag", "task_id": "task", "team_name":
"hb_team"},
+ id="with_team",
+ ),
+ pytest.param(
+ "false",
+ {"dag_id": "heartbeat_dag", "task_id": "task"},
+ id="without_team",
+ ),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats.incr")
+ def test_purge_heartbeat_killed_metric_team_name(
+ self, mock_incr, multi_team, expected_tags, dag_maker, session
+ ):
+ clear_db_teams()
+ team = Team(name="hb_team")
+ session.add(team)
+ session.flush()
+
+ clear_db_dag_bundles()
+ bundle = DagBundleModel(name="hb_bundle")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ with dag_maker("heartbeat_dag", bundle_name="hb_bundle",
session=session):
+ EmptyOperator(task_id="task")
+
+ dag_run = dag_maker.create_dagrun(run_id="test_run",
state=DagRunState.RUNNING)
+
+ mock_executor = MagicMock()
+ scheduler_job = Job()
+
+ ti = dag_run.get_task_instance(task_id="task", session=session)
+ ti.state = TaskInstanceState.RUNNING
+ ti.queued_by_job_id = scheduler_job.id
+ ti.last_heartbeat_at = timezone.utcnow() - timedelta(hours=1)
+ session.merge(ti)
+ session.commit()
+
+ with conf_vars({("core", "multi_team"): multi_team}):
+ self.job_runner = SchedulerJobRunner(scheduler_job,
executors=[mock_executor])
+ self.job_runner._purge_task_instances_without_heartbeats([ti],
session=session)
+
+ mock_incr.assert_any_call("task_instances_without_heartbeats_killed",
tags=expected_tags)
+
@staticmethod
def mock_failure_callback(context):
pass