o-nikolas commented on code in PR #37635:
URL: https://github.com/apache/airflow/pull/37635#discussion_r1503294232


##########
airflow/executors/executor_loader.py:
##########
@@ -69,56 +71,181 @@ class ExecutorLoader:
     }
 
     @classmethod
-    def get_default_executor_name(cls) -> str:
-        """Return the default executor name from Airflow configuration.
+    def block_use_of_hybrid_exec(cls, executor_config: list):
+        """Raise an exception if the user tries to use multiple executors 
before the feature is complete.
 
-        :return: executor name from Airflow configuration
+        This check is built into a method so that it can be easily mocked in 
unit tests.
+
+        :param executor_config: core.executor configuration value.
+        """
+        if len(executor_config) > 1 or ":" in "".join(executor_config):
+            raise AirflowConfigException(
+                "Configuring multiple executors and executor aliases are not 
yet supported!: "
+                f"{executor_config}"
+            )
+
+    @classmethod
+    def _get_executor_names(cls) -> list[ExecutorName]:
+        """Return the executor names from Airflow configuration.
+
+        :return: List of executor names from Airflow configuration
         """
         from airflow.configuration import conf
 
-        return conf.get_mandatory_value("core", "EXECUTOR")
+        if _executor_names:
+            return _executor_names
+
+        executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")
+
+        # AIP-61 is WIP. Unblock configuring multiple executors when the 
feature is ready to launch
+        cls.block_use_of_hybrid_exec(executor_names_raw)
+
+        executor_names = []
+        for name in executor_names_raw:
+            if len(split_name := name.split(":")) == 1:
+                name = split_name[0]
+                # Check if this is an alias for a core airflow executor, module
+                # paths won't be provided by the user in that case.
+                if core_executor_module := cls.executors.get(name):
+                    executor_names.append(ExecutorName(alias=name, 
module_path=core_executor_module))
+                # Only a module path or plugin name was provided
+                else:
+                    executor_names.append(ExecutorName(alias=None, 
module_path=name))
+            # An alias was provided with the module path
+            elif len(split_name) == 2:
+                # Ensure the user is not trying to override the existing 
aliases of any of the core
+                # executors by providing an alias along with the existing core 
airflow executor alias
+                # (e.g. my_local_exec_alias:LocalExecutor). Allowing this 
makes things unnecessarily
+                # complicated. Multiple Executors of the same type will be 
supported by a future multitenancy
+                # AIP.
+                # The module component should always be a module or plugin 
path.
+                if not split_name[1] or split_name[1] in CORE_EXECUTOR_NAMES:
+                    raise AirflowConfigException(
+                        f"Incorrectly formatted executor configuration: 
{name}\n"
+                        "second portion of an executor configuration must be a 
module path"
+                    )
+                else:
+                    executor_names.append(ExecutorName(alias=split_name[0], 
module_path=split_name[1]))
+            else:
+                raise AirflowConfigException(f"Incorrectly formatted executor 
configuration: {name}")
+
+        # As of now, we do not allow duplicate executors.
+        # Add all module paths/plugin names to a set, since the actual code is 
what is unique
+        unique_modules = set([exec_name.module_path for exec_name in 
executor_names])
+        if len(unique_modules) < len(executor_names):
+            msg = (
+                "At least one executor was configured twice. Duplicate 
executors is not yet supported. "
+                "Please check your configuration again to correct the issue."
+            )
+            raise AirflowConfigException(msg)
+
+        # Populate some mappings for fast future lookups
+        for executor_name in executor_names:
+            # Executors will not always have aliases
+            if executor_name.alias:
+                _alias_to_executors[executor_name.alias] = executor_name
+            # All executors will have a module path
+            _module_to_executors[executor_name.module_path] = executor_name
+            # Cache the executor names, so the logic of this method only runs 
once
+            _executor_names.append(executor_name)
+
+        return executor_names
+
+    @classmethod
+    def get_default_executor_name(cls) -> ExecutorName:
+        """Return the default executor name from Airflow configuration.
+
+        :return: executor name from Airflow configuration
+        """
+        # The default executor is the first configured executor in the list
+        return cls._get_executor_names()[0]
 
     @classmethod
     def get_default_executor(cls) -> BaseExecutor:
         """Create a new instance of the configured executor if none exists and 
returns it."""
-        if cls._default_executor is not None:
-            return cls._default_executor
+        default_executor = cls.load_executor(cls.get_default_executor_name())
+
+        return default_executor
+
+    @classmethod
+    def init_executors(cls) -> list[BaseExecutor]:

Review Comment:
   This method is not used by any code here yet. It will be used by the
scheduler and backfill job runners (or, more precisely, by the jobs within them).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to