This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ea32d0d83ce915798ba9779dbf7c1df9faf7c241 Author: Tomek Urbaszek <[email protected]> AuthorDate: Thu Aug 13 20:09:44 2020 +0200 [AIRFLOW-6706] Lazy load operator extra links (#7327) (#10318) Co-authored-by: Kamil Breguła <[email protected]> Backported from https://github.com/apache/airflow/pull/7327 cherry-picked from b180e4b --- airflow/operators/__init__.py | 4 +--- airflow/plugins_manager.py | 29 +---------------------------- airflow/serialization/serialized_objects.py | 27 ++++++++++++++++++++------- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py index fb5383f..00f34d0 100644 --- a/airflow/operators/__init__.py +++ b/airflow/operators/__init__.py @@ -101,7 +101,7 @@ if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): def _integrate_plugins(): """Integrate plugins to the context""" - from airflow.plugins_manager import operators_modules, register_inbuilt_operator_links + from airflow.plugins_manager import operators_modules for operators_module in operators_modules: sys.modules[operators_module.__name__] = operators_module globals()[operators_module._name] = operators_module @@ -121,5 +121,3 @@ def _integrate_plugins(): "import from 'airflow.operators.[plugin_module]' " "instead. 
Support for direct imports will be dropped " "entirely in Airflow 2.0.".format(i=operator_name)) - - register_inbuilt_operator_links() diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index b70517b..5fd680e 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -28,7 +28,7 @@ import inspect import logging import os import re -from typing import Any, Dict, List, Set, Type +from typing import Any, Dict, List, Type import pkg_resources @@ -114,33 +114,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins): return airflow_plugins -def register_inbuilt_operator_links(): - """ - Register all the Operators Links that are already defined for the operators - in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator) - - This is required to populate the "allowed list" of allowed classes when deserializing operator links - """ - inbuilt_operator_links = set() # type: Set[Type] - - try: - from airflow.contrib.operators.bigquery_operator import BigQueryConsoleLink, BigQueryConsoleIndexableLink # noqa E501 # pylint: disable=R0401,line-too-long - inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink]) - except ImportError: - pass - - try: - from airflow.contrib.operators.qubole_operator import QDSLink # pylint: disable=R0401 - inbuilt_operator_links.update([QDSLink]) - except ImportError: - pass - - registered_operator_link_classes.update({ - "{}.{}".format(link.__module__, link.__name__): link - for link in inbuilt_operator_links - }) - - def is_valid_plugin(plugin_obj, existing_plugins): """ Check whether a potential object is a subclass of diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 34372db..d959e92 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -23,7 +23,7 @@ import datetime import enum import logging import six -from typing import TYPE_CHECKING, 
Optional, Union, Dict +from typing import TYPE_CHECKING, Optional, Union, Dict, List import cattr import pendulum @@ -36,6 +36,7 @@ from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.helpers import serialize_template_field from airflow.serialization.json_schema import Validator, load_dag_schema from airflow.settings import json +from airflow.utils.module_loading import import_string from airflow.www.utils import get_python_source try: @@ -49,6 +50,17 @@ if TYPE_CHECKING: log = logging.getLogger(__name__) +BUILTIN_OPERATOR_EXTRA_LINKS = [ + "airflow.contrib.operators.bigquery_operator.BigQueryConsoleLink", + "airflow.contrib.operators.bigquery_operator.BigQueryConsoleIndexableLink", + "airflow.contrib.operators.qubole_operator.QDSLink", + # providers new paths + "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink", + "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink", + "airflow.providers.qubole.operators.qubole.QDSLink" +] # type: List[str] + + class BaseSerialization: """BaseSerialization provides utils for serialization.""" @@ -459,15 +471,16 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization): # list(_operator_links_source.items())[0] = # ('airflow.gcp.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0}) - _operator_link_class, data = list(_operator_links_source.items())[0] - - if _operator_link_class in registered_operator_link_classes: - single_op_link_class_name = registered_operator_link_classes[_operator_link_class] + _operator_link_class_path, data = list(_operator_links_source.items())[0] + if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS: + single_op_link_class = import_string(_operator_link_class_path) + elif _operator_link_class_path in registered_operator_link_classes: + single_op_link_class = registered_operator_link_classes[_operator_link_class_path] else: - raise KeyError("Operator Link class %r not registered" % 
_operator_link_class) + raise KeyError("Operator Link class %r not registered" % _operator_link_class_path) op_predefined_extra_link = cattr.structure( - data, single_op_link_class_name) # type: BaseOperatorLink + data, single_op_link_class) # type: BaseOperatorLink op_predefined_extra_links.update( {op_predefined_extra_link.name: op_predefined_extra_link}
