kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all
plugins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389819304
##########
File path: airflow/plugins_manager.py
##########
@@ -183,103 +199,117 @@ def make_module(name: str, objects: List[Any]):
# pylint: enable=protected-access
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def endure_plugins_loaded():
+ """
+ Load plugins from plugins directory and entrypoints.
-# Plugin components to integrate directly
-admin_views: List[Any] = []
-flask_blueprints: List[Any] = []
-menu_links: List[Any] = []
-flask_appbuilder_views: List[Any] = []
-flask_appbuilder_menu_links: List[Any] = []
-global_operator_extra_links: List[Any] = []
-operator_extra_links: List[Any] = []
-registered_operator_link_classes: Dict[str, Type] = {}
-"""Mapping of class names to class of OperatorLinks registered by plugins.
+ Plugins are only loaded if they have not been previously loaded.
+ """
+ global plugins # pylint: disable=global-statement
+
+ if plugins is not None:
+ log.debug("Plugins are already loaded. Skipping.")
+ return
+
+ if not settings.PLUGINS_FOLDER:
+ raise ValueError("Plugins folder is not set")
+
+ log.debug("Loading plugins")
+
+ plugins = []
+
+ load_plugins_from_plugin_directory()
+ load_entrypoint_plugins()
+
+ initialize_plugins()
+
+
+def initialize_plugins():
+ """Creates modules for loaded extension from plugins"""
+ # pylint: disable=global-statement
+ global plugins
+ global operators_modules
+ global sensors_modules
+ global hooks_modules
+ global executors_modules
+ global macros_modules
+
+ global admin_views
+ global flask_blueprints
+ global menu_links
+ global flask_appbuilder_views
+ global flask_appbuilder_menu_links
+ global global_operator_extra_links
+ global operator_extra_links
+ global registered_operator_link_classes
+ # pylint: enable=global-statement
+
+ log.debug("Initialize plugin modules")
+
+ for plugin in plugins:
+ plugin_name: str = plugin.name
+ operators_modules.append(
+ make_module('airflow.operators.' + plugin_name, plugin.operators +
plugin.sensors))
+ sensors_modules.append(
+ make_module('airflow.sensors.' + plugin_name, plugin.sensors)
+ )
+ hooks_modules.append(make_module('airflow.hooks.' + plugin_name,
plugin.hooks))
+ executors_modules.append(
+ make_module('airflow.executors.' + plugin_name, plugin.executors))
+ macros_modules.append(make_module('airflow.macros.' + plugin_name,
plugin.macros))
+
+ admin_views.extend(plugin.admin_views)
+ menu_links.extend(plugin.menu_links)
+ flask_appbuilder_views.extend(plugin.appbuilder_views)
+ flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
+ flask_blueprints.extend([{
+ 'name': plugin.name,
+ 'blueprint': bp
+ } for bp in plugin.flask_blueprints])
+ global_operator_extra_links.extend(plugin.global_operator_extra_links)
+ operator_extra_links.extend(list(plugin.operator_extra_links))
+
+ registered_operator_link_classes.update({
+ "{}.{}".format(link.__class__.__module__,
+ link.__class__.__name__): link.__class__
+ for link in plugin.operator_extra_links
+ })
-Used by the DAG serialization code to only allow specific classes to be created
-during deserialization
-"""
-for p in plugins:
- if not p.name:
- raise AirflowPluginException("Plugin name is missing.")
- plugin_name: str = p.name
- operators_modules.append(
- make_module('airflow.operators.' + plugin_name, p.operators +
p.sensors))
- sensors_modules.append(
- make_module('airflow.sensors.' + plugin_name, p.sensors)
- )
- hooks_modules.append(make_module('airflow.hooks.' + plugin_name, p.hooks))
- executors_modules.append(
- make_module('airflow.executors.' + plugin_name, p.executors))
- macros_modules.append(make_module('airflow.macros.' + plugin_name,
p.macros))
-
- admin_views.extend(p.admin_views)
- menu_links.extend(p.menu_links)
- flask_appbuilder_views.extend(p.appbuilder_views)
- flask_appbuilder_menu_links.extend(p.appbuilder_menu_items)
- flask_blueprints.extend([{
- 'name': p.name,
- 'blueprint': bp
- } for bp in p.flask_blueprints])
- global_operator_extra_links.extend(p.global_operator_extra_links)
- operator_extra_links.extend(list(p.operator_extra_links))
-
- registered_operator_link_classes.update({
- "{}.{}".format(link.__class__.__module__,
- link.__class__.__name__): link.__class__
- for link in p.operator_extra_links
- })
-
-
-def integrate_operator_plugins() -> None:
- """Integrate operators plugins to the context"""
+def integrate_executor_plugins() -> None:
+ """Integrate executor plugins to the context."""
+ endure_plugins_loaded()
+
+ log.debug("Integrate executor plugins")
+
+ for executors_module in executors_modules:
+ sys.modules[executors_module.__name__] = executors_module
+ # noinspection PyProtectedMember
+ globals()[executors_module._name] = executors_module # pylint:
disable=protected-access
+
+
+def integrate_dag_plugins() -> None:
+ """Integrates operator, sensor, hook, macro plugins."""
+ endure_plugins_loaded()
Review comment:
```suggestion
ensure_plugins_loaded()
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services