This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eb139711bde Allow executor aliases for "core executors" (#62838)
eb139711bde is described below
commit eb139711bde3e21c8fb44277c768e36b8a58fdec
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Mar 4 14:43:14 2026 -0800
Allow executor aliases for "core executors" (#62838)
Previously, core executors like "LocalExecutor", "CeleryExecutor", etc.
could not be given aliases. This was because these executors already had
a built-in alias: they could be referenced by their short names alone,
e.g.:
`AIRFLOW__CORE__EXECUTOR="LocalExecutor"`. However, with multi-team,
two instances of the same executor can be used by a team and globally,
e.g.:
`AIRFLOW__CORE__EXECUTOR="LocalExecutor;team_a=LocalExecutor"`
In this case, Dags from team_a should be able to use either the global
or the team-based executor, but there is no unique way to reference
them: setting `executor=LocalExecutor` on a task would always resolve
to the team-based executor.
To fix this, core executors can now receive aliases. Disallowing them
was a "soft" limit, introduced to reduce complexity, and lifting it
solves this problem nicely. The change is also backwards compatible,
since the previous "short name" for a core executor was always the same
as its class name, which remains an acceptable input to the task's
`executor` field.
Docs are updated as well, for both multi-team and multiple executor
configuration.
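The resolution rules described above can be sketched as a small standalone parser. This is a minimal, hypothetical sketch of the configuration syntax only: `CORE_EXECUTORS` and `parse_executor_config` are illustrative names, not Airflow's actual API (the real logic lives in `ExecutorLoader._get_executor_names`).

```python
# Minimal sketch of the executor-config syntax described above.
# CORE_EXECUTORS and parse_executor_config are illustrative names,
# not Airflow's actual API.
CORE_EXECUTORS = {
    "LocalExecutor": "airflow.executors.local_executor.LocalExecutor",
    "CeleryExecutor": "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
}


def parse_executor_config(config: str) -> list[tuple]:
    """Return (team_name, alias, module_path) triples for a config string.

    Syntax: teams are separated by ";" (an optional "team=" prefix names
    the team), executors within a team by ",", and "alias:Executor"
    attaches an alias to a core executor short name or a module path.
    """
    executors = []
    for team_block in config.split(";"):
        team_prefix, _, names = team_block.rpartition("=")
        team = team_prefix or None  # empty prefix means the global team
        for name in (n.strip() for n in names.split(",")):
            alias, _, target = name.rpartition(":")
            if not alias:
                # Bare name: a core executor's short name doubles as its alias.
                alias = target if target in CORE_EXECUTORS else None
            module = CORE_EXECUTORS.get(target, target)
            executors.append((team, alias, module))
    return executors
```

With this sketch, `global_exec:CeleryExecutor;team_a=team_exec:CeleryExecutor` yields two distinct entries for the same executor class, one global and one team-scoped, which is exactly the ambiguity-free addressing this commit enables.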
---
airflow-core/docs/core-concepts/executor/index.rst | 34 +++++-
airflow-core/docs/core-concepts/multi-team.rst | 60 +++++++++++
.../src/airflow/executors/executor_loader.py | 46 +++++---
.../src/airflow/jobs/scheduler_job_runner.py | 2 +
.../tests/unit/executors/test_executor_loader.py | 116 ++++++++++++++++++++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 31 ++++++
6 files changed, 266 insertions(+), 23 deletions(-)
diff --git a/airflow-core/docs/core-concepts/executor/index.rst b/airflow-core/docs/core-concepts/executor/index.rst
index 3a0d593a49b..0600c6dd5a7 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -145,22 +145,46 @@ Some examples of valid multiple executor configuration:
executor = KubernetesExecutor,my.custom.module.ExecutorClass
-.. note::
-    Using two instances of the _same_ executor class is not currently supported.
+Aliases
+"""""""
+
+To make it easier to specify executors on tasks and Dags, executor configuration supports aliases. You may then use this alias to refer to the executor in your Dags (see below).
+
+Aliases work with both custom executor module paths and built-in core executors:
+
+.. code-block:: ini
+
+ [core]
+ executor = LocalExecutor,short_name:my.custom.module.ExecutorClass
+
+.. code-block:: ini
-To make it easier to specify executors on tasks and Dags, executor configuration now supports aliases. You may then use this alias to refer to the executor in your Dags (see below).
+ [core]
+ executor = my_local_exec:LocalExecutor,my_celery_exec:CeleryExecutor
+
+Aliasing core executors is particularly useful when the same executor is used at both the global and team level in
+Multi-Team Airflow, since it allows tasks to explicitly target a specific instance by alias:
.. code-block:: ini
[core]
- executor = LocalExecutor,ShortName:my.custom.module.ExecutorClass
+    executor = global_celery_exec:CeleryExecutor;team1=team_celery_exec:CeleryExecutor
+
.. note::
-    If a Dag specifies a task to use an executor that is not configured, the Dag will fail to parse and a warning dialog will be shown in the Airflow UI. Please ensure that all executors you wish to use are specified in Airflow configuration on *any* host/container that is running an Airflow component (scheduler, workers, etc).
+    Using two instances of the same Executor class is only supported in multi-team Airflow. For example: Two separate
+    teams can both use the CeleryExecutor but one single team cannot use two instances of the CeleryExecutor. An executor
+    can also be used globally and in teams at the same time. Please see the :doc:`Multi-Team Airflow documentation </core-concepts/multi-team>` for
+    more details on this.
+
Writing Dags and Tasks
^^^^^^^^^^^^^^^^^^^^^^
+.. note::
+    If a Dag specifies a task to use an executor that is not configured, the Dag will fail to parse and a warning dialog will be shown in the Airflow UI. Please ensure that all executors you wish to use are specified in Airflow configuration on *any* host/container that is running an Airflow component (scheduler, workers, etc).
+
+
To specify an executor for a task, make use of the executor parameter on Airflow Operators:
.. code-block:: python
diff --git a/airflow-core/docs/core-concepts/multi-team.rst b/airflow-core/docs/core-concepts/multi-team.rst
index 65bc0af245f..0f8a7205673 100644
--- a/airflow-core/docs/core-concepts/multi-team.rst
+++ b/airflow-core/docs/core-concepts/multi-team.rst
@@ -283,6 +283,66 @@ Example configurations:
# Invalid: Duplicate Executor within a Team
executor = LocalExecutor;team_a=CeleryExecutor,CeleryExecutor;team_b=LocalExecutor
+Aliasing Executors Across Teams
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When the same executor type is used at both the global and team level (e.g., ``LocalExecutor`` globally and
+``LocalExecutor`` for a team), if tasks wish to target the global executor they need a way to distinguish between the
+two instances. To accomplish this, you can assign **aliases** to core executors using the ``Alias:ExecutorName`` syntax:
+
+.. code-block:: ini
+
+ [core]
+    executor = global_celery_exec:CeleryExecutor;team_a=team_celery_exec:CeleryExecutor
+
+With this configuration:
+
+- The global ``CeleryExecutor`` is available via the alias ``global_celery_exec``
+- The team_a ``CeleryExecutor`` is available via the alias ``team_celery_exec``
+- A task in ``team_a`` that sets ``executor="team_celery_exec"``, ``executor="CeleryExecutor"``, or ``executor="airflow.providers.celery.executors.celery_executor.CeleryExecutor"``
+  will run on the **team** executor
+- A task in ``team_a`` that sets ``executor="global_celery_exec"`` will run on the **global** executor
+
+.. code-block:: python
+
+ # Runs on the global CeleryExecutor via alias
+ BashOperator(
+ task_id="uses_global",
+ executor="global_celery_exec",
+ bash_command="echo 'running on global executor'",
+ )
+
+ # Runs on team_a's CeleryExecutor via alias
+ BashOperator(
+ task_id="use_team_alias",
+ executor="team_celery_exec",
+ bash_command="echo 'running on team executor'",
+ )
+
+ # Runs on team_a's CeleryExecutor via class name
+ BashOperator(
+ task_id="use_team_classname",
+ executor="CeleryExecutor",
+ bash_command="echo 'running on team executor'",
+ )
+
+ # Runs on team_a's CeleryExecutor via full module path
+ BashOperator(
+ task_id="use_team_module_path",
+        executor="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+ bash_command="echo 'running on team executor'",
+ )
+
+ # Also runs on team_a's CeleryExecutor (implicit team default)
+ BashOperator(
+ task_id="use_default",
+ bash_command="echo 'running on default team executor'",
+ )
+
+Aliases work with all core executors (``LocalExecutor``, ``CeleryExecutor``, ``KubernetesExecutor``, etc) as
+well as custom executor module paths. For more information on aliases and multiple executor configuration,
+see :ref:`Using Multiple Executors Concurrently <using-multiple-executors-concurrently>`.
+
Team-specific Executor Settings
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index ef951bb1f7c..79555d49daf 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -83,34 +83,46 @@ class ExecutorLoader:
for executor_name_str in executor_names_config:
if len(split_name := executor_name_str.split(":")) == 1:
name = split_name[0]
-                # Check if this is an alias for a core airflow executor, module
-                # paths won't be provided by the user in that case.
-                if core_executor_module := cls.executors.get(name):
+                # Check if this is a core airflow executor name. Use CORE_EXECUTOR_NAMES
+                # (immutable set) rather than cls.executors (mutable dict that may contain
+                # aliases added by init_executors at runtime).
+ if name in CORE_EXECUTOR_NAMES:
executor_names_per_team.append(
-                        ExecutorName(module_path=core_executor_module, alias=name, team_name=team_name)
+                        ExecutorName(module_path=cls.executors[name], alias=name, team_name=team_name)
)
# A module path was provided
else:
executor_names_per_team.append(
ExecutorName(alias=None, module_path=name, team_name=team_name)
)
- # An alias was provided with the module path
+                # An alias was provided with the module path or core executor name
elif len(split_name) == 2:
-                # Ensure the user is not trying to override the existing aliases of any of the core
-                # executors by providing an alias along with the existing core airflow executor alias
-                # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
-                # complicated. Multiple Executors of the same type will be supported by a future
-                # multitenancy AIP.
-                # The module component should always be a module path.
-                module_path = split_name[1]
-                if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
+ alias = split_name[0]
+ module_or_name = split_name[1]
+ if not module_or_name:
raise AirflowConfigException(
"Incorrectly formatted executor configuration. Second portion of an executor "
-                        f"configuration must be a module path but received: {module_path}"
+                        f"configuration must be a core executor name or module path but received: {module_or_name}"
+ )
+                # Check if the second part is a core executor name (e.g. "MyAlias:LocalExecutor").
+                # If so, resolve it to its module path. Use CORE_EXECUTOR_NAMES (immutable set)
+                # rather than cls.executors (mutable dict that may contain aliases added by
+                # init_executors at runtime).
+ if module_or_name in CORE_EXECUTOR_NAMES:
+ executor_names_per_team.append(
+ ExecutorName(
+                            alias=alias, module_path=cls.executors[module_or_name], team_name=team_name
+ )
+ )
+ elif "." not in module_or_name:
+ raise AirflowConfigException(
+                        "Incorrectly formatted executor configuration. Second portion of an executor "
+                        f"configuration must be a core executor name or module path but received: {module_or_name}"
+ )
+ else:
+ executor_names_per_team.append(
+                        ExecutorName(alias=alias, module_path=module_or_name, team_name=team_name)
)
- executor_names_per_team.append(
-                    ExecutorName(alias=split_name[0], module_path=split_name[1], team_name=team_name)
- )
else:
raise AirflowConfigException(
f"Incorrectly formatted executor configuration: {executor_name_str}"
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b77f87f6fb1..367447968c2 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -3326,10 +3326,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if _executor.name and workload.get_executor_name() in (
_executor.name.alias,
_executor.name.module_path,
+ _executor.name.module_path.split(".")[-1],
):
# The executor must either match the team or be global (i.e. team_name is None)
if team_name and _executor.team_name == team_name or _executor.team_name is None:
executor = _executor
+ break
if executor is not None:
self.log.debug(
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py
index 07efc11b9a3..c0c0a2f6331 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -231,6 +231,81 @@ class TestExecutorLoader:
],
id="core_executors_and_custom_module_path_executor_with_aliases_per_team",
),
+ pytest.param(
+ "my_local:LocalExecutor, CeleryExecutor",
+ [
+ ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="my_local",
+ team_name=None,
+ ),
+ ExecutorName(
+                        module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+ alias="CeleryExecutor",
+ team_name=None,
+ ),
+ ],
+ id="aliased_core_executor",
+ ),
+ pytest.param(
+ "my_celery:CeleryExecutor, my_local:LocalExecutor",
+ [
+ ExecutorName(
+                        module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+ alias="my_celery",
+ team_name=None,
+ ),
+ ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="my_local",
+ team_name=None,
+ ),
+ ],
+ id="multiple_aliased_core_executors",
+ ),
+ pytest.param(
+ "=GlobalLocal:LocalExecutor;team_a=TeamLocal:LocalExecutor",
+ [
+ ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="GlobalLocal",
+ team_name=None,
+ ),
+ ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="TeamLocal",
+ team_name="team_a",
+ ),
+ ],
+ id="aliased_core_executor_global_and_team",
+ ),
+ pytest.param(
+ "=LocalExecutor;team_a=TeamLocal:LocalExecutor",
+ [
+ ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="LocalExecutor",
+ team_name=None,
+ ),
+ ExecutorName(
+                        module_path="airflow.executors.local_executor.LocalExecutor",
+ alias="TeamLocal",
+ team_name="team_a",
+ ),
+ ],
+ id="core_executor_global_and_aliased_core_executor_team",
+ ),
+ pytest.param(
+ "my_k8s:KubernetesExecutor",
+ [
+ ExecutorName(
+                        module_path="airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor",
+ alias="my_k8s",
+ team_name=None,
+ ),
+ ],
+ id="aliased_kubernetes_executor",
+ ),
],
)
def test_get_hybrid_executors_from_configs(self, executor_config, expected_executors_list):
@@ -276,8 +351,8 @@ class TestExecutorLoader:
"LocalExecutor, Ce:ler:yExecutor",
"LocalExecutor, CeleryExecutor:",
"LocalExecutor, my_cool_alias:",
- "LocalExecutor, my_cool_alias:CeleryExecutor",
"LocalExecutor, module.path.first:alias_second",
+ "LocalExecutor, my_cool_alias:not_a_core_or_module",
],
)
def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, executor_config):
@@ -340,6 +415,45 @@ class TestExecutorLoader:
LocalExecutor,
)
+    def test_load_executor_aliased_core_executor(self):
+        """Test that a core executor can be aliased and loaded by its custom alias."""
+ with conf_vars({("core", "executor"): "my_local:LocalExecutor"}):
+ executor_loader.ExecutorLoader.init_executors()
+ # Can load by alias
+ executor = executor_loader.ExecutorLoader.load_executor("my_local")
+ assert isinstance(executor, LocalExecutor)
+ assert executor.name.alias == "my_local"
+            assert executor.name.module_path == "airflow.executors.local_executor.LocalExecutor"
+ # Can also load by module path
+ assert isinstance(
+ executor_loader.ExecutorLoader.load_executor(
+ "airflow.executors.local_executor.LocalExecutor"
+ ),
+ LocalExecutor,
+ )
+ # Can also load by class name
+ assert isinstance(
+ executor_loader.ExecutorLoader.load_executor("LocalExecutor"),
+ LocalExecutor,
+ )
+
+ @pytest.mark.parametrize(
+ "executor_config",
+ [
+ "my_local:LocalExecutor, LocalExecutor",
+ "alias1:LocalExecutor, alias2:LocalExecutor",
+            "my_local:LocalExecutor, airflow.executors.local_executor.LocalExecutor",
+ "alias1:CeleryExecutor, alias2:CeleryExecutor",
+ ],
+ )
+    def test_aliased_core_executor_duplicate_in_same_team_fails(self, executor_config):
+        """Test that two executors of the same type within the same team always fails (duplicate)."""
+ with conf_vars({("core", "executor"): executor_config}):
+ with pytest.raises(
+                AirflowConfigException, match=r".+Duplicate executors are not yet supported.+"
+ ):
+ executor_loader.ExecutorLoader._get_executor_names()
+
@mock.patch(
"airflow.executors.executor_loader.ExecutorLoader._get_executor_names",
wraps=executor_loader.ExecutorLoader._get_executor_names,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 7de30013ec5..275f85fd83f 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -8369,6 +8369,37 @@ class TestSchedulerJob:
assert result == mock_executors[1]
+ @conf_vars({("core", "multi_team"): "false"})
+    def test_try_to_load_executor_matches_by_classname(self, dag_maker, mock_executors, session):
+        """Test that executor lookup matches by classname when alias and module_path don't match.
+
+        This covers the edge case where a user aliases a core executor (e.g.
+        ``global_exec:LocalExecutor;team1=team_exec:LocalExecutor``) but a task specifies
+        ``executor="LocalExecutor"`` (the classname). The scheduler should still find the
+        executor by matching the last component of the module_path (the classname).
+ """
+ # Set up the mock executors with aliases that differ from the classname
+        mock_executors[0].name = ExecutorName(
+            alias="global_exec", module_path="airflow.executors.local_executor.LocalExecutor"
+        )
+        mock_executors[1].name = ExecutorName(
+            alias="team_exec", module_path="airflow.executors.local_executor.LocalExecutor"
+        )
+ )
+
+ with dag_maker(dag_id="test_dag", session=session):
+ task = EmptyOperator(task_id="test_task", executor="LocalExecutor")
+
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instance(task.task_id, session)
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ result = self.job_runner._try_to_load_executor(ti, session)
+
+        # Should match by classname (last component of module_path) and return the global executor
+ assert result == mock_executors[0]
+
@conf_vars({("core", "multi_team"): "true"})
def test_multi_team_scheduling_loop_batch_optimization(self, dag_maker, mock_executors, session):
    """Test that the scheduling loop uses batch team resolution optimization."""