ashb commented on a change in pull request #12466:
URL: https://github.com/apache/airflow/pull/12466#discussion_r531485590
##########
File path: airflow/cli/commands/provider_command.py
##########
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Providers sub-commands"""
+from typing import Dict, List, Tuple
+
+import pygments
+import yaml
+from pygments.lexers.data import YamlLexer
+from tabulate import tabulate
+
+from airflow.providers_manager import ProvidersManager
+from airflow.utils.cli import should_use_colors
+from airflow.utils.code_utils import get_terminal_formatter
+
+
+def _tabulate_providers(providers: List[Dict], tablefmt: str):
+    tabulate_data = [
+        {
+            'Provider name': provider['package-name'],
+            'Description': provider['description'],
+            'Version': provider['versions'][0],
+        }
+        for version, provider in providers
+    ]
+
+    msg = tabulate(tabulate_data, tablefmt=tablefmt, headers='keys')
+    return msg
+
+
+def provider_get(args):
+    """Get a provider info."""
+    providers = ProvidersManager().providers
+    if args.provider_name in providers:
+        provider_version = providers[args.provider_name][0]
+        provider_info = providers[args.provider_name][1]

Review comment:
```suggestion
        provider_version, provider_info = providers[args.provider_name]
```
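A small illustration of why the unpacked form reads better, using a made-up entry with the `(version, provider_info)` tuple shape from this PR:

```python
providers = {
    "apache-airflow-providers-sqlite": ("1.0.0", {"description": "SQLite provider"}),
}

# Two subscripts repeat the lookup and hide the tuple's shape:
provider_version = providers["apache-airflow-providers-sqlite"][0]
provider_info = providers["apache-airflow-providers-sqlite"][1]

# Tuple unpacking does one lookup and documents the shape in the names:
provider_version, provider_info = providers["apache-airflow-providers-sqlite"]
```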
##########
File path: airflow/models/connection.py
##########
@@ -30,69 +31,20 @@
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
+from airflow.providers_manager import ProvidersManager
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string
 
-# A map that assigns a connection type to a tuple that contains
-# the path of the class and the name of the conn_id key parameter.
-# PLEASE KEEP BELOW LIST IN ALPHABETICAL ORDER.
-CONN_TYPE_TO_HOOK = {
-    "azure_batch": (
-        "airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook",
-        "azure_batch_conn_id",
-    ),
-    "azure_cosmos": (
-        "airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook",
-        "azure_cosmos_conn_id",
-    ),
-    "azure_data_lake": (
-        "airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook",
-        "azure_data_lake_conn_id",
-    ),
-    "cassandra": ("airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook", "cassandra_conn_id"),
-    "cloudant": ("airflow.providers.cloudant.hooks.cloudant.CloudantHook", "cloudant_conn_id"),
-    "dataprep": ("airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook", "dataprep_default"),
-    "docker": ("airflow.providers.docker.hooks.docker.DockerHook", "docker_conn_id"),
-    "elasticsearch": (
-        "airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
-        "elasticsearch_conn_id",
-    ),
-    "exasol": ("airflow.providers.exasol.hooks.exasol.ExasolHook", "exasol_conn_id"),
-    "gcpcloudsql": (
-        "airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook",
-        "gcp_cloudsql_conn_id",
-    ),
-    "gcpssh": (
-        "airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook",
-        "gcp_conn_id",
-    ),
-    "google_cloud_platform": (
-        "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook",
-        "bigquery_conn_id",
-    ),
-    "grpc": ("airflow.providers.grpc.hooks.grpc.GrpcHook", "grpc_conn_id"),
-    "hive_cli": ("airflow.providers.apache.hive.hooks.hive.HiveCliHook", "hive_cli_conn_id"),
-    "hiveserver2": ("airflow.providers.apache.hive.hooks.hive.HiveServer2Hook", "hiveserver2_conn_id"),
-    "imap": ("airflow.providers.imap.hooks.imap.ImapHook", "imap_conn_id"),
-    "jdbc": ("airflow.providers.jdbc.hooks.jdbc.JdbcHook", "jdbc_conn_id"),
-    "jira": ("airflow.providers.jira.hooks.jira.JiraHook", "jira_conn_id"),
-    "kubernetes": ("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook", "kubernetes_conn_id"),
-    "mongo": ("airflow.providers.mongo.hooks.mongo.MongoHook", "conn_id"),
-    "mssql": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"),
-    "mysql": ("airflow.providers.mysql.hooks.mysql.MySqlHook", "mysql_conn_id"),
-    "odbc": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"),
-    "oracle": ("airflow.providers.oracle.hooks.oracle.OracleHook", "oracle_conn_id"),
-    "pig_cli": ("airflow.providers.apache.pig.hooks.pig.PigCliHook", "pig_cli_conn_id"),
-    "postgres": ("airflow.providers.postgres.hooks.postgres.PostgresHook", "postgres_conn_id"),
-    "presto": ("airflow.providers.presto.hooks.presto.PrestoHook", "presto_conn_id"),
-    "redis": ("airflow.providers.redis.hooks.redis.RedisHook", "redis_conn_id"),
-    "snowflake": ("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook", "snowflake_conn_id"),
-    "sqlite": ("airflow.providers.sqlite.hooks.sqlite.SqliteHook", "sqlite_conn_id"),
-    "tableau": ("airflow.providers.salesforce.hooks.tableau.TableauHook", "tableau_conn_id"),
-    "vertica": ("airflow.providers.vertica.hooks.vertica.VerticaHook", "vertica_conn_id"),
-    "wasb": ("airflow.providers.microsoft.azure.hooks.wasb.WasbHook", "wasb_conn_id"),
-}
-# PLEASE KEEP ABOVE LIST IN ALPHABETICAL ORDER.
+
+@lru_cache(maxsize=None)
+def get_conn_type_to_hook() -> Dict[str, Tuple[str, str]]:

Review comment:
```suggestion
def get_conn_type_to_hook_mapping() -> Dict[str, Tuple[str, str]]:
```

As an English speaker, `get_conn_type_to_hook` reads to me like it should be used as `get_conn_type_to_hook(self.conn_type)`, so slight tweak to the function name please.

That said, given this is a one-line function that just accesses a property on a singleton object, do we even need the function?
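To make the trailing question concrete, here is a hedged sketch of how a call site could consult the singleton directly instead of going through a module-level wrapper. The `hooks` accessor below is an assumption for illustration; only the internal `_hooks_dict` appears in this PR's hunks:

```python
import importlib

from airflow.providers_manager import ProvidersManager


def resolve_hook(conn_type: str):
    """Return the hook class and its connection-id attribute name for a conn_type."""
    # ProvidersManager() always hands back the same instance (singleton __new__),
    # so calling it inline is cheap. `hooks` is a hypothetical public accessor
    # over the conn_type -> (hook class path, conn_id attribute) mapping.
    hook_class_name, conn_id_attr = ProvidersManager().hooks[conn_type]
    module_name, class_name = hook_class_name.rsplit('.', maxsplit=1)
    hook_class = getattr(importlib.import_module(module_name), class_name)
    return hook_class, conn_id_attr
```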
##########
File path: dev/provider_packages/SETUP_TEMPLATE.py.jinja2
##########
@@ -59,19 +59,6 @@ def do_setup(version_suffix_for_pypi=''):
         packages=find_namespace_packages(
             include=['airflow.providers.{{ PROVIDER_PACKAGE_ID }}',
                      'airflow.providers.{{ PROVIDER_PACKAGE_ID }}.*']),
-        package_data={ '': [
-{% if PROVIDER_PACKAGE_ID == 'amazon' %}
-            "*.json",
-{% elif PROVIDER_PACKAGE_ID == 'cncf.kubernetes' %}
-            "*.yaml",
-{% elif PROVIDER_PACKAGE_ID == 'google' %}
-            "*.yaml",
-            "*.sql",
-{% elif PROVIDER_PACKAGE_ID == 'papermill' %}
-            "*.ipynb",
-{% endif %}
-        ],
-        },

Review comment:
I think this is needed; otherwise the data files are not included in built wheels.
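For context on the mechanism the comment refers to: setuptools only copies non-Python files into a wheel when they are declared, for example via `package_data`. A generic sketch, not the Airflow template itself:

```python
from setuptools import find_namespace_packages, setup

setup(
    name="example-provider-package",
    version="0.1.0",
    packages=find_namespace_packages(include=["example.providers*"]),
    # Without this mapping, files such as *.yaml or *.json sitting next to the
    # modules are silently omitted from the built wheel; MANIFEST.in on its own
    # only affects the sdist.
    package_data={"": ["*.yaml", "*.json"]},
)
```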
##########
File path: airflow/providers_manager.py
##########
@@ -36,52 +39,231 @@
 def _create_validator():
+    """Creates JSON schema validator from the provider.yaml.schema.json"""
     schema = json.loads(importlib_resources.read_text('airflow', 'provider.yaml.schema.json'))
     cls = jsonschema.validators.validator_for(schema)
     validator = cls(schema)
     return validator
 
 
 class ProvidersManager:
-    """Manages all provider packages."""
+    """
+    Manages all provider packages. This is a Singleton class. The first time it is
+    instantiated, it discovers all available providers in installed packages and
+    local source folders (if airflow is run from sources).
+    """
+
+    _instance = None
+    resource_version = "0"
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing them! :%s", e)
-            return
+        # Keeps dict of providers keyed by module name and value is Tuple: version, provider_info
+        self._provider_dict: Dict[str, Tuple[str, Dict]] = {}
+        # Keeps dict of hooks keyed by connection type and value is
+        # Tuple: connection class, connection_id_attribute_name
+        self._hooks_dict: Dict[str, Tuple[str, str]] = {}
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence over the package ones for
+        # Development purpose. In production provider.yaml files are not present in the 'airflow" directory
+        # So there is no risk we are going to override package provider accidentally. This can only happen
+        # in case of local development
+        self._discover_all_airflow_builtin_providers_from_local_sources()
+        self._discover_all_providers_from_packages()
+        self._discover_hooks()
+        self._sort_provider_dictionary()
+        self._sort_hooks_dictionary()
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+    def _sort_hooks_dictionary(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
 
-        for module_info in pkgutil.walk_packages(paths, prefix="airflow.providers.", onerror=onerror):
+        Duplicates are removed from "package" providers in case corresponding "folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain provider.yaml files),
+        so if someone has airflow installed from local sources, the providers are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        sorted_dict = OrderedDict()
+        for connection_type in sorted(self._hooks_dict.keys()):
+            sorted_dict[connection_type] = self._hooks_dict[connection_type]
+        self._hooks_dict = sorted_dict
+
+    def _sort_provider_dictionary(self):
+        """
+        Sort provider_dictionary using OrderedDict.
+
+        The dictionary gets sorted so that when you iterate through it, the providers are by
+        default returned in alphabetical order.
+        """
+        sorted_dict = OrderedDict()
+        for provider_name in sorted(self._provider_dict.keys()):
+            sorted_dict[provider_name] = self._provider_dict[provider_name]
+        self._provider_dict = sorted_dict
+ """ + try: + import airflow.providers + except ImportError: + log.info("You have no providers installed.") + return + try: + for path in airflow.providers.__path__: + self._add_provider_info_from_local_source_files_on_path(path) + except Exception as e: # noqa pylint: disable=broad-except + log.warning("Error when loading 'provider.yaml' files from airflow sources: %s", e) + + def _add_provider_info_from_local_source_files_on_path(self, path) -> None: + """ + Finds all the provider.yaml files in the directory specified. + + :param path: path where to look for provider.yaml files + """ + root_path = path + for folder, subdirs, files in os.walk(path, topdown=True): + for filename in fnmatch.filter(files, "provider.yaml"): + package_name = "apache-airflow-providers" + folder[len(root_path) :].replace(os.sep, "-") + self._add_provider_info_from_local_source_file(os.path.join(folder, filename), package_name) + subdirs[:] = [] + + def _add_provider_info_from_local_source_file(self, path, package_name) -> None: + """ + Parses found provider.yaml file and adds found provider to the dictionary. + + :param path: full file path of the provider.yaml file + :param package_name: name of the package + """ + try: + log.debug("Loading %s from %s", package_name, path) + with open(path) as provider_yaml_file: + provider_info = yaml.safe_load(provider_yaml_file) + self._validator.validate(provider_info) + + version = provider_info['versions'][0] + if package_name not in self._provider_dict: + self._provider_dict[package_name] = (version, provider_info) + else: + log.warning( + "The providers for package '%s' could not be registered because providers for that " + "package name have already been registered", + package_name, + ) + except Exception as e: # noqa pylint: disable=broad-except + log.warning("Error when loading '%s': %s", path, e) + + def _discover_hooks(self) -> None: + """Retrieves all connections defined in the providers""" + for name, provider in self._provider_dict.items(): + provider_package = name + hook_class_names = provider[1].get("hook-class-names") + if hook_class_names: + for hook_class_name in hook_class_names: + self._add_hook(hook_class_name, provider_package) + + def _add_hook(self, hook_class_name, provider_package) -> None: + """ + Adds hook class name to list of hooks + + :param hook_class_name: name of the Hook class + :param provider_package: provider package adding the hook + """ + if provider_package.startswith("apache-airflow"): + provider_path = provider_package[len("apache-") :].replace("-", ".") + if not hook_class_name.startswith(provider_path): + log.warning( + "Sanity check failed when importing '%s' from '%s' package. 
##########
File path: airflow/providers_manager.py
##########
@@ -36,52 +39,231 @@
+        conn_type = getattr(hook_class, 'conn_type')
+        if not conn_type:
+            log.warning(
+                "The hook_class '%s' misses connection_type attribute and cannot be registered",
+                hook_class,
+            )
+            return

Review comment:
```suggestion
                "The hook_class '%s' is missing `connection_type` attribute and cannot be registered",
```
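The import pattern that `_add_hook` uses above (split the dotted path, import the module, fetch the attribute) is a general one. A self-contained sketch under our own helper name, exercised against a stdlib class so it runs anywhere; `import_dotted_path` is not an Airflow API:

```python
import importlib


def import_dotted_path(dotted_path: str):
    """Import any attribute (e.g. a hook class) given its full dotted path."""
    module_name, _, attr_name = dotted_path.rpartition('.')
    module = importlib.import_module(module_name)  # may raise ImportError, which _add_hook guards against
    return getattr(module, attr_name)  # may raise AttributeError


hook_like = import_dotted_path('collections.OrderedDict')
print(hook_like)  # <class 'collections.OrderedDict'>
```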
