This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 87b279b301c Update executor loader cache (#55469)
87b279b301c is described below

commit 87b279b301ccd05afe7a86082e4b6a60bebea5b4
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Sep 12 09:52:04 2025 -0700

    Update executor loader cache (#55469)
    
    The executor loader caches the values it reads from config in several
    ways to make lookups very fast. Now with teams, we may have multiple
    instances of the same executor so the caches must now be per-team.
    
    Also disallow having more than one list of executors for each team.
    
    Add tests to cover more of these edge cases and to harden test coverage
    of the multi-team usecase in general.
---
 .../src/airflow/executors/executor_loader.py       |  63 +++++---
 airflow-core/tests/integration/otel/test_otel.py   |   4 +-
 .../tests/unit/executors/test_executor_loader.py   | 165 +++++++++++++++++++++
 .../src/tests_common/test_utils/executor_loader.py |  15 +-
 4 files changed, 221 insertions(+), 26 deletions(-)

diff --git a/airflow-core/src/airflow/executors/executor_loader.py 
b/airflow-core/src/airflow/executors/executor_loader.py
index 49c40e94389..e13a0b80f3f 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 
 import logging
 import os
+from collections import defaultdict
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowConfigException, UnknownExecutorException
@@ -41,11 +42,11 @@ if TYPE_CHECKING:
 
 # Used to lookup an ExecutorName via a string alias or module path. An
 # executor may have both so we need two lookup dicts.
-_alias_to_executors: dict[str, ExecutorName] = {}
-_module_to_executors: dict[str, ExecutorName] = {}
+_alias_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = 
defaultdict(dict)
+_module_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = 
defaultdict(dict)
+_classname_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = 
defaultdict(dict)
 # Used to lookup an ExecutorName via the team id.
-_team_name_to_executors: dict[str | None, ExecutorName] = {}
-_classname_to_executors: dict[str, ExecutorName] = {}
+_team_name_to_executors: dict[str | None, list[ExecutorName]] = 
defaultdict(list)
 # Used to cache the computed ExecutorNames so that we don't need to read/parse 
config more than once
 _executor_names: list[ExecutorName] = []
 
@@ -125,13 +126,14 @@ class ExecutorLoader:
         for executor_name in executor_names:
             # Executors will not always have aliases
             if executor_name.alias:
-                _alias_to_executors[executor_name.alias] = executor_name
-            # All executors will have a team id. It _may_ be None, for now 
that means it is a system
-            # level executor
-            _team_name_to_executors[executor_name.team_name] = executor_name
+                
_alias_to_executors_per_team[executor_name.team_name][executor_name.alias] = 
executor_name
+            # All executors will have a team name. It _may_ be None, for now 
that means it is a system level executor
+            
_team_name_to_executors[executor_name.team_name].append(executor_name)
             # All executors will have a module path
-            _module_to_executors[executor_name.module_path] = executor_name
-            _classname_to_executors[executor_name.module_path.split(".")[-1]] 
= executor_name
+            
_module_to_executors_per_team[executor_name.team_name][executor_name.module_path]
 = executor_name
+            _classname_to_executors_per_team[executor_name.team_name][
+                executor_name.module_path.split(".")[-1]
+            ] = executor_name
             # Cache the executor names, so the logic of this method only runs 
once
             _executor_names.append(executor_name)
 
@@ -167,6 +169,8 @@ class ExecutorLoader:
                 "The 'executor' key in the 'core' section of the configuration 
is mandatory and cannot be empty"
             )
         configs: list[tuple[str | None, list[str]]] = []
+        seen_teams: set[str | None] = set()
+
         # The executor_config can look like a few things. One is just a single 
executor name, such as
         # "CeleryExecutor". Or a list of executors, such as 
"CeleryExecutor,KubernetesExecutor,module.path.to.executor".
         # In these cases these are all executors that are available to all 
teams, with the first one being the
@@ -178,13 +182,23 @@ class ExecutorLoader:
             # The first item in the list may not have a team id (either empty 
string before the equal
             # sign or no equal sign at all), which means it is a global 
executor config.
             if "=" not in team_executor_config or 
team_executor_config.startswith("="):
-                team_executor_config = team_executor_config.strip("=")
-                # Split by comma to get the individual executor names and 
strip spaces off of them
-                configs.append((None, [name.strip() for name in 
team_executor_config.split(",")]))
+                team_name = None
+                executor_names = team_executor_config.strip("=")
             else:
                 cls.block_use_of_multi_team()
                 team_name, executor_names = team_executor_config.split("=")
-                configs.append((team_name, [name.strip() for name in 
executor_names.split(",")]))
+
+            # Check for duplicate team names
+            if team_name in seen_teams:
+                raise AirflowConfigException(
+                    f"Team '{team_name}' appears more than once in executor 
configuration. "
+                    f"Each team can only be specified once in the executor 
config."
+                )
+            seen_teams.add(team_name)
+
+            # Split by comma to get the individual executor names and strip 
spaces off of them
+            configs.append((team_name, [name.strip() for name in 
executor_names.split(",")]))
+
         return configs
 
     @classmethod
@@ -197,14 +211,15 @@ class ExecutorLoader:
         return cls._get_executor_names()
 
     @classmethod
-    def get_default_executor_name(cls) -> ExecutorName:
+    def get_default_executor_name(cls, team_name: str | None = None) -> 
ExecutorName:
         """
         Return the default executor name from Airflow configuration.
 
         :return: executor name from Airflow configuration
         """
+        cls._get_executor_names()
         # The default executor is the first configured executor in the list
-        return cls._get_executor_names()[0]
+        return _team_name_to_executors[team_name][0]
 
     @classmethod
     def get_default_executor(cls) -> BaseExecutor:
@@ -230,17 +245,23 @@ class ExecutorLoader:
         return loaded_executors
 
     @classmethod
-    def lookup_executor_name_by_str(cls, executor_name_str: str) -> 
ExecutorName:
+    def lookup_executor_name_by_str(
+        cls, executor_name_str: str, team_name: str | None = None
+    ) -> ExecutorName:
         # lookup the executor by alias first, if not check if we're given a 
module path
-        if not _classname_to_executors or not _module_to_executors or not 
_alias_to_executors:
+        if (
+            not _classname_to_executors_per_team
+            or not _module_to_executors_per_team
+            or not _alias_to_executors_per_team
+        ):
             # if we haven't loaded the executors yet, such as directly calling 
load_executor
             cls._get_executor_names()
 
-        if executor_name := _alias_to_executors.get(executor_name_str):
+        if executor_name := _alias_to_executors_per_team.get(team_name, 
{}).get(executor_name_str):
             return executor_name
-        if executor_name := _module_to_executors.get(executor_name_str):
+        if executor_name := _module_to_executors_per_team.get(team_name, 
{}).get(executor_name_str):
             return executor_name
-        if executor_name := _classname_to_executors.get(executor_name_str):
+        if executor_name := _classname_to_executors_per_team.get(team_name, 
{}).get(executor_name_str):
             return executor_name
         raise UnknownExecutorException(f"Unknown executor being loaded: 
{executor_name_str}")
 
diff --git a/airflow-core/tests/integration/otel/test_otel.py 
b/airflow-core/tests/integration/otel/test_otel.py
index e218820b292..4dd9874f736 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -709,7 +709,9 @@ class TestOtelIntegration:
             
module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
             alias="CeleryExecutor",
         )
-        monkeypatch.setattr(executor_loader, "_alias_to_executors", 
{"CeleryExecutor": executor_name})
+        monkeypatch.setattr(
+            executor_loader, "_alias_to_executors_per_team", {None: 
{"CeleryExecutor": executor_name}}
+        )
 
     @pytest.fixture(autouse=True)
     def cleanup_control_file_if_needed(self):
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py 
b/airflow-core/tests/unit/executors/test_executor_loader.py
index d3495752880..30fb90d0138 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -395,3 +395,168 @@ class TestExecutorLoader:
                     
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
                     AwsEcsExecutor,
                 )
