This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a11ec7296ce Assert executors support multi team (#60843)
a11ec7296ce is described below

commit a11ec7296ce52ae3693e46835a49d8c5e3677341
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jan 21 10:02:27 2026 -0800

    Assert executors support multi team (#60843)
    
    If the user is trying to configure a particular executor for a multi
    team use, ensure the executor itself supports this.
    
    Includes changes and tests in core airflow and also the two executors
    that support multi team so far.
---
 .../src/airflow/executors/base_executor.py         |  1 +
 .../src/airflow/executors/executor_loader.py       |  7 +++
 .../src/airflow/executors/local_executor.py        |  1 +
 .../tests/unit/executors/test_base_executor.py     |  4 ++
 .../tests/unit/executors/test_executor_loader.py   | 71 ++++++++++++++++++++++
 .../tests/unit/executors/test_local_executor.py    |  3 +
 .../amazon/aws/executors/ecs/ecs_executor.py       |  2 +
 .../amazon/aws/executors/ecs/test_ecs_executor.py  |  3 +
 8 files changed, 92 insertions(+)

diff --git a/airflow-core/src/airflow/executors/base_executor.py 
b/airflow-core/src/airflow/executors/base_executor.py
index 64fa4756267..a54e1464c24 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -143,6 +143,7 @@ class BaseExecutor(LoggingMixin):
     active_spans = ThreadSafeDict()
 
     supports_ad_hoc_ti_run: bool = False
+    supports_multi_team: bool = False
     sentry_integration: str = ""
 
     is_local: bool = False
diff --git a/airflow-core/src/airflow/executors/executor_loader.py 
b/airflow-core/src/airflow/executors/executor_loader.py
index 6b74c0a416e..c7c2e18d8d1 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -360,6 +360,13 @@ class ExecutorLoader:
             executor_cls, import_source = 
cls.import_executor_cls(_executor_name)
             log.debug("Loading executor %s from %s", _executor_name, 
import_source.value)
             if _executor_name.team_name:
+                # Validate that team executors support multi-team functionality
+                if not executor_cls.supports_multi_team:
+                    raise AirflowConfigException(
+                        f"Executor {_executor_name.module_path} does not 
support multi-team functionality "
+                        f"but was configured for team 
'{_executor_name.team_name}'. "
+                        f"Only executors with supports_multi_team=True can be 
used as team executors."
+                    )
                 executor = executor_cls(team_name=_executor_name.team_name)
             else:
                 executor = executor_cls()
diff --git a/airflow-core/src/airflow/executors/local_executor.py 
b/airflow-core/src/airflow/executors/local_executor.py
index 07248861323..604de7c7f00 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -153,6 +153,7 @@ class LocalExecutor(BaseExecutor):
     is_local: bool = True
     is_mp_using_fork: bool = multiprocessing.get_start_method() == "fork"
 
+    supports_multi_team: bool = True
     serve_logs: bool = True
 
     activity_queue: SimpleQueue[workloads.All | None]
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py 
b/airflow-core/tests/unit/executors/test_base_executor.py
index 96c84bee1df..5c2a3d6d549 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -54,6 +54,10 @@ def test_is_production_default_value():
     assert BaseExecutor.is_production
 
 
+def test_supports_multi_team_default_value():
+    assert not BaseExecutor.supports_multi_team
+
+
 def test_invalid_slotspool():
     with pytest.raises(ValueError, match="parallelism is set to 0 or lower"):
         BaseExecutor(0)
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py 
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 1d75286c2f8..3c88729d394 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -684,3 +684,74 @@ class TestExecutorLoader:
                     match=r"Global executors must be specified before 
team-based executors",
                 ):
                     executor_loader.ExecutorLoader._get_team_executor_configs()
+
+    def test_team_executor_with_multi_team_support_loads_successfully(self):
+        """Test that executors with supports_multi_team=True load successfully 
for teams."""
+        with conf_vars({("core", "executor"): "LocalExecutor"}):
+            executor = executor_loader.ExecutorLoader.load_executor(
+                ExecutorName(
+                    
module_path="airflow.executors.local_executor.LocalExecutor",
+                    alias="LocalExecutor",
+                    team_name="team_a",
+                )
+            )
+            assert executor.team_name == "team_a"
+            assert executor.supports_multi_team is True
+
+    def test_team_executor_without_multi_team_support_fails(self):
+        """Test that executors without supports_multi_team fail when 
configured for teams."""
+        # Mock the executor class to have supports_multi_team = False
+        with (
+            conf_vars({("core", "executor"): "LocalExecutor"}),
+            mock.patch("airflow.executors.local_executor.LocalExecutor") as 
mock_executor_cls,
+        ):
+            # Set the class attribute to False
+            mock_executor_cls.supports_multi_team = False
+            mock_executor_cls.__name__ = "LocalExecutor"
+
+            with pytest.raises(
+                AirflowConfigException,
+                match=r".*does not support multi-team functionality.*",
+            ):
+                executor_loader.ExecutorLoader.load_executor(
+                    ExecutorName(
+                        
module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="LocalExecutor",
+                        team_name="team_a",
+                    )
+                )
+
+    def test_global_executor_works_regardless_of_multi_team_support(self):
+        """Test that global executors work regardless of supports_multi_team 
value."""
+        # Test with an executor that supports multi-team (LocalExecutor)
+        with conf_vars({("core", "executor"): "LocalExecutor"}):
+            executor = executor_loader.ExecutorLoader.load_executor(
+                ExecutorName(
+                    
module_path="airflow.executors.local_executor.LocalExecutor",
+                    alias="LocalExecutor",
+                    team_name=None,
+                )
+            )
+            assert executor.team_name is None
+            assert executor.supports_multi_team is True
+
+        # Test with a mocked executor that doesn't support multi-team
+        with (
+            conf_vars({("core", "executor"): "LocalExecutor"}),
+            mock.patch("airflow.executors.local_executor.LocalExecutor") as 
mock_executor_cls,
+        ):
+            mock_executor_cls.supports_multi_team = False
+            mock_executor_instance = mock.MagicMock()
+            mock_executor_instance.supports_multi_team = False
+            mock_executor_instance.team_name = None
+            mock_executor_cls.return_value = mock_executor_instance
+
+            executor = executor_loader.ExecutorLoader.load_executor(
+                ExecutorName(
+                    
module_path="airflow.executors.local_executor.LocalExecutor",
+                    alias="LocalExecutor",
+                    team_name=None,
+                )
+            )
+            assert executor.team_name is None
+            assert executor.supports_multi_team is False
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py 
b/airflow-core/tests/unit/executors/test_local_executor.py
index 078b8ded3d9..fb93a699fd5 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -58,6 +58,9 @@ class TestLocalExecutor:
     def test_is_local_default_value(self):
         assert LocalExecutor.is_local
 
+    def test_supports_multi_team(self):
+        assert LocalExecutor.supports_multi_team
+
     def test_serve_logs_default_value(self):
         assert LocalExecutor.serve_logs
 
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index 6f209bb329c..3605c803cd0 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -90,6 +90,8 @@ class AwsEcsExecutor(BaseExecutor):
      Airflow TaskInstance's executor_config.
     """
 
+    supports_multi_team: bool = True
+
     # AWS limits the maximum number of ARNs in the describe_tasks function.
     DESCRIBE_TASKS_BATCH_SIZE = 99
 
diff --git 
a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py 
b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
index e34832b6ffa..04e7d2bb822 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -378,6 +378,9 @@ class TestEcsExecutorTask:
 class TestAwsEcsExecutor:
     """Tests the AWS ECS Executor."""
 
+    def test_supports_multi_team(self):
+        assert AwsEcsExecutor.supports_multi_team
+
     
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state")
     def test_execute(self, change_state_mock, mock_airflow_key, mock_executor, 
mock_cmd):
         """Test execution from end-to-end."""

Reply via email to