This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6901fe09546 Gate multi team executor loader changes with feature flag
(#56740)
6901fe09546 is described below
commit 6901fe09546e560d304cec6ac4b2355e688c4dcb
Author: Niko Oliveira <[email protected]>
AuthorDate: Thu Oct 16 16:13:22 2025 -0700
Gate multi team executor loader changes with feature flag (#56740)
Do not attempt to load executors from other teams unless
core.multi_team=True
---
airflow-core/src/airflow/executors/executor_loader.py | 10 +++++++++-
.../tests/unit/executors/test_executor_loader.py | 17 +++++++++++------
2 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/executors/executor_loader.py
b/airflow-core/src/airflow/executors/executor_loader.py
index 373f8938cf4..af038503469 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -211,7 +211,15 @@ class ExecutorLoader:
executor_names = team_executor_config.strip("=")
else:
cls.block_use_of_multi_team()
- team_name, executor_names = team_executor_config.split("=")
+ if conf.getboolean("core", "multi_team", fallback=False):
+ team_name, executor_names = team_executor_config.split("=")
+ else:
+ log.warning(
+ "The 'multi_team' config is not enabled, but team
executors were configured. "
+ "The following team executor config will be ignored:
%s",
+ team_executor_config,
+ )
+ continue
# Check for duplicate team names
if team_name in seen_teams:
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 740ed70526c..06304876818 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -239,7 +239,7 @@ class TestExecutorLoader:
mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
):
- with conf_vars({("core", "executor"): executor_config}):
+ with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
executors =
executor_loader.ExecutorLoader._get_executor_names()
assert executors == expected_executors_list
@@ -391,10 +391,11 @@ class TestExecutorLoader:
def test_get_executor_names_set_module_variables(self):
with conf_vars(
{
+ ("core", "multi_team"): "True",
(
"core",
"executor",
- ):
"=CeleryExecutor,LocalExecutor,fake_exec:unit.executors.test_executor_loader.FakeExecutor;team_a=CeleryExecutor,unit.executors.test_executor_loader.FakeExecutor;team_b=fake_exec:unit.executors.test_executor_loader.FakeExecutor"
+ ):
"=CeleryExecutor,LocalExecutor,fake_exec:unit.executors.test_executor_loader.FakeExecutor;team_a=CeleryExecutor,unit.executors.test_executor_loader.FakeExecutor;team_b=fake_exec:unit.executors.test_executor_loader.FakeExecutor",
}
):
celery_path =
"airflow.providers.celery.executors.celery_executor.CeleryExecutor"
@@ -488,7 +489,7 @@ class TestExecutorLoader:
def test_duplicate_team_names_should_fail(self, executor_config):
"""Test that duplicate team names in executor configuration raise an
exception."""
with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
- with conf_vars({("core", "executor"): executor_config}):
+ with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
with pytest.raises(
AirflowConfigException,
match=r"Team '.+' appears more than once in executor
configuration",
@@ -529,7 +530,7 @@ class TestExecutorLoader:
mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
):
- with conf_vars({("core", "executor"): executor_config}):
+ with conf_vars({("core", "executor"): executor_config, ("core",
"multi_team"): "True"}):
configs =
executor_loader.ExecutorLoader._get_team_executor_configs()
assert configs == expected_configs
@@ -586,7 +587,10 @@ class TestExecutorLoader:
mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
):
with conf_vars(
- {("core", "executor"):
"=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor"}
+ {
+ ("core", "executor"):
"=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor",
+ ("core", "multi_team"): "True",
+ }
):
configs =
executor_loader.ExecutorLoader._get_team_executor_configs()
@@ -608,7 +612,8 @@ class TestExecutorLoader:
(
"core",
"executor",
- ):
"=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_c=KubernetesExecutor"
+ ):
"=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_c=KubernetesExecutor",
+ ("core", "multi_team"): "True",
}
):
with pytest.raises(AirflowConfigException):