This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 14a613fc7d Allow executors to be specified with only the class name of the Executor (#40131)
14a613fc7d is described below

commit 14a613fc7dd148b9721e011ec629cb373d0d3c2e
Author: Syed Hussain <[email protected]>
AuthorDate: Sat Jun 8 14:11:30 2024 -0700

    Allow executors to be specified with only the class name of the Executor (#40131)
    
    * Allow executors to be specified with only the class name of the Executor
    
    * Fix unit test by mocking loaded Executor, so it doesn't access the DB
    
    * Update doc string to include passing class name as a valid way to specify an executor
---
 airflow/executors/executor_loader.py    |  7 ++++++-
 tests/executors/test_executor_loader.py | 25 +++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 5fb9f90d4f..b8b886954b 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -50,6 +50,7 @@ if TYPE_CHECKING:
 # executor may have both so we need two lookup dicts.
 _alias_to_executors: dict[str, ExecutorName] = {}
 _module_to_executors: dict[str, ExecutorName] = {}
+_classname_to_executors: dict[str, ExecutorName] = {}
 # Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
 _executor_names: list[ExecutorName] = []
 # Used to cache executors so that we don't construct executor objects unnecessarily
@@ -149,6 +150,7 @@ class ExecutorLoader:
                 _alias_to_executors[executor_name.alias] = executor_name
             # All executors will have a module path
             _module_to_executors[executor_name.module_path] = executor_name
+            _classname_to_executors[executor_name.module_path.split(".")[-1]] = executor_name
             # Cache the executor names, so the logic of this method only runs once
             _executor_names.append(executor_name)
 
@@ -201,6 +203,8 @@ class ExecutorLoader:
             return executor_name
         elif executor_name := _module_to_executors.get(executor_name_str):
             return executor_name
+        elif executor_name := _classname_to_executors.get(executor_name_str):
+            return executor_name
         else:
             raise AirflowException(f"Unknown executor being loaded: {executor_name_str}")
 
@@ -212,7 +216,8 @@ class ExecutorLoader:
         This supports the following formats:
         * by executor name for core executor
         * by ``{plugin_name}.{class_name}`` for executor from plugins
-        * by import path.
+        * by import path
+        * by class name of the Executor
         * by ExecutorName object specification
 
         :return: an instance of executor class via executor_name
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
index 25fb08b008..01b7d9fee0 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -28,6 +28,7 @@ from airflow.exceptions import AirflowConfigException
 from airflow.executors import executor_loader
 from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
 from airflow.executors.local_executor import LocalExecutor
+from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from tests.test_utils.config import conf_vars
 
@@ -334,3 +335,27 @@ class TestExecutorLoader:
                 assert isinstance(
                     ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor
                 )
+
+    @mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor", autospec=True)
+    def test_load_custom_executor_with_classname(self, mock_executor):
+        with patch.object(ExecutorLoader, "block_use_of_hybrid_exec"):
+            with conf_vars(
+                {
+                    (
+                        "core",
+                        "executor",
+                    ): "my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
+                }
+            ):
+                ExecutorLoader.init_executors()
+                assert isinstance(ExecutorLoader.load_executor("my_alias"), AwsEcsExecutor)
+                assert isinstance(ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
+                assert isinstance(
+                    ExecutorLoader.load_executor(
+                        "airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
+                    ),
+                    AwsEcsExecutor,
+                )
+                assert isinstance(
+                    ExecutorLoader.load_executor(executor_loader._executor_names[0]), AwsEcsExecutor
+                )

Reply via email to