This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new fc40a49  Deprecate adding Operators and Sensors via plugins (#12069)
fc40a49 is described below

commit fc40a49e5b4610e12f66b90d2e24bc54fdd90ef9
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Nov 5 12:10:55 2020 +0000

    Deprecate adding Operators and Sensors via plugins (#12069)
    
    This adds two warnings -- one at registration time:
    
    ```
    /home/ash/airflow/plugins/example.py:27: FutureWarning: Registering operators or sensors in plugins is deprecated -- these should be treated like 'plain' python modules, and imported normally in DAGs.
    
    Airflow 2.0 has removed the ability to register these types in plugins. See <http://airflow.apache.org/docs/stable/howto/custom-operator.html>.
      class LinksPlugin(AirflowPlugin):
    ```
    
    (The source/file shown here is the `class` line of the defining plugin.)
    
    And a second at import time:
    
    ```
    /home/ash/airflow/dags/foo.py:1: FutureWarning: Importing 'MyOperator' from under 'airflow.operators.*' has been deprecated and should be directly imported as 'example.MyOperator' instead.
    
    Support for importing from within the airflow namespace for plugins will be dropped entirely in Airflow 2.0. See <http://airflow.apache.org/docs/stable/howto/custom-operator.html>.
      from airflow.operators.links_plugin import MyOperator
    ```
    
    (The source here is where the import is used - i.e. in the DAG)
    
    Closes #9500
---
 airflow/operators/__init__.py |  8 ++++
 airflow/plugins_manager.py    | 90 +++++++++++++++++++++++++++++++++++++++++--
 airflow/sensors/__init__.py   |  7 ++++
 setup.py                      |  1 +
 4 files changed, 103 insertions(+), 3 deletions(-)

diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 00f34d0..a901ab9 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -21,6 +21,9 @@ import sys
 import os
 from airflow.models import BaseOperator  # noqa: F401
 
+
+PY37 = sys.version_info >= (3, 7)
+
 # ------------------------------------------------------------------------
 #
 # #TODO #FIXME Airflow 2.0
@@ -103,7 +106,12 @@ def _integrate_plugins():
     """Integrate plugins to the context"""
     from airflow.plugins_manager import operators_modules
     for operators_module in operators_modules:
+
         sys.modules[operators_module.__name__] = operators_module
+        if not PY37:
+            from pep562 import Pep562
+            operators_module = Pep562(operators_module.__name__)
+
         globals()[operators_module._name] = operators_module
 
         ##########################################################
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 5fd680e..5b86fd1 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -28,9 +28,12 @@ import inspect
 import logging
 import os
 import re
+import sys
+import warnings
 from typing import Any, Dict, List, Type
 
 import pkg_resources
+from six import with_metaclass
 
 from airflow import settings
 from airflow.models.baseoperator import BaseOperatorLink
@@ -40,11 +43,30 @@ log = logging.getLogger(__name__)
 import_errors = {}
 
 
+PY37 = sys.version_info >= (3, 7)
+
+
 class AirflowPluginException(Exception):
     pass
 
 
-class AirflowPlugin(object):
+class _MetaPluginClass(type):
+    def __new__(cls, name, bases, props):
+        if props.get('operators', []) or props.get('sensors', []):
+            warnings.warn(
+                "Registering operators or sensors in plugins is deprecated -- these should be treated like "
+                "'plain' python modules, and imported normally in DAGs.\n"
+                "\n"
+                "Airflow 2.0 has removed the ability to register these types in plugins. See "
+                "<http://airflow.apache.org/docs/stable/howto/custom-operator.html>.",
+                category=FutureWarning,
+                stacklevel=2,
+            )
+
+        return super(_MetaPluginClass, cls).__new__(cls, name, bases, props)
+
+
+class AirflowPlugin(with_metaclass(_MetaPluginClass, object)):
     name = None  # type: str
     operators = []  # type: List[Any]
     sensors = []  # type: List[Any]
@@ -104,9 +126,11 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
         log.debug('Importing entry_point plugin %s', entry_point.name)
         try:
             plugin_obj = entry_point.load()
+            plugin_obj.__usable_import_name = entry_point.module_name
             if is_valid_plugin(plugin_obj, airflow_plugins):
                 if callable(getattr(plugin_obj, 'on_load', None)):
                     plugin_obj.on_load()
+
                     airflow_plugins.append(plugin_obj)
         except Exception as e:  # pylint: disable=broad-except
             log.exception("Failed to import plugin %s", entry_point.name)
@@ -154,12 +178,24 @@ for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
                 continue
 
             log.debug('Importing plugin module %s', filepath)
+
+            if mod_name == "__init__":
+                compat_import_name = root
+            else:
+                compat_import_name = os.path.join(root, mod_name)
+
+            compat_import_name = os.path.relpath(
+                compat_import_name,
+                settings.PLUGINS_FOLDER,
+            ).replace(os.sep, '.')
+
             # normalize root path as namespace
             namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
 
             m = imp.load_source(namespace, filepath)
             for obj in list(m.__dict__.values()):
                 if is_valid_plugin(obj, plugins):
+                    obj.__usable_import_name = compat_import_name
                     plugins.append(obj)
 
         except Exception as e:
@@ -173,6 +209,54 @@ plugins = load_entrypoint_plugins(
 )
 
 
+def make_deprecated_module(name, objects, plugin):
+    name = name.lower()
+    module = imp.new_module(name)
+    module._name = name.split('.')[-1]
+    module._objects = objects
+    objects = {o.__name__: o for o in objects}
+
+    def __getattr__(attrname):
+        """Get attribute."""
+        if attrname not in objects:
+            raise AttributeError("module '{}' has no attribute '{}'".format(name, attrname))
+
+        stacklevel = 2 if PY37 else 3
+
+        obj = objects[attrname]
+        # Use __qualname__ where we have it for Py 3.3+
+        obj_name = getattr(obj, '__qualname__', obj.__name__)
+
+        # Work out what the "correct" import name should be
+        if obj.__module__ == plugin.__module__:
+            # Class is defined in the plugin
+            correct_import_name = '.'.join((plugin.__usable_import_name, obj_name))
+        else:
+            # Class was imported from somewhere else, just direct user to use that instead
+            correct_import_name = '.'.join((obj.__module__, obj_name))
+
+        warnings.warn(
+            "Importing '{}' from under 'airflow.operators.*' has been deprecated and should be directly "
+            "imported as '{}' instead.\n"
+            "\n"
+            "Support for importing from within the airflow namespace for plugins will be dropped entirely "
+            "in Airflow 2.0. See <http://airflow.apache.org/docs/stable/howto/custom-operator.html>.".format(
+                attrname, correct_import_name
+            ),
+            category=FutureWarning,
+            stacklevel=stacklevel
+        )
+        return obj
+
+    def __dir__():
+        return objects.keys()
+
+    module.__getattr__ = __getattr__
+    module.__dir__ = __dir__
+
+    return module
+
+
 def make_module(name, objects):
     log.debug('Creating module %s', name)
     name = name.lower()
@@ -208,9 +292,9 @@ during deserialization
 
 for p in plugins:
     operators_modules.append(
-        make_module('airflow.operators.' + p.name, p.operators + p.sensors))
+        make_deprecated_module('airflow.operators.' + p.name, p.operators + p.sensors, p))
     sensors_modules.append(
-        make_module('airflow.sensors.' + p.name, p.sensors)
+        make_deprecated_module('airflow.sensors.' + p.name, p.sensors, p)
     )
     hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
     executors_modules.append(
diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py
index b9d1dbb..449ed31 100644
--- a/airflow/sensors/__init__.py
+++ b/airflow/sensors/__init__.py
@@ -20,6 +20,9 @@
 import sys
 import os as _os
 
+
+PY37 = sys.version_info >= (3, 7)
+
 _sensors = {
     'base_sensor_operator': ['BaseSensorOperator'],
     'external_task_sensor': ['ExternalTaskSensor'],
@@ -46,6 +49,10 @@ def _integrate_plugins():
     from airflow.plugins_manager import sensors_modules
     for sensors_module in sensors_modules:
         sys.modules[sensors_module.__name__] = sensors_module
+
+        if not PY37:
+            from pep562 import Pep562
+            sensors_module = Pep562(sensors_module.__name__)
         globals()[sensors_module._name] = sensors_module
 
         ##########################################################
diff --git a/setup.py b/setup.py
index 159a420..0b6480b 100644
--- a/setup.py
+++ b/setup.py
@@ -580,6 +580,7 @@ INSTALL_REQUIREMENTS = [
     'marshmallow-sqlalchemy>=0.16.1, <0.19.0;python_version<"3.6"',
     'pandas>=0.17.1, <2.0',
     'pendulum==1.4.4',
+    'pep562~=1.0;python_version<"3.7"',
     'psutil>=4.2.0, <6.0.0',
     'pygments>=2.0.1, <3.0',
     'python-daemon>=2.1.1',

Reply via email to