This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b46c346ed8 Add team_name tag to asset metrics for multi-team deployments (#68367)
8b46c346ed8 is described below
commit 8b46c346ed875be381a49ae0e1e1e0c6ea6969a9
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Jun 18 09:36:01 2026 -0700
Add team_name tag to asset metrics for multi-team deployments (#68367)
---
airflow-core/src/airflow/assets/manager.py | 10 ++--
.../src/airflow/jobs/scheduler_job_runner.py | 7 ++-
airflow-core/tests/unit/assets/test_manager.py | 57 +++++++++++++++++++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 61 ++++++++++++++++++++++
4 files changed, 130 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index f72c533c5a0..c8d2edef4f1 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -44,7 +44,7 @@ from airflow.models.asset import (
)
from airflow.models.log import Log
from airflow.timetables.base import compute_rollup_fingerprint
-from airflow.utils.helpers import is_container
+from airflow.utils.helpers import is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
@@ -397,7 +397,12 @@ class AssetManager(LoggingMixin):
)
)
- stats.incr("asset.updates")
+ team_name = None
+ if task_instance and conf.getboolean("core", "multi_team"):
+ from airflow.models.dag import DagModel
+
+ team_name = DagModel.get_team_name(task_instance.dag_id,
session=session)
+ stats.incr("asset.updates", tags=prune_dict({"team_name": team_name}))
dags_to_queue = (
dags_to_queue_from_asset | dags_to_queue_from_asset_alias |
dags_to_queue_from_asset_ref
@@ -405,7 +410,6 @@ class AssetManager(LoggingMixin):
if conf.getboolean("core", "multi_team"):
if task_instance:
- team_name = DagModel.get_team_name(task_instance.dag_id,
session=session)
resolved_source_teams = {team_name} if team_name else set()
# Resolve consumer-team filtering from the outlet reference
outlet_ref = session.scalar(
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 51178847267..93b155f2257 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2611,7 +2611,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
creating_job_id=self.job.id,
session=session,
)
- stats.incr("asset.triggered_dagruns")
+ team_name = (
+ self._get_team_names_for_dag_ids([dag.dag_id],
session).get(dag.dag_id)
+ if self._multi_team
+ else None
+ )
+ stats.incr("asset.triggered_dagruns",
tags=prune_dict({"team_name": team_name}))
dag_run.consumed_asset_events.extend(asset_events)
self.log.info(
"Created asset-triggered DagRun for '%s': run_id=%s, consumed
%d asset events",
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 687f4d17847..40d9401b9ff 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -29,6 +29,7 @@ from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session
from airflow import settings
+from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
from airflow.assets.manager import AssetManager
from airflow.models.asset import (
AssetAliasModel,
@@ -40,7 +41,9 @@ from airflow.models.asset import (
DagScheduleAssetReference,
)
from airflow.models.dag import DAG, DagModel
+from airflow.models.dagbundle import DagBundleModel
from airflow.models.log import Log
+from airflow.models.team import Team
from airflow.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper
from airflow.partition_mappers.window import WeekWindow
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -48,7 +51,11 @@ from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.db import clear_db_apdr, clear_db_logs,
clear_db_pakl
+from tests_common.test_utils.db import (
+ clear_db_apdr,
+ clear_db_logs,
+ clear_db_pakl,
+)
from unit.listeners import asset_listener
pytestmark = pytest.mark.db_test
@@ -670,6 +677,54 @@ def _make_asset_model(
return model
+class TestAssetMetricsTeamName:
+ @pytest.mark.parametrize(
+ ("multi_team", "expect_team_tag"),
+ [
+ pytest.param("true", True, id="with_team"),
+ pytest.param("false", False, id="without_team"),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+ def test_asset_updates_respects_team_name(
+ self, mock_get_backend, multi_team, expect_team_tag, session, dag_maker
+ ):
+ mock_stats = mock.MagicMock(spec=StatsLogger)
+ mock_get_backend.return_value = mock_stats
+
+ suffix = "with_team" if expect_team_tag else "without_team"
+
+ team_name = f"team_asset_upd_{suffix}"
+ team = Team(name=team_name)
+ session.add(team)
+ session.flush()
+
+ bundle_name = f"bundle_asset_upd_{suffix}"
+ bundle = DagBundleModel(name=bundle_name)
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ asset_name = f"metric_asset_{suffix}"
+ asset = Asset(uri=f"test://{asset_name}", name=asset_name,
group="asset")
+ with dag_maker(dag_id=f"asset_dag_{suffix}", bundle_name=bundle_name,
session=session):
+ EmptyOperator(task_id="task1", outlets=[asset])
+
+ ti = mock.MagicMock()
+ ti.dag_id = f"asset_dag_{suffix}"
+ ti.task_id = "task1"
+ ti.run_id = "run1"
+ ti.map_index = -1
+
+ with conf_vars({("core", "multi_team"): multi_team}):
+ AssetManager().register_asset_change(task_instance=ti,
asset=asset, session=session)
+
+ if expect_team_tag:
+ mock_stats.incr.assert_any_call("asset.updates",
tags={"team_name": team_name})
+ else:
+ mock_stats.incr.assert_any_call("asset.updates")
+
+
class TestFilterDagsByTeam:
@conf_vars({("core", "multi_team"): "false"})
def test_multi_team_disabled_returns_all_dags(self):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index af1b34423da..b312f9fd1b0 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5667,6 +5667,67 @@ class TestSchedulerJob:
assert created_run.data_interval_end is None
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.parametrize(
+ ("multi_team", "expect_team_tag"),
+ [
+ pytest.param("true", True, id="with_team"),
+ pytest.param("false", False, id="without_team"),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+ def test_asset_triggered_dagruns_respects_team_name(
+ self, mock_get_backend, multi_team, expect_team_tag, session, dag_maker
+ ):
+ mock_stats = mock.MagicMock(spec=StatsLogger)
+ mock_get_backend.return_value = mock_stats
+
+ suffix = "with_team" if expect_team_tag else "without_team"
+
+ team_name = f"team_asset_trig_{suffix}"
+ team = Team(name=team_name)
+ session.add(team)
+ session.flush()
+
+ bundle_name = f"bundle_asset_trig_{suffix}"
+ bundle = DagBundleModel(name=bundle_name)
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.commit()
+
+ asset_name = f"test_team_asset_{suffix}"
+ asset = Asset(uri=f"test://{asset_name}", name=asset_name,
group="test_group")
+ with dag_maker(dag_id=f"producer_{suffix}", bundle_name=bundle_name,
session=session):
+ BashOperator(task_id="task", bash_command="echo 1",
outlets=[asset])
+ dr = dag_maker.create_dagrun()
+
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+ event = AssetEvent(
+ asset_id=asset_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ session.add(event)
+
+ with dag_maker(
+ dag_id=f"consumer_{suffix}", schedule=[asset],
bundle_name=bundle_name, session=session
+ ):
+ pass
+
+ session.add(AssetDagRunQueue(asset_id=asset_id,
target_dag_id=f"consumer_{suffix}"))
+ session.flush()
+
+ with conf_vars({("core", "multi_team"): multi_team}):
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dagruns_for_dags(session, session)
+
+ if expect_team_tag:
+ mock_stats.incr.assert_any_call("asset.triggered_dagruns",
tags={"team_name": team_name})
+ else:
+ mock_stats.incr.assert_any_call("asset.triggered_dagruns")
+
@pytest.mark.need_serialized_dag
@pytest.mark.parametrize(
("disable", "enable"),