syedahsn commented on code in PR #37635:
URL: https://github.com/apache/airflow/pull/37635#discussion_r1511880593


##########
airflow/executors/executor_loader.py:
##########
@@ -69,56 +71,181 @@ class ExecutorLoader:
     }
 
     @classmethod
-    def get_default_executor_name(cls) -> str:
-        """Return the default executor name from Airflow configuration.
+    def block_use_of_hybrid_exec(cls, executor_config: list):
+        """Raise an exception if the user tries to use multiple executors 
before the feature is complete.
 
-        :return: executor name from Airflow configuration
+        This check is built into a method so that it can be easily mocked in 
unit tests.
+
+        :param executor_config: core.executor configuration value.
+        """
+        if len(executor_config) > 1 or ":" in "".join(executor_config):
+            raise AirflowConfigException(
+                "Configuring multiple executors and executor aliases are not 
yet supported!: "
+                f"{executor_config}"
+            )
+
+    @classmethod
+    def _get_executor_names(cls) -> list[ExecutorName]:
+        """Return the executor names from Airflow configuration.
+
+        :return: List of executor names from Airflow configuration
         """
         from airflow.configuration import conf
 
-        return conf.get_mandatory_value("core", "EXECUTOR")
+        if _executor_names:
+            return _executor_names
+
+        executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")
+
+        # AIP-61 is WIP. Unblock configuring multiple executors when the 
feature is ready to launch
+        cls.block_use_of_hybrid_exec(executor_names_raw)
+
+        executor_names = []
+        for name in executor_names_raw:
+            if len(split_name := name.split(":")) == 1:
+                name = split_name[0]
+                # Check if this is an alias for a core airflow executor, module
+                # paths won't be provided by the user in that case.
+                if core_executor_module := cls.executors.get(name):
+                    executor_names.append(ExecutorName(alias=name, 
module_path=core_executor_module))
+                # Only a module path or plugin name was provided
+                else:
+                    executor_names.append(ExecutorName(alias=None, 
module_path=name))
+            # An alias was provided with the module path
+            elif len(split_name) == 2:
+                # Ensure the user is not trying to override the existing 
aliases of any of the core
+                # executors by providing an alias along with the existing core 
airflow executor alias
+                # (e.g. my_local_exec_alias:LocalExecutor). Allowing this 
makes things unnecessarily
+                # complicated. Multiple Executors of the same type will be 
supported by a future multitenancy
+                # AIP.
+                # The module component should always be a module or plugin 
path.
+                if not split_name[1] or split_name[1] in CORE_EXECUTOR_NAMES:
+                    raise AirflowConfigException(
+                        f"Incorrectly formatted executor configuration: 
{name}\n"
+                        "second portion of an executor configuration must be a 
module path"
+                    )
+                else:
+                    executor_names.append(ExecutorName(alias=split_name[0], 
module_path=split_name[1]))
+            else:
+                raise AirflowConfigException(f"Incorrectly formatted executor 
configuration: {name}")
+
+        # As of now, we do not allow duplicate executors.
+        # Add all module paths/plugin names to a set, since the actual code is 
what is unique
+        unique_modules = set([exec_name.module_path for exec_name in 
executor_names])
+        if len(unique_modules) < len(executor_names):
+            msg = (
+                "At least one executor was configured twice. Duplicate 
executors is not yet supported. "
+                "Please check your configuration again to correct the issue."
+            )
+            raise AirflowConfigException(msg)
+
+        # Populate some mappings for fast future lookups
+        for executor_name in executor_names:
+            # Executors will not always have aliases
+            if executor_name.alias:
+                _alias_to_executors[executor_name.alias] = executor_name
+            # All executors will have a module path
+            _module_to_executors[executor_name.module_path] = executor_name
+            # Cache the executor names, so the logic of this method only runs 
once
+            _executor_names.append(executor_name)
+
+        return executor_names
+
+    @classmethod
+    def get_default_executor_name(cls) -> ExecutorName:
+        """Return the default executor name from Airflow configuration.
+
+        :return: executor name from Airflow configuration
+        """
+        # The default executor is the first configured executor in the list
+        return cls._get_executor_names()[0]
 
     @classmethod
     def get_default_executor(cls) -> BaseExecutor:
         """Create a new instance of the configured executor if none exists and 
returns it."""
-        if cls._default_executor is not None:
-            return cls._default_executor
+        default_executor = cls.load_executor(cls.get_default_executor_name())
+
+        return default_executor
+
+    @classmethod
+    def init_executors(cls) -> list[BaseExecutor]:
+        """Create a new instance of all configured executors if not cached 
already."""
+        executor_names = cls._get_executor_names()
+        loaded_executors = []
+        for executor_name in executor_names:
+            loaded_executor = cls.load_executor(executor_name.module_path)
+            if executor_name.alias:
+                cls.executors[executor_name.alias] = executor_name.module_path
+            else:
+                cls.executors[loaded_executor.__class__.__name__] = 
executor_name.module_path
 
-        return cls.load_executor(cls.get_default_executor_name())
+            loaded_executors.append(loaded_executor)
+
+        return loaded_executors
+
+    @classmethod
+    def lookup_executor_name_by_str(cls, executor_name_str: str) -> 
ExecutorName:
+        # lookup the executor by alias first, if not check if we're given a 
module path
+        if executor_name := _alias_to_executors.get(executor_name_str):
+            return executor_name
+        elif executor_name := _module_to_executors.get(executor_name_str):
+            return executor_name
+        else:
+            raise AirflowException(f"Unknown executor being loaded: 
{executor_name}")
 
     @classmethod
-    def load_executor(cls, executor_name: str) -> BaseExecutor:
+    def load_executor(cls, executor_name: ExecutorName | str) -> BaseExecutor:
         """
         Load the executor.
 
         This supports the following formats:
         * by executor name for core executor
         * by ``{plugin_name}.{class_name}`` for executor from plugins
         * by import path.
+        * by ExecutorName object specification
 
         :return: an instance of executor class via executor_name
         """
-        if executor_name == CELERY_KUBERNETES_EXECUTOR:
-            return cls.__load_celery_kubernetes_executor()
-        elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
-            return cls.__load_local_kubernetes_executor()
+        if isinstance(executor_name, str):
+            _executor_name = cls.lookup_executor_name_by_str(executor_name)
+        else:
+            _executor_name = executor_name
+
+        # Check if the executor has been previously loaded. Avoid constructing 
a new object
+        if _executor_name in _loaded_executors:
+            return _loaded_executors[_executor_name]
 
         try:
-            executor_cls, import_source = 
cls.import_executor_cls(executor_name)
-            log.debug("Loading executor %s from %s", executor_name, 
import_source.value)
+            if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:

Review Comment:
   Just wondering why these executors are special-cased here by name. Doesn't
this couple the generic executor loader to specific executor implementations?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to