This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 10e5ee47f42 Add --team-name CLI argument to triggerer for multi-team
deployments (#67254)
10e5ee47f42 is described below
commit 10e5ee47f42732eeb6b0b30413a9354b00f2d6f3
Author: Ramit Kataria <[email protected]>
AuthorDate: Mon May 25 15:33:57 2026 -0700
Add --team-name CLI argument to triggerer for multi-team deployments
(#67254)
When core.multi_team is enabled, a triggerer can now be scoped to a
specific team via `airflow triggerer --team-name <name>`. The argument
is validated at startup (team must exist, config must be enabled) and
threaded through TriggererJobRunner → TriggerRunnerSupervisor. Query
filtering will be wired in a follow-up PR.
---
airflow-core/src/airflow/cli/cli_config.py | 5 +++
.../src/airflow/cli/commands/triggerer_command.py | 28 ++++++++++--
.../src/airflow/jobs/triggerer_job_runner.py | 4 ++
.../unit/cli/commands/test_triggerer_command.py | 51 +++++++++++++++++++++-
airflow-core/tests/unit/jobs/test_triggerer_job.py | 44 +++++++++++++++++++
5 files changed, 126 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index b41ac481e0e..8e701db79bb 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -1025,6 +1025,10 @@ ARG_QUEUES = Arg(
type=string_list_type,
help="Optional comma-separated list of task queues which the triggerer
should consume from.",
)
+ARG_TRIGGERER_TEAM_NAME = Arg(
+ ("--team-name",),
+ help="Team name to scope this triggerer to. Requires core.multi_team to be
enabled.",
+)
ARG_DAG_LIST_COLUMNS = Arg(
("--columns",),
@@ -2140,6 +2144,7 @@ core_commands: list[CLICommand] = [
ARG_SKIP_SERVE_LOGS,
ARG_DEV,
ARG_QUEUES,
+ ARG_TRIGGERER_TEAM_NAME,
),
),
ActionCommand(
diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py
b/airflow-core/src/airflow/cli/commands/triggerer_command.py
index f35332abd1f..1182fbb05b2 100644
--- a/airflow-core/src/airflow/cli/commands/triggerer_command.py
+++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py
@@ -52,11 +52,15 @@ def _serve_logs(skip_serve_logs: bool = False) ->
Generator[None, None, None]:
@enable_memray_trace(component=MemrayTraceComponents.triggerer)
def triggerer_run(
- skip_serve_logs: bool, capacity: int, triggerer_heartrate: float, queues:
set[str] | None = None
+ skip_serve_logs: bool,
+ capacity: int,
+ triggerer_heartrate: float,
+ queues: set[str] | None = None,
+ team_name: str | None = None,
):
with _serve_logs(skip_serve_logs):
triggerer_job_runner = TriggererJobRunner(
- job=Job(heartrate=triggerer_heartrate), capacity=capacity,
queues=queues
+ job=Job(heartrate=triggerer_heartrate), capacity=capacity,
queues=queues, team_name=team_name
)
run_job(job=triggerer_job_runner.job,
execute_callable=triggerer_job_runner._execute)
@@ -75,6 +79,18 @@ def triggerer(args):
"--queues option may only be used when triggerer.queues_enabled is
`True`."
)
+ multi_team = conf.getboolean("core", "multi_team")
+ team_name: str | None = getattr(args, "team_name", None)
+
+ if team_name and not multi_team:
+ raise AirflowConfigException("--team-name option may only be used when
core.multi_team is enabled.")
+
+ if team_name:
+ from airflow.models.team import Team
+
+ if Team.get_name_if_exists(team_name) is None:
+ raise AirflowConfigException(f"Team {team_name!r} does not exist.")
+
queues = set(args.queues) if args.queues else None
triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
@@ -82,7 +98,9 @@ def triggerer(args):
from airflow.cli.hot_reload import run_with_reloader
run_with_reloader(
- lambda: triggerer_run(args.skip_serve_logs, args.capacity,
triggerer_heartrate, queues),
+ lambda: triggerer_run(
+ args.skip_serve_logs, args.capacity, triggerer_heartrate,
queues, team_name
+ ),
process_name="triggerer",
)
return
@@ -90,6 +108,8 @@ def triggerer(args):
run_command_with_daemon_option(
args=args,
process_name="triggerer",
- callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity,
triggerer_heartrate, queues),
+ callback=lambda: triggerer_run(
+ args.skip_serve_logs, args.capacity, triggerer_heartrate, queues,
team_name
+ ),
should_setup_logging=True,
)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index c73dfd857bd..14c522d9c29 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -182,6 +182,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
job: Job,
capacity=None,
queues: set[str] | None = None,
+ team_name: str | None = None,
):
super().__init__(job)
if capacity is None:
@@ -191,6 +192,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
else:
raise ValueError(f"Capacity number {capacity!r} is invalid")
self.queues = queues
+ self.team_name = team_name
def register_signals(self) -> None:
"""Register signals that stop child processes."""
@@ -241,6 +243,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
capacity=self.capacity,
logger=log,
queues=self.queues,
+ team_name=self.team_name,
)
# Run the main DB comms loop in this process
@@ -428,6 +431,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
job: Job | None = None
capacity: int
queues: set[str] | None = None
+ team_name: str | None = None
health_check_threshold = conf.getint("triggerer",
"triggerer_health_check_threshold")
runner_health_check_threshold = conf.getfloat("triggerer",
"runner_health_check_threshold")
diff --git a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
index fb8f6b347d3..0cc9a10b52a 100644
--- a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
@@ -51,7 +51,9 @@ class TestTriggererCommand:
triggerer_command.triggerer(args)
mock_serve.return_value.__enter__.assert_called_once()
mock_serve.return_value.__exit__.assert_called_once()
- mock_triggerer_job_runner.assert_called_once_with(job=mock.ANY,
capacity=42, queues=None)
+ mock_triggerer_job_runner.assert_called_once_with(
+ job=mock.ANY, capacity=42, queues=None, team_name=None
+ )
@conf_vars({("triggerer", "queues_enabled"): "True"})
@mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
@@ -64,7 +66,7 @@ class TestTriggererCommand:
mock_serve.return_value.__enter__.assert_called_once()
mock_serve.return_value.__exit__.assert_called_once()
mock_triggerer_job_runner.assert_called_once_with(
- job=mock.ANY, capacity=4, queues=set(["my_queue", "other_queue"])
+ job=mock.ANY, capacity=4, queues=set(["my_queue", "other_queue"]),
team_name=None
)
@mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
@@ -92,3 +94,48 @@ class TestTriggererCommand:
mock_reloader.assert_called_once()
# The callback function should be callable
assert callable(mock_reloader.call_args[0][0])
+
+ @conf_vars({("core", "multi_team"): "False"})
+ def test_team_name_rejected_when_multi_team_disabled(self):
+ """--team-name should raise when core.multi_team is disabled"""
+ from airflow.exceptions import AirflowConfigException
+
+ args = self.parser.parse_args(["triggerer", "--team-name", "team_a"])
+ with pytest.raises(AirflowConfigException,
match="--team-name.*core.multi_team"):
+ triggerer_command.triggerer(args)
+
+ @conf_vars({("core", "multi_team"): "True"})
+ @mock.patch("airflow.models.team.Team.get_name_if_exists",
return_value=None)
+ def test_team_name_rejected_when_team_does_not_exist(self, mock_get_team):
+ """--team-name should raise when the specified team doesn't exist in
DB"""
+ from airflow.exceptions import AirflowConfigException
+
+ args = self.parser.parse_args(["triggerer", "--team-name",
"nonexistent_team"])
+ with pytest.raises(AirflowConfigException, match="does not exist"):
+ triggerer_command.triggerer(args)
+ mock_get_team.assert_called_once_with("nonexistent_team")
+
+ @conf_vars({("core", "multi_team"): "True"})
+ @mock.patch("airflow.models.team.Team.get_name_if_exists",
return_value="team_a")
+ @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
+ @mock.patch("airflow.cli.commands.triggerer_command._serve_logs")
+ def test_team_name_passed_through(self, mock_serve,
mock_triggerer_job_runner, mock_get_team):
+ """--team-name should be passed to TriggererJobRunner when valid"""
+ mock_triggerer_job_runner.return_value.job_type = "TriggererJob"
+ args = self.parser.parse_args(["triggerer", "--team-name", "team_a"])
+ triggerer_command.triggerer(args)
+ mock_triggerer_job_runner.assert_called_once_with(
+ job=mock.ANY, capacity=mock.ANY, queues=None, team_name="team_a"
+ )
+
+ @conf_vars({("core", "multi_team"): "False"})
+ @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
+ @mock.patch("airflow.cli.commands.triggerer_command._serve_logs")
+ def test_no_team_name_passes_none(self, mock_serve,
mock_triggerer_job_runner):
+ """Without --team-name, team_name=None is passed"""
+ mock_triggerer_job_runner.return_value.job_type = "TriggererJob"
+ args = self.parser.parse_args(["triggerer"])
+ triggerer_command.triggerer(args)
+ mock_triggerer_job_runner.assert_called_once_with(
+ job=mock.ANY, capacity=mock.ANY, queues=None, team_name=None
+ )
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 73643ab3867..3d3a399c121 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -200,6 +200,14 @@ def test_capacity_decode():
TriggererJobRunner(job=job, capacity=input_str)
[email protected]("team_name", ["team_a", None])
+def test_triggerer_job_runner_stores_team_name(team_name):
+ """TriggererJobRunner stores team_name as-is (validated at CLI layer)."""
+ job = Job()
+ runner = TriggererJobRunner(job, capacity=10, team_name=team_name)
+ assert runner.team_name == team_name
+
+
@pytest.fixture
def supervisor_builder(mocker, session):
def builder(job=None):
@@ -236,6 +244,42 @@ def supervisor_builder(mocker, session):
return builder
+def test_supervisor_stores_team_name(supervisor_builder, mocker, session):
+ """TriggerRunnerSupervisor stores team_name field."""
+ job = Job()
+ session.add(job)
+ session.flush()
+
+ import psutil
+
+ process = mocker.Mock(spec=psutil.Process, pid=99)
+ mock_stdin = mocker.Mock(spec=socket)
+
+ proc = TriggerRunnerSupervisor(
+ process_log=mocker.Mock(spec=FilteringBoundLogger),
+ id=job.id,
+ job=job,
+ pid=process.pid,
+ stdin=mock_stdin,
+ process=process,
+ capacity=10,
+ team_name="team_x",
+ )
+ assert proc.team_name == "team_x"
+
+ proc_global = TriggerRunnerSupervisor(
+ process_log=mocker.Mock(spec=FilteringBoundLogger),
+ id=job.id,
+ job=job,
+ pid=process.pid,
+ stdin=mock_stdin,
+ process=process,
+ capacity=10,
+ team_name=None,
+ )
+ assert proc_global.team_name is None
+
+
def test_run_invokes_seams_in_order(supervisor_builder, mocker):
"""run() enters run_context, drives run_once while not should_stop, then
exits run_context."""
from contextlib import contextmanager