This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eb139711bde Allow executor aliases for "core executors" (#62838)
eb139711bde is described below

commit eb139711bde3e21c8fb44277c768e36b8a58fdec
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Mar 4 14:43:14 2026 -0800

    Allow executor aliases for "core executors" (#62838)
    
    Previously, core executors like "LocalExecutor", "CeleryExecutor", etc.
    could not be given aliases. This was because these executors already had
    a built-in alias: they could be referenced by their short names alone
    (e.g. `AIRFLOW__CORE__EXECUTOR="LocalExecutor"`). However, with
    multi-team, two instances of the same executor can be used across a team
    and globally, e.g.:
    `AIRFLOW__CORE__EXECUTOR="LocalExecutor;team_a=LocalExecutor"`
    In this case Dags from team_a should be able to use either the global or
    team-based executors, but there is no unique way to reference them.
    Setting `executor=LocalExecutor` on a task would always resolve to the
    team-based executor.
    
    To fix this, we now allow core executors to receive aliases. This was a
    "soft" limit before, to reduce complexity, but lifting it now solves this
    problem nicely. It is also backwards compatible since the previous
    "short name" for the core executors was always the same as the
    classname, which is now an acceptable input to the task's `executor`
    field.
    
    Update docs as well for both multi-team and multiple executor config.
---
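The alias rules described in the commit message can be sketched in isolation. The following is a minimal, standalone approximation of the parsing behaviour in this change, not the actual `ExecutorLoader` code: `CORE_EXECUTORS`, `ParsedExecutor`, and `parse_executor_entry` are hypothetical names, team prefixes (`team_a=...`) are not handled, and `ValueError` stands in for `AirflowConfigException`.

```python
from typing import NamedTuple, Optional

# Hypothetical stand-in for Airflow's core executor registry (two entries only).
CORE_EXECUTORS = {
    "LocalExecutor": "airflow.executors.local_executor.LocalExecutor",
    "CeleryExecutor": "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
}


class ParsedExecutor(NamedTuple):
    alias: Optional[str]
    module_path: str


def parse_executor_entry(entry: str) -> ParsedExecutor:
    """Parse one entry of the form 'Name', 'module.path.Class', or 'alias:target'."""
    parts = entry.strip().split(":")
    if len(parts) == 1:
        name = parts[0]
        # A bare core executor name keeps its class name as the alias.
        if name in CORE_EXECUTORS:
            return ParsedExecutor(alias=name, module_path=CORE_EXECUTORS[name])
        # Otherwise it is treated as a module path with no alias.
        return ParsedExecutor(alias=None, module_path=name)
    if len(parts) == 2:
        alias, target = parts
        # New in this change: the target may be a core executor name.
        if target in CORE_EXECUTORS:
            return ParsedExecutor(alias=alias, module_path=CORE_EXECUTORS[target])
        # Anything else must look like a dotted module path.
        if not target or "." not in target:
            raise ValueError(
                f"expected a core executor name or module path but received: {target!r}"
            )
        return ParsedExecutor(alias=alias, module_path=target)
    raise ValueError(f"incorrectly formatted executor configuration: {entry!r}")
```

For instance, `parse_executor_entry("my_local:LocalExecutor")` resolves the alias `my_local` to the `LocalExecutor` module path, while `parse_executor_entry("my_cool_alias:not_a_core_or_module")` raises.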
 airflow-core/docs/core-concepts/executor/index.rst |  34 +++++-
 airflow-core/docs/core-concepts/multi-team.rst     |  60 +++++++++++
 .../src/airflow/executors/executor_loader.py       |  46 +++++---
 .../src/airflow/jobs/scheduler_job_runner.py       |   2 +
 .../tests/unit/executors/test_executor_loader.py   | 116 ++++++++++++++++++++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  31 ++++++
 6 files changed, 266 insertions(+), 23 deletions(-)
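
The scheduler change in this commit lets a task's `executor` value match an executor by alias, by full module path, or by class name (the last component of the module path), and stops at the first acceptable match. A rough standalone sketch of that lookup follows; `ExecutorInfo` and `find_executor` are hypothetical names, and the sketch assumes the first match in iteration order wins. How the real scheduler orders its executors is not visible in this diff.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class ExecutorInfo:
    """Hypothetical, simplified view of a configured executor."""

    alias: Optional[str]
    module_path: str
    team_name: Optional[str] = None  # None means a global executor


def find_executor(
    requested: str,
    executors: Sequence[ExecutorInfo],
    team_name: Optional[str] = None,
) -> Optional[ExecutorInfo]:
    """Return the first executor matching `requested` that the team may use."""
    for ex in executors:
        class_name = ex.module_path.split(".")[-1]
        if requested in (ex.alias, ex.module_path, class_name):
            # The executor must belong to the task's team or be global.
            if (team_name and ex.team_name == team_name) or ex.team_name is None:
                return ex
    return None


LOCAL = "airflow.executors.local_executor.LocalExecutor"
configured = [
    ExecutorInfo(alias="global_exec", module_path=LOCAL),
    ExecutorInfo(alias="team_exec", module_path=LOCAL, team_name="team_a"),
]
```

With this list, `find_executor("team_exec", configured, team_name="team_a")` returns the team executor, whereas the same request without a team returns `None` because the only alias match belongs to `team_a`.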

diff --git a/airflow-core/docs/core-concepts/executor/index.rst b/airflow-core/docs/core-concepts/executor/index.rst
index 3a0d593a49b..0600c6dd5a7 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -145,22 +145,46 @@ Some examples of valid multiple executor configuration:
     executor = KubernetesExecutor,my.custom.module.ExecutorClass
 
 
-.. note::
-    Using two instances of the _same_ executor class is not currently supported.
+Aliases
+"""""""
+
+To make it easier to specify executors on tasks and Dags, executor configuration supports aliases. You may then use this alias to refer to the executor in your Dags (see below).
+
+Aliases work with both custom executor module paths and built-in core executors:
+
+.. code-block:: ini
+
+    [core]
+    executor = LocalExecutor,short_name:my.custom.module.ExecutorClass
+
+.. code-block:: ini
 
-To make it easier to specify executors on tasks and Dags, executor configuration now supports aliases. You may then use this alias to refer to the executor in your Dags (see below).
+    [core]
+    executor = my_local_exec:LocalExecutor,my_celery_exec:CeleryExecutor
+
+Aliasing core executors is particularly useful when the same executor is used at both the global and team level when
+running Multi-Team Airflow, since it allows tasks to explicitly target a specific instance by alias:
 
 .. code-block:: ini
 
     [core]
-    executor = LocalExecutor,ShortName:my.custom.module.ExecutorClass
+    executor = global_celery_exec:CeleryExecutor;team1=team_celery_exec:CeleryExecutor
+
 
 .. note::
-    If a Dag specifies a task to use an executor that is not configured, the Dag will fail to parse and a warning dialog will be shown in the Airflow UI. Please ensure that all executors you wish to use are specified in Airflow configuration on *any* host/container that is running an Airflow component (scheduler, workers, etc).
+    Using two instances of the same Executor class is only supported in multi-team Airflow. For example: Two separate
+    teams can both use the CeleryExecutor but one single team cannot use two instances of the CeleryExecutor. An executor
+    can also be used globally and in teams at the same time. Please see the :doc:`Multi-Team Airflow documentation </core-concepts/multi-team>` for
+    more details on this.
+
 
 Writing Dags and Tasks
 ^^^^^^^^^^^^^^^^^^^^^^
 
+.. note::
+    If a Dag specifies a task to use an executor that is not configured, the Dag will fail to parse and a warning dialog will be shown in the Airflow UI. Please ensure that all executors you wish to use are specified in Airflow configuration on *any* host/container that is running an Airflow component (scheduler, workers, etc).
+
+
 To specify an executor for a task, make use of the executor parameter on Airflow Operators:
 
 .. code-block:: python
diff --git a/airflow-core/docs/core-concepts/multi-team.rst b/airflow-core/docs/core-concepts/multi-team.rst
index 65bc0af245f..0f8a7205673 100644
--- a/airflow-core/docs/core-concepts/multi-team.rst
+++ b/airflow-core/docs/core-concepts/multi-team.rst
@@ -283,6 +283,66 @@ Example configurations:
     # Invalid: Duplicate Executor within a Team
     executor = LocalExecutor;team_a=CeleryExecutor,CeleryExecutor;team_b=LocalExecutor
 
+Aliasing Executors Across Teams
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When the same executor type is used at both the global and team level (e.g., ``LocalExecutor`` globally and
+``LocalExecutor`` for a team), if tasks wish to target the global executor they need a way to distinguish between the
+two instances. To accomplish this, you can assign **aliases** to core executors using the ``Alias:ExecutorName`` syntax:
+
+.. code-block:: ini
+
+    [core]
+    executor = global_celery_exec:CeleryExecutor;team_a=team_celery_exec:CeleryExecutor
+
+With this configuration:
+
+- The global ``CeleryExecutor`` is available via the alias ``global_celery_exec``
+- The team_a ``CeleryExecutor`` is available via the alias ``team_celery_exec``
+- A task in ``team_a`` that sets ``executor="team_celery_exec"``, ``executor="CeleryExecutor"``, or ``executor="airflow.providers.celery.executors.celery_executor.CeleryExecutor"``
+  will run on the **team** executor
+- A task in ``team_a`` that sets ``executor="global_celery_exec"`` will run on the **global** executor
+
+.. code-block:: python
+
+    # Runs on the global CeleryExecutor via alias
+    BashOperator(
+        task_id="uses_global",
+        executor="global_celery_exec",
+        bash_command="echo 'running on global executor'",
+    )
+
+    # Runs on team_a's CeleryExecutor via alias
+    BashOperator(
+        task_id="use_team_alias",
+        executor="team_celery_exec",
+        bash_command="echo 'running on team executor'",
+    )
+
+    # Runs on team_a's CeleryExecutor via class name
+    BashOperator(
+        task_id="use_team_classname",
+        executor="CeleryExecutor",
+        bash_command="echo 'running on team executor'",
+    )
+
+    # Runs on team_a's CeleryExecutor via full module path
+    BashOperator(
+        task_id="use_team_module_path",
+        executor="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+        bash_command="echo 'running on team executor'",
+    )
+
+    # Also runs on team_a's CeleryExecutor (implicit team default)
+    BashOperator(
+        task_id="use_default",
+        bash_command="echo 'running on default team executor'",
+    )
+
+Aliases work with all core executors (``LocalExecutor``, ``CeleryExecutor``, ``KubernetesExecutor``, etc) as
+well as custom executor module paths. For more information on aliases and multiple executor configuration,
+see :ref:`Using Multiple Executors Concurrently <using-multiple-executors-concurrently>`.
+
 Team-specific Executor Settings
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index ef951bb1f7c..79555d49daf 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -83,34 +83,46 @@ class ExecutorLoader:
             for executor_name_str in executor_names_config:
                 if len(split_name := executor_name_str.split(":")) == 1:
                     name = split_name[0]
-                    # Check if this is an alias for a core airflow executor, module
-                    # paths won't be provided by the user in that case.
-                    if core_executor_module := cls.executors.get(name):
+                    # Check if this is a core airflow executor name. Use CORE_EXECUTOR_NAMES
+                    # (immutable set) rather than cls.executors (mutable dict that may contain
+                    # aliases added by init_executors at runtime).
+                    if name in CORE_EXECUTOR_NAMES:
                         executor_names_per_team.append(
-                            ExecutorName(module_path=core_executor_module, alias=name, team_name=team_name)
+                            ExecutorName(module_path=cls.executors[name], alias=name, team_name=team_name)
                         )
                     # A module path was provided
                     else:
                         executor_names_per_team.append(
                             ExecutorName(alias=None, module_path=name, team_name=team_name)
                         )
-                # An alias was provided with the module path
+                # An alias was provided with the module path or core executor name
                 elif len(split_name) == 2:
-                    # Ensure the user is not trying to override the existing aliases of any of the core
-                    # executors by providing an alias along with the existing core airflow executor alias
-                    # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
-                    # complicated. Multiple Executors of the same type will be supported by a future
-                    # multitenancy AIP.
-                    # The module component should always be a module path.
-                    module_path = split_name[1]
-                    if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
+                    alias = split_name[0]
+                    module_or_name = split_name[1]
+                    if not module_or_name:
                         raise AirflowConfigException(
                             "Incorrectly formatted executor configuration. Second portion of an executor "
-                            f"configuration must be a module path but received: {module_path}"
+                            f"configuration must be a core executor name or module path but received: {module_or_name}"
+                        )
+                    # Check if the second part is a core executor name (e.g. "MyAlias:LocalExecutor").
+                    # If so, resolve it to its module path. Use CORE_EXECUTOR_NAMES (immutable set)
+                    # rather than cls.executors (mutable dict that may contain aliases added by
+                    # init_executors at runtime).
+                    if module_or_name in CORE_EXECUTOR_NAMES:
+                        executor_names_per_team.append(
+                            ExecutorName(
+                                alias=alias, module_path=cls.executors[module_or_name], team_name=team_name
+                            )
+                        )
+                    elif "." not in module_or_name:
+                        raise AirflowConfigException(
+                            "Incorrectly formatted executor configuration. Second portion of an executor "
+                            f"configuration must be a core executor name or module path but received: {module_or_name}"
+                        )
+                    else:
+                        executor_names_per_team.append(
+                            ExecutorName(alias=alias, module_path=module_or_name, team_name=team_name)
                         )
-                    executor_names_per_team.append(
-                        ExecutorName(alias=split_name[0], module_path=split_name[1], team_name=team_name)
-                    )
                 else:
                     raise AirflowConfigException(
                         f"Incorrectly formatted executor configuration: {executor_name_str}"
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b77f87f6fb1..367447968c2 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -3326,10 +3326,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 if _executor.name and workload.get_executor_name() in (
                     _executor.name.alias,
                     _executor.name.module_path,
+                    _executor.name.module_path.split(".")[-1],
                 ):
                     # The executor must either match the team or be global (i.e. team_name is None)
                     if team_name and _executor.team_name == team_name or _executor.team_name is None:
                         executor = _executor
+                        break
 
         if executor is not None:
             self.log.debug(
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py
index 07efc11b9a3..c0c0a2f6331 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -231,6 +231,81 @@ class TestExecutorLoader:
                 ],
                 id="core_executors_and_custom_module_path_executor_with_aliases_per_team",
             ),
+            pytest.param(
+                "my_local:LocalExecutor, CeleryExecutor",
+                [
+                    ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="my_local",
+                        team_name=None,
+                    ),
+                    ExecutorName(
+                        module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+                        alias="CeleryExecutor",
+                        team_name=None,
+                    ),
+                ],
+                id="aliased_core_executor",
+            ),
+            pytest.param(
+                "my_celery:CeleryExecutor, my_local:LocalExecutor",
+                [
+                    ExecutorName(
+                        module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+                        alias="my_celery",
+                        team_name=None,
+                    ),
+                    ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="my_local",
+                        team_name=None,
+                    ),
+                ],
+                id="multiple_aliased_core_executors",
+            ),
+            pytest.param(
+                "=GlobalLocal:LocalExecutor;team_a=TeamLocal:LocalExecutor",
+                [
+                    ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="GlobalLocal",
+                        team_name=None,
+                    ),
+                    ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="TeamLocal",
+                        team_name="team_a",
+                    ),
+                ],
+                id="aliased_core_executor_global_and_team",
+            ),
+            pytest.param(
+                "=LocalExecutor;team_a=TeamLocal:LocalExecutor",
+                [
+                    ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="LocalExecutor",
+                        team_name=None,
+                    ),
+                    ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+                        alias="TeamLocal",
+                        team_name="team_a",
+                    ),
+                ],
+                id="core_executor_global_and_aliased_core_executor_team",
+            ),
+            pytest.param(
+                "my_k8s:KubernetesExecutor",
+                [
+                    ExecutorName(
+                        module_path="airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor",
+                        alias="my_k8s",
+                        team_name=None,
+                    ),
+                ],
+                id="aliased_kubernetes_executor",
+            ),
         ],
     )
     def test_get_hybrid_executors_from_configs(self, executor_config, expected_executors_list):
@@ -276,8 +351,8 @@ class TestExecutorLoader:
             "LocalExecutor, Ce:ler:yExecutor",
             "LocalExecutor, CeleryExecutor:",
             "LocalExecutor, my_cool_alias:",
-            "LocalExecutor, my_cool_alias:CeleryExecutor",
             "LocalExecutor, module.path.first:alias_second",
+            "LocalExecutor, my_cool_alias:not_a_core_or_module",
         ],
     )
     def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, executor_config):
@@ -340,6 +415,45 @@ class TestExecutorLoader:
                 LocalExecutor,
             )
 
+    def test_load_executor_aliased_core_executor(self):
+        """Test that a core executor can be aliased and loaded by its custom alias."""
+        with conf_vars({("core", "executor"): "my_local:LocalExecutor"}):
+            executor_loader.ExecutorLoader.init_executors()
+            # Can load by alias
+            executor = executor_loader.ExecutorLoader.load_executor("my_local")
+            assert isinstance(executor, LocalExecutor)
+            assert executor.name.alias == "my_local"
+            assert executor.name.module_path == "airflow.executors.local_executor.LocalExecutor"
+            # Can also load by module path
+            assert isinstance(
+                executor_loader.ExecutorLoader.load_executor(
+                    "airflow.executors.local_executor.LocalExecutor"
+                ),
+                LocalExecutor,
+            )
+            # Can also load by class name
+            assert isinstance(
+                executor_loader.ExecutorLoader.load_executor("LocalExecutor"),
+                LocalExecutor,
+            )
+
+    @pytest.mark.parametrize(
+        "executor_config",
+        [
+            "my_local:LocalExecutor, LocalExecutor",
+            "alias1:LocalExecutor, alias2:LocalExecutor",
+            "my_local:LocalExecutor, airflow.executors.local_executor.LocalExecutor",
+            "alias1:CeleryExecutor, alias2:CeleryExecutor",
+        ],
+    )
+    def test_aliased_core_executor_duplicate_in_same_team_fails(self, executor_config):
+        """Test that two executors of the same type within the same team always fails (duplicate)."""
+        with conf_vars({("core", "executor"): executor_config}):
+            with pytest.raises(
+                AirflowConfigException, match=r".+Duplicate executors are not yet supported.+"
+            ):
+                executor_loader.ExecutorLoader._get_executor_names()
+
     @mock.patch(
         "airflow.executors.executor_loader.ExecutorLoader._get_executor_names",
         wraps=executor_loader.ExecutorLoader._get_executor_names,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 7de30013ec5..275f85fd83f 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -8369,6 +8369,37 @@ class TestSchedulerJob:
 
         assert result == mock_executors[1]
 
+    @conf_vars({("core", "multi_team"): "false"})
+    def test_try_to_load_executor_matches_by_classname(self, dag_maker, mock_executors, session):
+        """Test that executor lookup matches by classname when alias and module_path don't match.
+
+        This covers the edge case where a user aliases a core executor (e.g.
+        ``global_exec:LocalExecutor;team1=team_exec:LocalExecutor``) but a task specifies
+        ``executor="LocalExecutor"`` (the classname). The scheduler should still find the
+        executor by matching the last component of the module_path (the classname).
+        """
+        # Set up the mock executors with aliases that differ from the classname
+        mock_executors[0].name = ExecutorName(
+            alias="global_exec", module_path="airflow.executors.local_executor.LocalExecutor"
+        )
+        mock_executors[1].name = ExecutorName(
+            alias="team_exec", module_path="airflow.executors.local_executor.LocalExecutor"
+        )
+
+        with dag_maker(dag_id="test_dag", session=session):
+            task = EmptyOperator(task_id="test_task", executor="LocalExecutor")
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance(task.task_id, session)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        result = self.job_runner._try_to_load_executor(ti, session)
+
+        # Should match by classname (last component of module_path) and return the global executor
+        assert result == mock_executors[0]
+
     @conf_vars({("core", "multi_team"): "true"})
     def test_multi_team_scheduling_loop_batch_optimization(self, dag_maker, mock_executors, session):
         """Test that the scheduling loop uses batch team resolution optimization."""
