feluelle commented on a change in pull request #6696: [AIRFLOW-6128] Simplify 
plugins_manager and webserver plugin code
URL: https://github.com/apache/airflow/pull/6696#discussion_r363356576
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -159,161 +143,195 @@ def is_valid_plugin(plugin_obj, existing_plugins):
     return False
 
 
-plugins = []  # type: List[AirflowPlugin]
+plugins: List[AirflowPlugin] = []
+stat_name_handlers: List[Callable[[str], str]] = []
+
+
+def load_entrypoint_plugins(entry_points: Generator[EntryPoint, None, None]) -> None:
+    """
+    Load AirflowPlugin subclasses from the entrypoints
+    provided. The entry_point group should be 'airflow.plugins'.
+
+    :param entry_points: A collection of entrypoints to search for plugins
+    :type entry_points: Generator[setuptools.EntryPoint, None, None]
+    """
+    for entry_point in entry_points:
+        log.debug('Importing entry_point plugin %s', entry_point.name)
+        plugin_obj = entry_point.load()
+        if is_valid_plugin(plugin_obj, plugins):
+            if callable(getattr(plugin_obj, 'on_load', None)):
+                plugin_obj.on_load()
+                plugins.append(plugin_obj)
+
 
-norm_pattern = re.compile(r'[/|.]')
+load_entrypoint_plugins(pkg_resources.iter_entry_points('airflow.plugins'))
 
-assert settings.PLUGINS_FOLDER, "Plugins folder is not set"
 
-# Crawl through the plugins folder to find AirflowPlugin derivatives
-for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
-    for f in files:
-        filepath = os.path.join(root, f)
-        try:
-            if not os.path.isfile(filepath):
-                continue
-            mod_name, file_ext = os.path.splitext(
-                os.path.split(filepath)[-1])
-            if file_ext != '.py':
-                continue
+def import_plugin(filepath: str, mod_name: str, root: str) -> None:
+    """Imports plugin module."""
+    log.debug('Importing plugin module %s', filepath)
+    # normalize root path as namespace
+    namespace = '_'.join([re.sub(NORM_PATTERN, '__', root), mod_name])
+    # noinspection PyDeprecation
+    module = imp.load_source(namespace, filepath)
+    for obj in list(module.__dict__.values()):
+        if is_valid_plugin(obj, plugins):
+            plugins.append(obj)
 
-            log.debug('Importing plugin module %s', filepath)
-            # normalize root path as namespace
-            namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
 
-            m = imp.load_source(namespace, filepath)
-            for obj in list(m.__dict__.values()):
-                if is_valid_plugin(obj, plugins):
-                    plugins.append(obj)
-        except Exception as e:  # pylint: disable=broad-except
-            log.exception(e)
-            path = filepath or str(f)
-            log.error('Failed to import plugin %s', path)
-            import_errors[path] = str(e)
+def try_to_import_plugin(filepath: str, root: str) -> None:
+    """Tries to import plugin from the file specified. Skips it if not a valid 
python file."""
+    if not os.path.isfile(filepath):
+        return
+    mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
+    if file_ext != '.py':
+        return
+    import_plugin(filepath=filepath, mod_name=mod_name, root=root)
 
-plugins = load_entrypoint_plugins(
-    pkg_resources.iter_entry_points('airflow.plugins'),
-    plugins
-)
+
+def find_airflow_plugins() -> None:
+    """Crawl through the plugins folder to find AirflowPlugin derivatives"""
+    assert settings.PLUGINS_FOLDER, "Plugins folder is not set"
+    for root, _, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
+        for f in files:
+            try:
+                try_to_import_plugin(filepath=os.path.join(root, f), root=root)
+            except Exception as e:  # pylint: disable=broad-except
+                log.exception(e)
+                log.error('Failed to import plugin %s in root %s', str(f), root)
+                import_errors[str(f)] = str(e)
 
 
-# pylint: disable=protected-access
 # noinspection Mypy,PyTypeHints
-def make_module(name: str, objects: List[Any]):
+def make_module(name: str, objects: List[Any]) -> ModuleType:
     """Creates new module."""
     log.debug('Creating module %s', name)
     name = name.lower()
+    # noinspection PyDeprecation
     module = imp.new_module(name)
+    # pylint: disable=protected-access
     module._name = name.split('.')[-1]  # type: ignore
     module._objects = objects           # type: ignore
+    # pylint: enable=protected-access
     module.__dict__.update((o.__name__, o) for o in objects)
     return module
