This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8bb1e8e5ca [AIP-51] Add helper to import default executor class (#27974)
8bb1e8e5ca is described below
commit 8bb1e8e5ca2a122a5d4915b553eeca3422bc6766
Author: Niko <[email protected]>
AuthorDate: Sat Dec 3 09:05:28 2022 -0800
[AIP-51] Add helper to import default executor class (#27974)
Also add missing testing for executor loader import mechanism
---
airflow/executors/executor_loader.py | 28 ++++++++++++++++++++++------
tests/executors/test_executor_loader.py | 32 +++++++++++++++++++++++++++++++-
2 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 56802017e4..0852b5206b 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -65,18 +65,23 @@ class ExecutorLoader:
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
}
+ @classmethod
+ def get_default_executor_name(cls) -> str:
+ """Returns the default executor name from Airflow configuration.
+
+ :return: executor name from Airflow configuration
+ """
+ from airflow.configuration import conf
+
+ return conf.get_mandatory_value("core", "EXECUTOR")
+
@classmethod
def get_default_executor(cls) -> BaseExecutor:
"""Creates a new instance of the configured executor if none exists and returns it."""
if cls._default_executor is not None:
return cls._default_executor
- from airflow.configuration import conf
-
- executor_name = conf.get_mandatory_value("core", "EXECUTOR")
- cls._default_executor = cls.load_executor(executor_name)
-
- return cls._default_executor
+ return cls.load_executor(cls.get_default_executor_name())
@classmethod
def load_executor(cls, executor_name: str) -> BaseExecutor:
@@ -134,6 +139,17 @@ class ExecutorLoader:
return import_string(f"airflow.executors.{executor_name}"), ConnectorSource.PLUGIN
return import_string(executor_name), ConnectorSource.CUSTOM_PATH
+ @classmethod
+ def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
+ """
+ Imports the default executor class.
+
+ :return: executor class and executor import source
+ """
+ executor_name = cls.get_default_executor_name()
+
+ return cls.import_executor_cls(executor_name)
+
@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
index 180e7b961c..96d7030e17 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -21,7 +21,7 @@ from unittest import mock
import pytest
from airflow import plugins_manager
-from airflow.executors.executor_loader import ExecutorLoader
+from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader
from tests.test_utils.config import conf_vars
# Plugin Manager creates new modules, which is difficult to mock, so we use test isolation by a unique name.
@@ -73,3 +73,33 @@ class TestExecutorLoader:
executor = ExecutorLoader.get_default_executor()
assert executor is not None
assert "FakeExecutor" == executor.__class__.__name__
+
+ @pytest.mark.parametrize(
+ "executor_name",
+ [
+ "CeleryExecutor",
+ "CeleryKubernetesExecutor",
+ "DebugExecutor",
+ "KubernetesExecutor",
+ "LocalExecutor",
+ ],
+ )
+ def test_should_support_import_executor_from_core(self, executor_name):
+ with conf_vars({("core", "executor"): executor_name}):
+ executor, import_source = ExecutorLoader.import_default_executor_cls()
+ assert executor_name == executor.__name__
+ assert import_source == ConnectorSource.CORE
+
+ @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
+ @mock.patch("airflow.plugins_manager.executors_modules", None)
+ def test_should_support_import_plugins(self):
+ with conf_vars({("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
+ executor, import_source = ExecutorLoader.import_default_executor_cls()
+ assert "FakeExecutor" == executor.__name__
+ assert import_source == ConnectorSource.PLUGIN
+
+ def test_should_support_import_custom_path(self):
+ with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}):
+ executor, import_source = ExecutorLoader.import_default_executor_cls()
+ assert "FakeExecutor" == executor.__name__
+ assert import_source == ConnectorSource.CUSTOM_PATH