This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 762ad78d7d8 Remove block_use_of_multi_team for 3.2 (#62330)
762ad78d7d8 is described below
commit 762ad78d7d89beb9786a5cabad9f7cb9d42c880a
Author: Niko Oliveira <[email protected]>
AuthorDate: Mon Feb 23 06:27:55 2026 -0800
Remove block_use_of_multi_team for 3.2 (#62330)
We're going to do an experimental/partial release of multi-team for
Airflow 3.2 and thus the feature blocker must be removed.
---
.../src/airflow/executors/executor_loader.py | 16 ------
.../tests/unit/executors/test_executor_loader.py | 64 +++++-----------------
2 files changed, 14 insertions(+), 66 deletions(-)
diff --git a/airflow-core/src/airflow/executors/executor_loader.py
b/airflow-core/src/airflow/executors/executor_loader.py
index c7c2e18d8d1..6a86426f210 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -18,7 +18,6 @@
from __future__ import annotations
-import os
from collections import defaultdict
from typing import TYPE_CHECKING
@@ -146,20 +145,6 @@ class ExecutorLoader:
return executor_names
- @classmethod
- def block_use_of_multi_team(cls):
- """
- Raise an exception if the user tries to use multiple team based
executors.
-
- Before the feature is complete we do not want users to accidentally
configure this.
- This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE
environment
- variable to "enabled"
- This check is built into a method so that it can be easily mocked in
unit tests.
- """
- team_dev_mode: str | None =
os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE")
- if not team_dev_mode or team_dev_mode != "enabled":
- raise AirflowConfigException("Configuring multiple team based
executors is not yet supported!")
-
@classmethod
def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
"""
@@ -218,7 +203,6 @@ class ExecutorLoader:
team_name = None
executor_names = team_executor_config.strip("=")
else:
- cls.block_use_of_multi_team()
if conf.getboolean("core", "multi_team", fallback=False):
team_name, executor_names = team_executor_config.split("=")
else:
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 3c88729d394..6c2116836d7 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -234,24 +234,13 @@ class TestExecutorLoader:
],
)
def test_get_hybrid_executors_from_configs(self, executor_config,
expected_executors_list):
- # Mock the blocking method for tests that involve actual team
configurations
with (
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
):
with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
executors =
executor_loader.ExecutorLoader._get_executor_names()
assert executors == expected_executors_list
- def test_get_multi_team_executors_from_config_blocked_by_default(self):
- """By default the use of multiple team based executors is blocked for
now."""
- with conf_vars({("core", "executor"):
"=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor"}):
- with pytest.raises(
- AirflowConfigException,
- match=r".*Configuring multiple team based executors is not yet
supported!.*",
- ):
- executor_loader.ExecutorLoader._get_executor_names()
-
def test_init_executors(self):
from airflow.providers.celery.executors.celery_executor import
CeleryExecutor
@@ -423,10 +412,7 @@ class TestExecutorLoader:
assert executor_loader._module_to_executors_per_team == {}
assert executor_loader._classname_to_executors_per_team == {}
assert executor_loader._team_name_to_executors == {}
- with (
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
- ):
+ with mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"):
executor_loader.ExecutorLoader._get_executor_names()
assert executor_loader._executor_names == [
celery_global,
@@ -488,13 +474,12 @@ class TestExecutorLoader:
)
def test_duplicate_team_names_should_fail(self, executor_config):
"""Test that duplicate team names in executor configuration raise an
exception."""
- with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
- with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
- with pytest.raises(
- AirflowConfigException,
- match=r"Team '.+' appears more than once in executor
configuration",
- ):
- executor_loader.ExecutorLoader._get_team_executor_configs()
+ with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
+ with pytest.raises(
+ AirflowConfigException,
+ match=r"Team '.+' appears more than once in executor
configuration",
+ ):
+ executor_loader.ExecutorLoader._get_team_executor_configs()
@pytest.mark.parametrize(
"executor_config",
@@ -526,10 +511,7 @@ class TestExecutorLoader:
("team2", ["LocalExecutor"]),
]
- with (
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
- ):
+ with mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"):
with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
configs =
executor_loader.ExecutorLoader._get_team_executor_configs()
assert configs == expected_configs
@@ -569,10 +551,7 @@ class TestExecutorLoader:
)
def test_team_only_configurations_should_fail(self, executor_config):
"""Test that configurations with only team-based executors fail
validation."""
- with (
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- conf_vars({("core", "executor"): executor_config}),
- ):
+ with conf_vars({("core", "executor"): executor_config}):
with pytest.raises(
AirflowConfigException, match=r"At least one global executor
must be configured"
):
@@ -582,10 +561,7 @@ class TestExecutorLoader:
"""Test that executor config with valid teams loads successfully."""
mock_team_names = {"team_a", "team_b"}
- with (
- patch.object(executor_loader.Team, "get_all_team_names",
return_value=mock_team_names),
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- ):
+ with patch.object(executor_loader.Team, "get_all_team_names",
return_value=mock_team_names):
with conf_vars(
{
("core", "executor"):
"=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor",
@@ -603,10 +579,7 @@ class TestExecutorLoader:
"""Test that executor config with invalid teams fails with clear
error."""
mock_team_names = {"team_a"} # team_b and team_c are missing
- with (
- patch.object(executor_loader.Team, "get_all_team_names",
return_value=mock_team_names),
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- ):
+ with patch.object(executor_loader.Team, "get_all_team_names",
return_value=mock_team_names):
with conf_vars(
{
(
@@ -633,10 +606,7 @@ class TestExecutorLoader:
def test_get_executor_names_skip_team_validation(self):
"""Test that get_executor_names can skip team validation."""
- with (
- patch.object(executor_loader.Team, "get_all_team_names") as
mock_get_team_names,
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- ):
+ with patch.object(executor_loader.Team, "get_all_team_names") as
mock_get_team_names:
with conf_vars(
{("core", "executor"): "=CeleryExecutor;team_a=LocalExecutor",
("core", "multi_team"): "True"}
):
@@ -646,10 +616,7 @@ class TestExecutorLoader:
def test_get_executor_names_default_validates_teams(self):
"""Test that get_executor_names validates teams by default."""
- with (
- patch.object(executor_loader.Team, "get_all_team_names") as
mock_get_team_names,
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- ):
+ with patch.object(executor_loader.Team, "get_all_team_names") as
mock_get_team_names:
with conf_vars(
{("core", "executor"): "=CeleryExecutor;team_a=LocalExecutor",
("core", "multi_team"): "True"}
):
@@ -674,10 +641,7 @@ class TestExecutorLoader:
Global executors must always be specified before any team-based
executors.
This ensures the default executor (first in config) is always global.
"""
- with (
- mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
- mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
- ):
+ with mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"):
with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
with pytest.raises(
AirflowConfigException,