-# pylint: enable=protected-access
+
+
+def prepare_plugin_links() -> None:
+    """Prepares links for plugins."""
+    for plugin in plugins:
+        if not plugin.name:
+            raise AirflowPluginException("Plugin name is missing.")
+        plugin_name: str = plugin.name
+        operators_modules.append(
+            make_module('airflow.operators.' + plugin_name, plugin.operators + 
plugin.sensors))
+        sensors_modules.append(
+            make_module('airflow.sensors.' + plugin_name, plugin.sensors)
+        )
+        hooks_modules.append(make_module('airflow.hooks.' + plugin_name, 
plugin.hooks))
+        executors_modules.append(
+            make_module('airflow.executors.' + plugin_name, plugin.executors))
+        macros_modules.append(make_module('airflow.macros.' + plugin_name, 
plugin.macros))
+
+        admin_views.extend(plugin.admin_views)
+        menu_links.extend(plugin.menu_links)
+        flask_appbuilder_views.extend(plugin.appbuilder_views)
+        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
+        flask_blueprints.extend([{
+            'name': plugin.name,
+            'blueprint': bp
+        } for bp in plugin.flask_blueprints])
+        if plugin.stat_name_handler:
+            stat_name_handlers.append(plugin.stat_name_handler)
+        global_operator_extra_links.extend(plugin.global_operator_extra_links)
+        # Only register Operator links if its ``operators`` property is not an empty list
+        # So that we can only attach this links to a specific Operator
+        operator_extra_links.extend([
+            ope for ope in plugin.operator_extra_links if ope.operators])
+        registered_operator_link_classes.update({
+            "{}.{}".format(link.__class__.__module__,
+                           link.__class__.__name__): link.__class__
+            for link in plugin.operator_extra_links
+        })
 
 
 # Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+operators_modules: List[ModuleType] = []
+sensors_modules: List[ModuleType] = []
+hooks_modules: List[ModuleType] = []
+executors_modules: List[ModuleType] = []
+macros_modules: List[ModuleType] = []
 
 # Plugin components to integrate directly
 admin_views: List[Any] = []
 flask_blueprints: List[Any] = []
 menu_links: List[Any] = []
 flask_appbuilder_views: List[Any] = []
 flask_appbuilder_menu_links: List[Any] = []
-stat_name_handler: Any = None
 global_operator_extra_links: List[Any] = []
 operator_extra_links: List[Any] = []
+
 registered_operator_link_classes: Dict[str, Type] = {}
 """Mapping of class names to class of OperatorLinks registered by plugins.
 
 Used by the DAG serialization code to only allow specific classes to be created
 during deserialization
 """
 
-stat_name_handlers = []
-for p in plugins:
-    if not p.name:
-        raise AirflowPluginException("Plugin name is missing.")
-    plugin_name: str = p.name
-    operators_modules.append(
-        make_module('airflow.operators.' + plugin_name, p.operators + 
p.sensors))
-    sensors_modules.append(
-        make_module('airflow.sensors.' + plugin_name, p.sensors)
-    )
-    hooks_modules.append(make_module('airflow.hooks.' + plugin_name, p.hooks))
-    executors_modules.append(
-        make_module('airflow.executors.' + plugin_name, p.executors))
-    macros_modules.append(make_module('airflow.macros.' + plugin_name, 
p.macros))
-
-    admin_views.extend(p.admin_views)
-    menu_links.extend(p.menu_links)
-    flask_appbuilder_views.extend(p.appbuilder_views)
-    flask_appbuilder_menu_links.extend(p.appbuilder_menu_items)
-    flask_blueprints.extend([{
-        'name': p.name,
-        'blueprint': bp
-    } for bp in p.flask_blueprints])
-    if p.stat_name_handler:
-        stat_name_handlers.append(p.stat_name_handler)
-    global_operator_extra_links.extend(p.global_operator_extra_links)
-    operator_extra_links.extend([ope for ope in p.operator_extra_links])
+find_airflow_plugins()
+prepare_plugin_links()
 
-    registered_operator_link_classes.update({
-        "{}.{}".format(link.__class__.__module__,
-                       link.__class__.__name__): link.__class__
-        for link in p.operator_extra_links
-    })
 
-if len(stat_name_handlers) > 1:
-    raise AirflowPluginException(
-        'Specified more than one stat_name_handler ({}) '
-        'is not allowed.'.format(stat_name_handlers))
+def get_stat_name_handler() -> Optional[Callable[[str], str]]:
+    """Retrieves the single configured stats_handler (there can be only 
one!)"""
+    assert len(stat_name_handlers) in {0, 1}, \
+        f'Specified more than one stat_name_handler ({stat_name_handlers}) which is not allowed.'
 
 Review comment:
   > I thought we did decide to not use assert for raising exceptions.
   
   Same here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to