This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a11ec7296ce Assert executors support multi team (#60843)
a11ec7296ce is described below
commit a11ec7296ce52ae3693e46835a49d8c5e3677341
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jan 21 10:02:27 2026 -0800
Assert executors support multi team (#60843)
If the user is trying to configure a particular executor for a multi
team use, ensure the executor itself supports this.
Includes changes and tests in core airflow and also the two executors
that support multi team so far.
---
.../src/airflow/executors/base_executor.py | 1 +
.../src/airflow/executors/executor_loader.py | 7 +++
.../src/airflow/executors/local_executor.py | 1 +
.../tests/unit/executors/test_base_executor.py | 4 ++
.../tests/unit/executors/test_executor_loader.py | 71 ++++++++++++++++++++++
.../tests/unit/executors/test_local_executor.py | 3 +
.../amazon/aws/executors/ecs/ecs_executor.py | 2 +
.../amazon/aws/executors/ecs/test_ecs_executor.py | 3 +
8 files changed, 92 insertions(+)
diff --git a/airflow-core/src/airflow/executors/base_executor.py
b/airflow-core/src/airflow/executors/base_executor.py
index 64fa4756267..a54e1464c24 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -143,6 +143,7 @@ class BaseExecutor(LoggingMixin):
active_spans = ThreadSafeDict()
supports_ad_hoc_ti_run: bool = False
+ supports_multi_team: bool = False
sentry_integration: str = ""
is_local: bool = False
diff --git a/airflow-core/src/airflow/executors/executor_loader.py
b/airflow-core/src/airflow/executors/executor_loader.py
index 6b74c0a416e..c7c2e18d8d1 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -360,6 +360,13 @@ class ExecutorLoader:
executor_cls, import_source =
cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name,
import_source.value)
if _executor_name.team_name:
+ # Validate that team executors support multi-team functionality
+ if not executor_cls.supports_multi_team:
+ raise AirflowConfigException(
+ f"Executor {_executor_name.module_path} does not
support multi-team functionality "
+ f"but was configured for team
'{_executor_name.team_name}'. "
+ f"Only executors with supports_multi_team=True can be
used as team executors."
+ )
executor = executor_cls(team_name=_executor_name.team_name)
else:
executor = executor_cls()
diff --git a/airflow-core/src/airflow/executors/local_executor.py
b/airflow-core/src/airflow/executors/local_executor.py
index 07248861323..604de7c7f00 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -153,6 +153,7 @@ class LocalExecutor(BaseExecutor):
is_local: bool = True
is_mp_using_fork: bool = multiprocessing.get_start_method() == "fork"
+ supports_multi_team: bool = True
serve_logs: bool = True
activity_queue: SimpleQueue[workloads.All | None]
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py
b/airflow-core/tests/unit/executors/test_base_executor.py
index 96c84bee1df..5c2a3d6d549 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -54,6 +54,10 @@ def test_is_production_default_value():
assert BaseExecutor.is_production
+def test_supports_multi_team_default_value():
+ assert not BaseExecutor.supports_multi_team
+
+
def test_invalid_slotspool():
with pytest.raises(ValueError, match="parallelism is set to 0 or lower"):
BaseExecutor(0)
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 1d75286c2f8..3c88729d394 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -684,3 +684,74 @@ class TestExecutorLoader:
match=r"Global executors must be specified before
team-based executors",
):
executor_loader.ExecutorLoader._get_team_executor_configs()
+
+ def test_team_executor_with_multi_team_support_loads_successfully(self):
+ """Test that executors with supports_multi_team=True load successfully
for teams."""
+ with conf_vars({("core", "executor"): "LocalExecutor"}):
+ executor = executor_loader.ExecutorLoader.load_executor(
+ ExecutorName(
+
module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="LocalExecutor",
+ team_name="team_a",
+ )
+ )
+ assert executor.team_name == "team_a"
+ assert executor.supports_multi_team is True
+
+ def test_team_executor_without_multi_team_support_fails(self):
+ """Test that executors without supports_multi_team fail when
configured for teams."""
+ # Mock the executor class to have supports_multi_team = False
+ with (
+ conf_vars({("core", "executor"): "LocalExecutor"}),
+ mock.patch("airflow.executors.local_executor.LocalExecutor") as
mock_executor_cls,
+ ):
+ # Set the class attribute to False
+ mock_executor_cls.supports_multi_team = False
+ mock_executor_cls.__name__ = "LocalExecutor"
+
+ with pytest.raises(
+ AirflowConfigException,
+ match=r".*does not support multi-team functionality.*",
+ ):
+ executor_loader.ExecutorLoader.load_executor(
+ ExecutorName(
+
module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="LocalExecutor",
+ team_name="team_a",
+ )
+ )
+
+ def test_global_executor_works_regardless_of_multi_team_support(self):
+ """Test that global executors work regardless of supports_multi_team
value."""
+ # Test with an executor that supports multi-team (LocalExecutor)
+ with conf_vars({("core", "executor"): "LocalExecutor"}):
+ executor = executor_loader.ExecutorLoader.load_executor(
+ ExecutorName(
+
module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="LocalExecutor",
+ team_name=None,
+ )
+ )
+ assert executor.team_name is None
+ assert executor.supports_multi_team is True
+
+ # Test with a mocked executor that doesn't support multi-team
+ with (
+ conf_vars({("core", "executor"): "LocalExecutor"}),
+ mock.patch("airflow.executors.local_executor.LocalExecutor") as
mock_executor_cls,
+ ):
+ mock_executor_cls.supports_multi_team = False
+ mock_executor_instance = mock.MagicMock()
+ mock_executor_instance.supports_multi_team = False
+ mock_executor_instance.team_name = None
+ mock_executor_cls.return_value = mock_executor_instance
+
+ executor = executor_loader.ExecutorLoader.load_executor(
+ ExecutorName(
+
module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="LocalExecutor",
+ team_name=None,
+ )
+ )
+ assert executor.team_name is None
+ assert executor.supports_multi_team is False
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py
b/airflow-core/tests/unit/executors/test_local_executor.py
index 078b8ded3d9..fb93a699fd5 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -58,6 +58,9 @@ class TestLocalExecutor:
def test_is_local_default_value(self):
assert LocalExecutor.is_local
+ def test_supports_multi_team(self):
+ assert LocalExecutor.supports_multi_team
+
def test_serve_logs_default_value(self):
assert LocalExecutor.serve_logs
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index 6f209bb329c..3605c803cd0 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -90,6 +90,8 @@ class AwsEcsExecutor(BaseExecutor):
Airflow TaskInstance's executor_config.
"""
+ supports_multi_team: bool = True
+
# AWS limits the maximum number of ARNs in the describe_tasks function.
DESCRIBE_TASKS_BATCH_SIZE = 99
diff --git
a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
index e34832b6ffa..04e7d2bb822 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -378,6 +378,9 @@ class TestEcsExecutorTask:
class TestAwsEcsExecutor:
"""Tests the AWS ECS Executor."""
+ def test_supports_multi_team(self):
+ assert AwsEcsExecutor.supports_multi_team
+
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state")
def test_execute(self, change_state_mock, mock_airflow_key, mock_executor,
mock_cmd):
"""Test execution from end-to-end."""