This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 14a613fc7d Allow executors to be specified with only the class name of
the Executor (#40131)
14a613fc7d is described below
commit 14a613fc7dd148b9721e011ec629cb373d0d3c2e
Author: Syed Hussain <[email protected]>
AuthorDate: Sat Jun 8 14:11:30 2024 -0700
Allow executors to be specified with only the class name of the Executor
(#40131)
* Allow executors to be specified with only the class name of the Executor
* Fix unit test by mocking loaded Executor, so it doesn't access the DB
* Update doc string to include passing class name as a valid way to specify
an executor
---
airflow/executors/executor_loader.py | 7 ++++++-
tests/executors/test_executor_loader.py | 25 +++++++++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/airflow/executors/executor_loader.py
b/airflow/executors/executor_loader.py
index 5fb9f90d4f..b8b886954b 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -50,6 +50,7 @@ if TYPE_CHECKING:
# executor may have both so we need two lookup dicts.
_alias_to_executors: dict[str, ExecutorName] = {}
_module_to_executors: dict[str, ExecutorName] = {}
+_classname_to_executors: dict[str, ExecutorName] = {}
# Used to cache the computed ExecutorNames so that we don't need to read/parse
config more than once
_executor_names: list[ExecutorName] = []
# Used to cache executors so that we don't construct executor objects
unnecessarily
@@ -149,6 +150,7 @@ class ExecutorLoader:
_alias_to_executors[executor_name.alias] = executor_name
# All executors will have a module path
_module_to_executors[executor_name.module_path] = executor_name
+ _classname_to_executors[executor_name.module_path.split(".")[-1]]
= executor_name
# Cache the executor names, so the logic of this method only runs
once
_executor_names.append(executor_name)
@@ -201,6 +203,8 @@ class ExecutorLoader:
return executor_name
elif executor_name := _module_to_executors.get(executor_name_str):
return executor_name
+ elif executor_name := _classname_to_executors.get(executor_name_str):
+ return executor_name
else:
raise AirflowException(f"Unknown executor being loaded:
{executor_name_str}")
@@ -212,7 +216,8 @@ class ExecutorLoader:
This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
- * by import path.
+ * by import path
+ * by class name of the Executor
* by ExecutorName object specification
:return: an instance of executor class via executor_name
diff --git a/tests/executors/test_executor_loader.py
b/tests/executors/test_executor_loader.py
index 25fb08b008..01b7d9fee0 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -28,6 +28,7 @@ from airflow.exceptions import AirflowConfigException
from airflow.executors import executor_loader
from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader,
ExecutorName
from airflow.executors.local_executor import LocalExecutor
+from airflow.providers.amazon.aws.executors.ecs.ecs_executor import
AwsEcsExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from tests.test_utils.config import conf_vars
@@ -334,3 +335,27 @@ class TestExecutorLoader:
assert isinstance(
ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor
)
+
+
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor",
autospec=True)
+ def test_load_custom_executor_with_classname(self, mock_executor):
+ with patch.object(ExecutorLoader, "block_use_of_hybrid_exec"):
+ with conf_vars(
+ {
+ (
+ "core",
+ "executor",
+ ):
"my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
+ }
+ ):
+ ExecutorLoader.init_executors()
+ assert isinstance(ExecutorLoader.load_executor("my_alias"),
AwsEcsExecutor)
+ assert
isinstance(ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
+ assert isinstance(
+ ExecutorLoader.load_executor(
+
"airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
+ ),
+ AwsEcsExecutor,
+ )
+ assert isinstance(
+
ExecutorLoader.load_executor(executor_loader._executor_names[0]), AwsEcsExecutor
+ )