+
+    def test_get_executor_names_set_module_variables(self):
+        with conf_vars(
+            {
+                (
+                    "core",
+                    "executor",
+                ): 
"=CeleryExecutor,LocalExecutor,fake_exec:unit.executors.test_executor_loader.FakeExecutor;team_a=CeleryExecutor,unit.executors.test_executor_loader.FakeExecutor;team_b=fake_exec:unit.executors.test_executor_loader.FakeExecutor"
+            }
+        ):
+            celery_path = 
"airflow.providers.celery.executors.celery_executor.CeleryExecutor"
+            local_path = "airflow.executors.local_executor.LocalExecutor"
+            fake_exec_path = "unit.executors.test_executor_loader.FakeExecutor"
+            celery_global = ExecutorName(module_path=celery_path, 
alias="CeleryExecutor", team_name=None)
+            local_global = ExecutorName(module_path=local_path, 
alias="LocalExecutor", team_name=None)
+            fake_global = ExecutorName(module_path=fake_exec_path, 
alias="fake_exec", team_name=None)
+            team_a_celery = ExecutorName(
+                module_path=celery_path,
+                alias="CeleryExecutor",
+                team_name="team_a",
+            )
+            team_a_fake = ExecutorName(
+                module_path=fake_exec_path,
+                team_name="team_a",
+            )
+            team_b_fake = ExecutorName(
+                module_path=fake_exec_path,
+                alias="fake_exec",
+                team_name="team_b",
+            )
+            assert executor_loader._executor_names == []
+            assert executor_loader._alias_to_executors_per_team == {}
+            assert executor_loader._module_to_executors_per_team == {}
+            assert executor_loader._classname_to_executors_per_team == {}
+            assert executor_loader._team_name_to_executors == {}
+            with mock.patch.object(executor_loader.ExecutorLoader, 
"block_use_of_multi_team"):
+                executor_loader.ExecutorLoader._get_executor_names()
+            assert executor_loader._executor_names == [
+                celery_global,
+                local_global,
+                fake_global,
+                team_a_celery,
+                team_a_fake,
+                team_b_fake,
+            ]
+            assert executor_loader._alias_to_executors_per_team == {
+                None: {
+                    "CeleryExecutor": celery_global,
+                    "LocalExecutor": local_global,
+                    "fake_exec": fake_global,
+                },
+                "team_a": {"CeleryExecutor": team_a_celery},
+                "team_b": {"fake_exec": team_b_fake},
+            }
+            assert executor_loader._module_to_executors_per_team == {
+                None: {
+                    celery_path: celery_global,
+                    local_path: local_global,
+                    fake_exec_path: fake_global,
+                },
+                "team_a": {
+                    celery_path: team_a_celery,
+                    fake_exec_path: team_a_fake,
+                },
+                "team_b": {
+                    fake_exec_path: team_b_fake,
+                },
+            }
+            assert executor_loader._classname_to_executors_per_team == {
+                None: {
+                    "CeleryExecutor": celery_global,
+                    "LocalExecutor": local_global,
+                    "FakeExecutor": fake_global,
+                },
+                "team_a": {
+                    "CeleryExecutor": team_a_celery,
+                    "FakeExecutor": team_a_fake,
+                },
+                "team_b": {
+                    "FakeExecutor": team_b_fake,
+                },
+            }
+            assert executor_loader._team_name_to_executors == {
+                None: [celery_global, local_global, fake_global],
+                "team_a": [team_a_celery, team_a_fake],
+                "team_b": [team_b_fake],
+            }
+
+    @pytest.mark.parametrize(
+        "executor_config",
+        [
+            "team1=CeleryExecutor;team1=LocalExecutor",
+            
"team1=CeleryExecutor;team2=LocalExecutor;team1=KubernetesExecutor",
+            "CeleryExecutor;team1=LocalExecutor;team1=KubernetesExecutor",
+            
"team_a=CeleryExecutor;team_b=LocalExecutor;team_a=KubernetesExecutor",
+        ],
+    )
+    def test_duplicate_team_names_should_fail(self, executor_config):
+        """Test that duplicate team names in executor configuration raise an 
exception."""
+        with mock.patch.object(executor_loader.ExecutorLoader, 
"block_use_of_multi_team"):
+            with conf_vars({("core", "executor"): executor_config}):
+                with pytest.raises(
+                    AirflowConfigException,
+                    match=r"Team '.+' appears more than once in executor 
configuration",
+                ):
+                    executor_loader.ExecutorLoader._get_team_executor_configs()
+
+    @pytest.mark.parametrize(
+        "executor_config",
+        [
+            "CeleryExecutor;LocalExecutor",  # Two separate global teams
+            "CeleryExecutor;KubernetesExecutor;LocalExecutor",  # Three 
separate global teams
+            "=CeleryExecutor;LocalExecutor",  # Explicit global team followed 
by another global team
+            "CeleryExecutor;=LocalExecutor",  # Global team followed by 
explicit global team
+        ],
+    )
+    def test_multiple_global_team_specifications_should_fail(self, 
executor_config):
+        """Test that multiple global team specifications raise an exception.
+
+        Only one global team specification should be allowed (comma-delimited 
executors),
+        not multiple semicolon-separated global teams.
+        """
+        with conf_vars({("core", "executor"): executor_config}):
+            with pytest.raises(
+                AirflowConfigException, match=r"Team 'None' appears more than 
once in executor configuration"
+            ):
+                executor_loader.ExecutorLoader._get_team_executor_configs()
+
+    def test_valid_team_configurations_order_preservation(self):
+        """Test that valid team configurations preserve order and work 
correctly."""
+        executor_config = 
"LocalExecutor;team1=CeleryExecutor,KubernetesExecutor;team2=LocalExecutor"
+        expected_configs = [
+            (None, ["LocalExecutor"]),
+            ("team1", ["CeleryExecutor", "KubernetesExecutor"]),
+            ("team2", ["LocalExecutor"]),
+        ]
+
+        with mock.patch.object(executor_loader.ExecutorLoader, 
"block_use_of_multi_team"):
+            with conf_vars({("core", "executor"): executor_config}):
+                configs = 
executor_loader.ExecutorLoader._get_team_executor_configs()
+                assert configs == expected_configs
+
+    @pytest.mark.parametrize(
+        ("executor_config", "expected_configs"),
+        [
+            # Single global team with one executor
+            ("CeleryExecutor", [(None, ["CeleryExecutor"])]),
+            # Single global team with multiple comma-delimited executors
+            ("CeleryExecutor,LocalExecutor", [(None, ["CeleryExecutor", 
"LocalExecutor"])]),
+            (
+                "CeleryExecutor,LocalExecutor,KubernetesExecutor",
+                [(None, ["CeleryExecutor", "LocalExecutor", 
"KubernetesExecutor"])],
+            ),
+            # Single global team with explicit = prefix
+            ("=CeleryExecutor,LocalExecutor", [(None, ["CeleryExecutor", 
"LocalExecutor"])]),
+        ],
+    )
+    def test_single_global_team_configurations_work(self, executor_config, 
expected_configs):
+        """Test that single global team configurations work correctly.
+
+        A single global team can have multiple executors specified as 
comma-delimited list.
+        """
+        with conf_vars({("core", "executor"): executor_config}):
+            configs = 
executor_loader.ExecutorLoader._get_team_executor_configs()
+            assert configs == expected_configs
diff --git a/devel-common/src/tests_common/test_utils/executor_loader.py 
b/devel-common/src/tests_common/test_utils/executor_loader.py
index 6374b381e1d..69466ba1103 100644
--- a/devel-common/src/tests_common/test_utils/executor_loader.py
+++ b/devel-common/src/tests_common/test_utils/executor_loader.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from collections import defaultdict
 from typing import TYPE_CHECKING
 
 import airflow.executors.executor_loader as executor_loader
@@ -27,8 +28,14 @@ if TYPE_CHECKING:
 
 def clean_executor_loader_module():
     """Clean the executor_loader state, as it stores global variables in the 
module, causing side effects for some tests."""
-    executor_loader._alias_to_executors: dict[str, ExecutorName] = {}
-    executor_loader._module_to_executors: dict[str, ExecutorName] = {}
-    executor_loader._team_name_to_executors: dict[str | None, ExecutorName] = 
{}
-    executor_loader._classname_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._alias_to_executors_per_team: dict[str | None, dict[str, 
ExecutorName]] = defaultdict(
+        dict
+    )
+    executor_loader._module_to_executors_per_team: dict[str | None, dict[str, 
ExecutorName]] = defaultdict(
+        dict
+    )
+    executor_loader._classname_to_executors_per_team: dict[str | None, 
dict[str, ExecutorName]] = defaultdict(
+        dict
+    )
+    executor_loader._team_name_to_executors: dict[str | None, 
list[ExecutorName]] = defaultdict(list)
     executor_loader._executor_names: list[ExecutorName] = []

Reply via email to