This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2037303  Adds support for Connection/Hook discovery from providers (#12466)
2037303 is described below

commit 2037303eef93fd36ab13746b045d1c1fee6aa143
Author: Jarek Potiuk <jarek.pot...@polidea.com>
AuthorDate: Sun Nov 29 15:31:49 2020 +0100

    Adds support for Connection/Hook discovery from providers (#12466)
    
    * Adds support for Hook discovery from providers
    
    This PR extends provider discovery with a mechanism
    for retrieving the mapping from connection type to hook.
    
    Fixes #12456
    
    * fixup! Adds support for Hook discovery from providers
    
    * fixup! fixup! Adds support for Hook discovery from providers
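
For context, a minimal sketch of how the new discovery mechanism fits together.
MyServiceHook and the 'my_service' connection type are hypothetical; the class
attributes and the ProvidersManager lookup mirror the changes in this commit,
and the BaseHook import path may differ between Airflow versions:

    # A provider hook advertises its connection type through class attributes.
    from airflow.hooks.base_hook import BaseHook

    class MyServiceHook(BaseHook):  # hypothetical hook shipped by a provider
        conn_name_attr = 'my_service_conn_id'     # kwarg that carries the connection id
        default_conn_name = 'my_service_default'  # default connection id
        conn_type = 'my_service'                  # key registered with ProvidersManager

    # The provider's provider.yaml lists the class under "hook-class-names"
    # (and exposes it via the 'apache_airflow_provider' entrypoint), so core
    # code can look up a connection type without a hard-coded map:
    from airflow.providers_manager import ProvidersManager

    hook_class_name, conn_id_param = ProvidersManager().hooks.get('my_service', (None, None))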
---
 .pre-commit-config.yaml                            |   2 +-
 airflow/cli/cli_parser.py                          |   6 ++
 airflow/cli/commands/provider_command.py           |  25 ++++-
 airflow/models/connection.py                       |  65 +------------
 airflow/plugins_manager.py                         |  18 +---
 airflow/provider.yaml.schema.json                  |   7 ++
 .../providers/apache/cassandra/hooks/cassandra.py  |   6 +-
 airflow/providers/apache/cassandra/provider.yaml   |   3 +
 airflow/providers/apache/hive/hooks/hive.py        |   7 +-
 airflow/providers/apache/hive/provider.yaml        |   4 +
 airflow/providers/apache/pig/hooks/pig.py          |   6 +-
 airflow/providers/apache/pig/provider.yaml         |   4 +
 airflow/providers/cloudant/hooks/cloudant.py       |   6 +-
 airflow/providers/cloudant/provider.yaml           |   3 +
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |   6 +-
 airflow/providers/cncf/kubernetes/provider.yaml    |   3 +
 airflow/providers/docker/hooks/docker.py           |   6 +-
 airflow/providers/docker/provider.yaml             |   3 +
 .../providers/elasticsearch/hooks/elasticsearch.py |   1 +
 airflow/providers/elasticsearch/provider.yaml      |   3 +
 airflow/providers/exasol/hooks/exasol.py           |   1 +
 airflow/providers/exasol/provider.yaml             |   3 +
 airflow/providers/google/cloud/hooks/bigquery.py   |   4 +-
 airflow/providers/google/cloud/hooks/cloud_sql.py  |   7 +-
 .../providers/google/cloud/hooks/compute_ssh.py    |   4 +
 airflow/providers/google/cloud/hooks/dataprep.py   |   6 +-
 airflow/providers/google/provider.yaml             |   6 ++
 airflow/providers/grpc/hooks/grpc.py               |   6 +-
 airflow/providers/grpc/provider.yaml               |   3 +
 airflow/providers/imap/hooks/imap.py               |   6 +-
 airflow/providers/imap/provider.yaml               |   3 +
 airflow/providers/jdbc/hooks/jdbc.py               |   1 +
 airflow/providers/jdbc/provider.yaml               |   3 +
 airflow/providers/jira/hooks/jira.py               |   6 +-
 airflow/providers/jira/provider.yaml               |   3 +
 .../providers/microsoft/azure/hooks/azure_batch.py |   6 +-
 .../microsoft/azure/hooks/azure_cosmos.py          |   6 +-
 .../microsoft/azure/hooks/azure_data_lake.py       |   6 +-
 airflow/providers/microsoft/azure/hooks/wasb.py    |   6 +-
 airflow/providers/microsoft/azure/provider.yaml    |   6 ++
 airflow/providers/microsoft/mssql/hooks/mssql.py   |  12 +--
 airflow/providers/microsoft/mssql/provider.yaml    |   3 +
 airflow/providers/mongo/hooks/mongo.py             |   4 +-
 airflow/providers/mongo/provider.yaml              |   3 +
 airflow/providers/mysql/hooks/mysql.py             |   1 +
 airflow/providers/mysql/provider.yaml              |   3 +
 airflow/providers/odbc/hooks/odbc.py               |   1 +
 airflow/providers/odbc/provider.yaml               |   3 +
 airflow/providers/oracle/hooks/oracle.py           |   1 +
 airflow/providers/oracle/provider.yaml             |   5 +
 airflow/providers/postgres/hooks/postgres.py       |   1 +
 airflow/providers/postgres/provider.yaml           |   3 +
 airflow/providers/presto/hooks/presto.py           |   1 +
 airflow/providers/presto/provider.yaml             |   3 +
 airflow/providers/redis/hooks/redis.py             |   6 +-
 airflow/providers/redis/provider.yaml              |   3 +
 airflow/providers/salesforce/hooks/tableau.py      |   6 +-
 airflow/providers/salesforce/provider.yaml         |   3 +
 airflow/providers/snowflake/hooks/snowflake.py     |   1 +
 airflow/providers/snowflake/provider.yaml          |   3 +
 airflow/providers/sqlite/hooks/sqlite.py           |   1 +
 airflow/providers/sqlite/provider.yaml             |   3 +
 airflow/providers/vertica/hooks/vertica.py         |   1 +
 airflow/providers/vertica/provider.yaml            |   3 +
 airflow/providers_manager.py                       |  96 +++++++++++++++----
 .../exasol/provider.yaml => utils/entry_points.py} |  33 +++----
 .../pre_commit_check_provider_yaml_files.py        | 103 +++++++++++----------
 .../run_install_and_test_provider_packages.sh      |  20 ++++
 tests/core/test_providers_manager.py               |  41 ++++++++
 tests/models/test_connection.py                    |  14 ---
 70 files changed, 441 insertions(+), 216 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c262c9a..40433b0 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -457,7 +457,7 @@ repos:
         entry: ./scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
         language: python
         require_serial: true
-        files: provider.yaml$
+        files: provider.yaml$|scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py$
         additional_dependencies: ['PyYAML==5.3.1', 'jsonschema==3.2.0', 'tabulate==0.8.7']
       - id: mermaid
         name: Generate mermaid images
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index fbcb01d..6a13db0 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1154,6 +1154,12 @@ CONNECTIONS_COMMANDS = (
 )
 PROVIDERS_COMMANDS = (
     ActionCommand(
+        name='hooks',
+        help='List registered provider hooks',
+        func=lazy_load_command('airflow.cli.commands.provider_command.hooks_list'),
+        args=(ARG_OUTPUT,),
+    ),
+    ActionCommand(
         name='list',
         help='List installed providers',
         func=lazy_load_command('airflow.cli.commands.provider_command.providers_list'),
diff --git a/airflow/cli/commands/provider_command.py b/airflow/cli/commands/provider_command.py
index 70fbd43..a2df92f 100644
--- a/airflow/cli/commands/provider_command.py
+++ b/airflow/cli/commands/provider_command.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Providers sub-commands"""
-from typing import Dict, List
+from typing import Dict, List, Tuple
 
 import pygments
 import yaml
@@ -44,8 +44,7 @@ def provider_get(args):
     """Get a provider info."""
     providers = ProvidersManager().providers
     if args.provider_name in providers:
-        provider_version = providers[args.provider_name][0]
-        provider_info = providers[args.provider_name][1]
+        provider_version, provider_info = providers[args.provider_name]
         print("#")
         print(f"# Provider: {args.provider_name}")
         print(f"# Version: {provider_version}")
@@ -64,3 +63,23 @@ def provider_get(args):
 def providers_list(args):
     """Lists all providers at the command line"""
     print(_tabulate_providers(ProvidersManager().providers.values(), args.output))
+
+
+def _tabulate_hooks(hook_items: Tuple[str, Tuple[str, str]], tablefmt: str):
+    tabulate_data = [
+        {
+            'Connection type': hook_item[0],
+            'Hook class': hook_item[1][0],
+            'Hook connection attribute name': hook_item[1][1],
+        }
+        for hook_item in hook_items
+    ]
+
+    msg = tabulate(tabulate_data, tablefmt=tablefmt, headers='keys')
+    return msg
+
+
+def hooks_list(args):
+    """Lists all hooks at the command line"""
+    msg = _tabulate_hooks(ProvidersManager().hooks.items(), args.output)
+    print(msg)
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index d12d680..391aafb 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -30,70 +30,10 @@ from airflow.configuration import ensure_secrets_loaded
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
+from airflow.providers_manager import ProvidersManager
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string
 
-# A map that assigns a connection type to a tuple that contains
-# the path of the class and the name of the conn_id key parameter.
-# PLEASE KEEP BELOW LIST IN ALPHABETICAL ORDER.
-CONN_TYPE_TO_HOOK = {
-    "azure_batch": (
-        "airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook",
-        "azure_batch_conn_id",
-    ),
-    "azure_cosmos": (
-        "airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook",
-        "azure_cosmos_conn_id",
-    ),
-    "azure_data_lake": (
-        "airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook",
-        "azure_data_lake_conn_id",
-    ),
-    "cassandra": ("airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook", "cassandra_conn_id"),
-    "cloudant": ("airflow.providers.cloudant.hooks.cloudant.CloudantHook", "cloudant_conn_id"),
-    "dataprep": ("airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook", "dataprep_default"),
-    "docker": ("airflow.providers.docker.hooks.docker.DockerHook", "docker_conn_id"),
-    "elasticsearch": (
-        "airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
-        "elasticsearch_conn_id",
-    ),
-    "exasol": ("airflow.providers.exasol.hooks.exasol.ExasolHook", "exasol_conn_id"),
-    "gcpcloudsql": (
-        "airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook",
-        "gcp_cloudsql_conn_id",
-    ),
-    "gcpssh": (
-        "airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook",
-        "gcp_conn_id",
-    ),
-    "google_cloud_platform": (
-        "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook",
-        "bigquery_conn_id",
-    ),
-    "grpc": ("airflow.providers.grpc.hooks.grpc.GrpcHook", "grpc_conn_id"),
-    "hive_cli": ("airflow.providers.apache.hive.hooks.hive.HiveCliHook", "hive_cli_conn_id"),
-    "hiveserver2": ("airflow.providers.apache.hive.hooks.hive.HiveServer2Hook", "hiveserver2_conn_id"),
-    "imap": ("airflow.providers.imap.hooks.imap.ImapHook", "imap_conn_id"),
-    "jdbc": ("airflow.providers.jdbc.hooks.jdbc.JdbcHook", "jdbc_conn_id"),
-    "jira": ("airflow.providers.jira.hooks.jira.JiraHook", "jira_conn_id"),
-    "kubernetes": ("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook", "kubernetes_conn_id"),
-    "mongo": ("airflow.providers.mongo.hooks.mongo.MongoHook", "conn_id"),
-    "mssql": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"),
-    "mysql": ("airflow.providers.mysql.hooks.mysql.MySqlHook", "mysql_conn_id"),
-    "odbc": ("airflow.providers.odbc.hooks.odbc.OdbcHook", "odbc_conn_id"),
-    "oracle": ("airflow.providers.oracle.hooks.oracle.OracleHook", "oracle_conn_id"),
-    "pig_cli": ("airflow.providers.apache.pig.hooks.pig.PigCliHook", "pig_cli_conn_id"),
-    "postgres": ("airflow.providers.postgres.hooks.postgres.PostgresHook", "postgres_conn_id"),
-    "presto": ("airflow.providers.presto.hooks.presto.PrestoHook", "presto_conn_id"),
-    "redis": ("airflow.providers.redis.hooks.redis.RedisHook", "redis_conn_id"),
-    "snowflake": ("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook", "snowflake_conn_id"),
-    "sqlite": ("airflow.providers.sqlite.hooks.sqlite.SqliteHook", "sqlite_conn_id"),
-    "tableau": ("airflow.providers.salesforce.hooks.tableau.TableauHook", "tableau_conn_id"),
-    "vertica": ("airflow.providers.vertica.hooks.vertica.VerticaHook", "vertica_conn_id"),
-    "wasb": ("airflow.providers.microsoft.azure.hooks.wasb.WasbHook", "wasb_conn_id"),
-}
-# PLEASE KEEP ABOVE LIST IN ALPHABETICAL ORDER.
-
 
 def parse_netloc_to_hostname(*args, **kwargs):
     """This method is deprecated."""
@@ -326,7 +266,8 @@ class Connection(Base, LoggingMixin):  # pylint: disable=too-many-instance-attri
 
     def get_hook(self):
         """Return hook based on conn_type."""
-        hook_class_name, conn_id_param = CONN_TYPE_TO_HOOK.get(self.conn_type, (None, None))
+        hook_class_name, conn_id_param = ProvidersManager().hooks.get(self.conn_type, (None, None))
+
         if not hook_class_name:
             raise AirflowException(f'Unknown hook type "{self.conn_type}"')
         hook_class = import_string(hook_class_name)
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 643f35f..3286f52 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -29,6 +29,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type
 import importlib_metadata
 
 from airflow import settings
+from airflow.utils.entry_points import entry_points_with_dist
 from airflow.utils.file import find_path_from_directory
 
 if TYPE_CHECKING:
@@ -169,23 +170,6 @@ def is_valid_plugin(plugin_obj):
     return False
 
 
-def entry_points_with_dist(group: str):
-    """
-    Return EntryPoint objects of the given group, along with the distribution information.
-
-    This is like the ``entry_points()`` function from importlib.metadata,
-    except it also returns the distribution the entry_point was loaded from.
-
-    :param group: FIlter results to only this entrypoint group
-    :return: Generator of (EntryPoint, Distribution) objects for the specified groups
-    """
-    for dist in importlib_metadata.distributions():
-        for e in dist.entry_points:
-            if e.group != group:
-                continue
-            yield (e, dist)
-
-
 def load_entrypoint_plugins():
     """
     Load and register plugins AirflowPlugin subclasses from the entrypoints.
diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json
index 19ece21..88644b1 100644
--- a/airflow/provider.yaml.schema.json
+++ b/airflow/provider.yaml.schema.json
@@ -173,6 +173,13 @@
           "python-module"
         ]
       }
+    },
+    "hook-class-names": {
+      "type": "array",
+      "description": "Hook class names that provide connection types to core",
+      "items": {
+        "type": "string"
+      }
     }
   },
   "additionalProperties": false,
diff --git a/airflow/providers/apache/cassandra/hooks/cassandra.py b/airflow/providers/apache/cassandra/hooks/cassandra.py
index 0166aa3..802303d 100644
--- a/airflow/providers/apache/cassandra/hooks/cassandra.py
+++ b/airflow/providers/apache/cassandra/hooks/cassandra.py
@@ -83,7 +83,11 @@ class CassandraHook(BaseHook, LoggingMixin):
     For details of the Cluster config, see cassandra.cluster.
     """
 
-    def __init__(self, cassandra_conn_id: str = 'cassandra_default'):
+    conn_name_attr = 'cassandra_conn_id'
+    default_conn_name = 'cassandra_default'
+    conn_type = 'cassandra'
+
+    def __init__(self, cassandra_conn_id: str = default_conn_name):
         super().__init__()
         conn = self.get_connection(cassandra_conn_id)
 
diff --git a/airflow/providers/apache/cassandra/provider.yaml b/airflow/providers/apache/cassandra/provider.yaml
index 77402c0..b75425f 100644
--- a/airflow/providers/apache/cassandra/provider.yaml
+++ b/airflow/providers/apache/cassandra/provider.yaml
@@ -41,3 +41,6 @@ hooks:
   - integration-name: Apache Cassandra
     python-modules:
       - airflow.providers.apache.cassandra.hooks.cassandra
+
+hook-class-names:
+  - airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index a04cfdd..6c93602 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -78,9 +78,13 @@ class HiveCliHook(BaseHook):
     :type  mapred_job_name: str
     """
 
+    conn_name_attr = 'hive_cli_conn_id'
+    default_conn_name = 'hive_cli_default'
+    conn_type = 'hive_cli'
+
     def __init__(
         self,
-        hive_cli_conn_id: str = "hive_cli_default",
+        hive_cli_conn_id: str = default_conn_name,
         run_as: Optional[str] = None,
         mapred_queue: Optional[str] = None,
         mapred_queue_priority: Optional[str] = None,
@@ -809,6 +813,7 @@ class HiveServer2Hook(DbApiHook):
 
     conn_name_attr = 'hiveserver2_conn_id'
     default_conn_name = 'hiveserver2_default'
+    conn_type = 'hiveserver2'
     supports_autocommit = False
 
     def get_conn(self, schema: Optional[str] = None) -> Any:
diff --git a/airflow/providers/apache/hive/provider.yaml b/airflow/providers/apache/hive/provider.yaml
index 98b9482..68c18b7 100644
--- a/airflow/providers/apache/hive/provider.yaml
+++ b/airflow/providers/apache/hive/provider.yaml
@@ -66,3 +66,7 @@ transfers:
   - source-integration-name: Microsoft SQL Server (MSSQL)
     target-integration-name: Apache Hive
     python-module: airflow.providers.apache.hive.transfers.mssql_to_hive
+
+hook-class-names:
+  - airflow.providers.apache.hive.hooks.hive.HiveCliHook
+  - airflow.providers.apache.hive.hooks.hive.HiveServer2Hook
diff --git a/airflow/providers/apache/pig/hooks/pig.py b/airflow/providers/apache/pig/hooks/pig.py
index c8e39d8..1560f69 100644
--- a/airflow/providers/apache/pig/hooks/pig.py
+++ b/airflow/providers/apache/pig/hooks/pig.py
@@ -33,7 +33,11 @@ class PigCliHook(BaseHook):
 
     """
 
-    def __init__(self, pig_cli_conn_id: str = "pig_cli_default") -> None:
+    conn_name_attr = 'pig_cli_conn_id'
+    default_conn_name = 'pig_cli_default'
+    conn_type = 'pig_cli'
+
+    def __init__(self, pig_cli_conn_id: str = default_conn_name) -> None:
         super().__init__()
         conn = self.get_connection(pig_cli_conn_id)
         self.pig_properties = conn.extra_dejson.get('pig_properties', '')
diff --git a/airflow/providers/apache/pig/provider.yaml b/airflow/providers/apache/pig/provider.yaml
index f1754c7..175cd70 100644
--- a/airflow/providers/apache/pig/provider.yaml
+++ b/airflow/providers/apache/pig/provider.yaml
@@ -32,7 +32,11 @@ operators:
   - integration-name: Apache Pig
     python-modules:
       - airflow.providers.apache.pig.operators.pig
+
 hooks:
   - integration-name: Apache Pig
     python-modules:
       - airflow.providers.apache.pig.hooks.pig
+
+hook-class-names:
+  - airflow.providers.apache.pig.hooks.pig.PigCliHook
diff --git a/airflow/providers/cloudant/hooks/cloudant.py b/airflow/providers/cloudant/hooks/cloudant.py
index e490ffc..6193bba 100644
--- a/airflow/providers/cloudant/hooks/cloudant.py
+++ b/airflow/providers/cloudant/hooks/cloudant.py
@@ -32,7 +32,11 @@ class CloudantHook(BaseHook):
     :type cloudant_conn_id: str
     """
 
-    def __init__(self, cloudant_conn_id: str = 'cloudant_default') -> None:
+    conn_name_attr = 'cloudant_conn_id'
+    default_conn_name = 'cloudant_default'
+    conn_type = 'cloudant'
+
+    def __init__(self, cloudant_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.cloudant_conn_id = cloudant_conn_id
 
diff --git a/airflow/providers/cloudant/provider.yaml b/airflow/providers/cloudant/provider.yaml
index 4986c43..0f24c5e 100644
--- a/airflow/providers/cloudant/provider.yaml
+++ b/airflow/providers/cloudant/provider.yaml
@@ -32,3 +32,6 @@ hooks:
   - integration-name: IBM Cloudant
     python-modules:
       - airflow.providers.cloudant.hooks.cloudant
+
+hook-class-names:
+  - airflow.providers.cloudant.hooks.cloudant.CloudantHook
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 499cac5..27e2175 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -54,8 +54,12 @@ class KubernetesHook(BaseHook):
     :type conn_id: str
     """
 
+    conn_name_attr = 'kubernetes_conn_id'
+    default_conn_name = 'kubernetes_default'
+    conn_type = 'kubernetes'
+
     def __init__(
-        self, conn_id: str = "kubernetes_default", client_configuration: Optional[client.Configuration] = None
+        self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None
     ) -> None:
         super().__init__()
         self.conn_id = conn_id
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index c66fd71..541af8e 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -48,3 +48,6 @@ hooks:
   - integration-name: Kubernetes
     python-modules:
       - airflow.providers.cncf.kubernetes.hooks.kubernetes
+
+hook-class-names:
+  - airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook
diff --git a/airflow/providers/docker/hooks/docker.py b/airflow/providers/docker/hooks/docker.py
index 5ed87d6..acd9ed0 100644
--- a/airflow/providers/docker/hooks/docker.py
+++ b/airflow/providers/docker/hooks/docker.py
@@ -34,9 +34,13 @@ class DockerHook(BaseHook, LoggingMixin):
     :type docker_conn_id: str
     """
 
+    conn_name_attr = 'docker_conn_id'
+    default_conn_name = 'docker_default'
+    conn_type = 'docker'
+
     def __init__(
         self,
-        docker_conn_id='docker_default',
+        docker_conn_id: str = default_conn_name,
         base_url: Optional[str] = None,
         version: Optional[str] = None,
         tls: Optional[str] = None,
diff --git a/airflow/providers/docker/provider.yaml b/airflow/providers/docker/provider.yaml
index a02202c..56d2488 100644
--- a/airflow/providers/docker/provider.yaml
+++ b/airflow/providers/docker/provider.yaml
@@ -43,3 +43,6 @@ hooks:
   - integration-name: Docker
     python-modules:
       - airflow.providers.docker.hooks.docker
+
+hook-class-names:
+  - airflow.providers.docker.hooks.docker.DockerHook
diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 8c6d1d2..16306ac 100644
--- a/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -29,6 +29,7 @@ class ElasticsearchHook(DbApiHook):
 
     conn_name_attr = 'elasticsearch_conn_id'
     default_conn_name = 'elasticsearch_default'
+    conn_type = 'elasticsearch'
 
     def __init__(self, schema: str = "http", connection: Optional[AirflowConnection] = None, *args, **kwargs):
         super().__init__(*args, **kwargs)
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index 079d724..7802a2e 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -32,3 +32,6 @@ hooks:
   - integration-name: Elasticsearch
     python-modules:
       - airflow.providers.elasticsearch.hooks.elasticsearch
+
+hook-class-names:
+  - airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook
diff --git a/airflow/providers/exasol/hooks/exasol.py b/airflow/providers/exasol/hooks/exasol.py
index 935eb8b..4ff8649 100644
--- a/airflow/providers/exasol/hooks/exasol.py
+++ b/airflow/providers/exasol/hooks/exasol.py
@@ -38,6 +38,7 @@ class ExasolHook(DbApiHook):
 
     conn_name_attr = 'exasol_conn_id'
     default_conn_name = 'exasol_default'
+    conn_type = 'exasol'
     supports_autocommit = True
 
     def __init__(self, *args, **kwargs) -> None:
diff --git a/airflow/providers/exasol/provider.yaml b/airflow/providers/exasol/provider.yaml
index ae13837..1ea9254 100644
--- a/airflow/providers/exasol/provider.yaml
+++ b/airflow/providers/exasol/provider.yaml
@@ -38,3 +38,6 @@ hooks:
   - integration-name: Exasol
     python-modules:
       - airflow.providers.exasol.hooks.exasol
+
+hook-class-names:
+  - airflow.providers.exasol.hooks.exasol.ExasolHook
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 95434c2..bfdc4bf 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -67,7 +67,9 @@ BigQueryJob = Union[CopyJob, QueryJob, LoadJob, ExtractJob]
 class BigQueryHook(GoogleBaseHook, DbApiHook):
     """Interact with BigQuery. This hook uses the Google Cloud connection."""
 
-    conn_name_attr = 'gcp_conn_id'  # type: str
+    conn_name_attr = 'gcp_conn_id'
+    default_conn_name = 'google_cloud_default'
+    conn_type = 'google_cloud_platform'
 
     def __init__(
         self,
diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py b/airflow/providers/google/cloud/hooks/cloud_sql.py
index dca25e9..023524c 100644
--- a/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -709,6 +709,11 @@ class CloudSQLDatabaseHook(BaseHook):  # noqa
            in the connection URL
     :type default_gcp_project_id: str
     """
+
+    conn_name_attr = 'gcp_cloudsql_conn_id'
+    default_conn_name = 'google_cloud_sql_default'
+    conn_type = 'gcpcloudsql'
+
     _conn = None  # type: Optional[Any]
 
     def __init__(
@@ -735,7 +740,7 @@ class CloudSQLDatabaseHook(BaseHook):  # noqa
         self.user = self.cloudsql_connection.login  # type: Optional[str]
         self.password = self.cloudsql_connection.password  # type: Optional[str]
         self.public_ip = self.cloudsql_connection.host  # type: Optional[str]
-        self.public_port = self.cloudsql_connection.port  # type: Optional[str]
+        self.public_port = self.cloudsql_connection.port  # type: Optional[int]
         self.sslcert = self.extras.get('sslcert')  # type: Optional[str]
         self.sslkey = self.extras.get('sslkey')  # type: Optional[str]
         self.sslrootcert = self.extras.get('sslrootcert')  # type: Optional[str]
diff --git a/airflow/providers/google/cloud/hooks/compute_ssh.py b/airflow/providers/google/cloud/hooks/compute_ssh.py
index fe6ad43..2d33c62 100644
--- a/airflow/providers/google/cloud/hooks/compute_ssh.py
+++ b/airflow/providers/google/cloud/hooks/compute_ssh.py
@@ -89,6 +89,10 @@ class ComputeEngineSSHHook(SSHHook):
     :type delegate_to: str
     """
 
+    conn_name_attr = 'gcp_conn_id'
+    default_conn_name = 'google_cloud_default'
+    conn_type = 'gcpssh'
+
     def __init__(  # pylint: disable=too-many-arguments
         self,
         gcp_conn_id: str = 'google_cloud_default',
diff --git a/airflow/providers/google/cloud/hooks/dataprep.py b/airflow/providers/google/cloud/hooks/dataprep.py
index d5c8ab0..a9c969f 100644
--- a/airflow/providers/google/cloud/hooks/dataprep.py
+++ b/airflow/providers/google/cloud/hooks/dataprep.py
@@ -37,7 +37,11 @@ class GoogleDataprepHook(BaseHook):
 
     """
 
-    def __init__(self, dataprep_conn_id: str = "dataprep_default") -> None:
+    conn_name_attr = 'dataprep_conn_id'
+    default_conn_name = 'dataprep_default'
+    conn_type = 'dataprep'
+
+    def __init__(self, dataprep_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
         conn = self.get_connection(self.dataprep_conn_id)
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 5acc424..469134d 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -630,3 +630,9 @@ transfers:
   - source-integration-name: Google Ads
     target-integration-name: Google Cloud Storage (GCS)
     python-module: airflow.providers.google.ads.transfers.ads_to_gcs
+
+hook-class-names:
+  - airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook
+  - airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook
+  - airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook
+  - airflow.providers.google.cloud.hooks.bigquery.BigQueryHook
diff --git a/airflow/providers/grpc/hooks/grpc.py b/airflow/providers/grpc/hooks/grpc.py
index ccfbd8f..13b60ca 100644
--- a/airflow/providers/grpc/hooks/grpc.py
+++ b/airflow/providers/grpc/hooks/grpc.py
@@ -47,9 +47,13 @@ class GrpcHook(BaseHook):
         its only arg. Could be partial or lambda.
     """
 
+    conn_name_attr = 'grpc_conn_id'
+    default_conn_name = 'grpc_default'
+    conn_type = 'grpc'
+
     def __init__(
         self,
-        grpc_conn_id: str,
+        grpc_conn_id: str = default_conn_name,
         interceptors: Optional[List[Callable]] = None,
         custom_connection_func: Optional[Callable] = None,
     ) -> None:
diff --git a/airflow/providers/grpc/provider.yaml b/airflow/providers/grpc/provider.yaml
index a607e97..8a06f79 100644
--- a/airflow/providers/grpc/provider.yaml
+++ b/airflow/providers/grpc/provider.yaml
@@ -37,3 +37,6 @@ hooks:
   - integration-name: gRPC
     python-modules:
       - airflow.providers.grpc.hooks.grpc
+
+hook-class-names:
+  - airflow.providers.grpc.hooks.grpc.GrpcHook
diff --git a/airflow/providers/imap/hooks/imap.py b/airflow/providers/imap/hooks/imap.py
index 926db52..8e7236f 100644
--- a/airflow/providers/imap/hooks/imap.py
+++ b/airflow/providers/imap/hooks/imap.py
@@ -42,7 +42,11 @@ class ImapHook(BaseHook):
     :type imap_conn_id: str
     """
 
-    def __init__(self, imap_conn_id: str = 'imap_default') -> None:
+    conn_name_attr = 'imap_conn_id'
+    default_conn_name = 'imap_default'
+    conn_type = 'imap'
+
+    def __init__(self, imap_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.imap_conn_id = imap_conn_id
         self.mail_client: Optional[imaplib.IMAP4_SSL] = None
diff --git a/airflow/providers/imap/provider.yaml b/airflow/providers/imap/provider.yaml
index 7616c64..e91a899 100644
--- a/airflow/providers/imap/provider.yaml
+++ b/airflow/providers/imap/provider.yaml
@@ -38,3 +38,6 @@ hooks:
   - integration-name: Internet Message Access Protocol (IMAP)
     python-modules:
       - airflow.providers.imap.hooks.imap
+
+hook-class-names:
+  - airflow.providers.imap.hooks.imap.ImapHook
diff --git a/airflow/providers/jdbc/hooks/jdbc.py b/airflow/providers/jdbc/hooks/jdbc.py
index 48086d2..8f837f8 100644
--- a/airflow/providers/jdbc/hooks/jdbc.py
+++ b/airflow/providers/jdbc/hooks/jdbc.py
@@ -35,6 +35,7 @@ class JdbcHook(DbApiHook):
 
     conn_name_attr = 'jdbc_conn_id'
     default_conn_name = 'jdbc_default'
+    conn_type = 'jdbc'
     supports_autocommit = True
 
     def get_conn(self) -> jaydebeapi.Connection:
diff --git a/airflow/providers/jdbc/provider.yaml b/airflow/providers/jdbc/provider.yaml
index dac49d3..91a291c 100644
--- a/airflow/providers/jdbc/provider.yaml
+++ b/airflow/providers/jdbc/provider.yaml
@@ -38,3 +38,6 @@ hooks:
   - integration-name: Java Database Connectivity (JDBC)
     python-modules:
       - airflow.providers.jdbc.hooks.jdbc
+
+hook-class-names:
+  - airflow.providers.jdbc.hooks.jdbc.JdbcHook
diff --git a/airflow/providers/jira/hooks/jira.py b/airflow/providers/jira/hooks/jira.py
index daf573f..d9faaef 100644
--- a/airflow/providers/jira/hooks/jira.py
+++ b/airflow/providers/jira/hooks/jira.py
@@ -33,7 +33,11 @@ class JiraHook(BaseHook):
     :type jira_conn_id: str
     """
 
-    def __init__(self, jira_conn_id: str = 'jira_default', proxies: Optional[Any] = None) -> None:
+    default_conn_name = 'jira_default'
+    conn_type = "jira"
+    conn_name_attr = "jira_conn_id"
+
+    def __init__(self, jira_conn_id: str = default_conn_name, proxies: Optional[Any] = None) -> None:
         super().__init__()
         self.jira_conn_id = jira_conn_id
         self.proxies = proxies
diff --git a/airflow/providers/jira/provider.yaml b/airflow/providers/jira/provider.yaml
index 9018b82..34c4855 100644
--- a/airflow/providers/jira/provider.yaml
+++ b/airflow/providers/jira/provider.yaml
@@ -43,3 +43,6 @@ hooks:
   - integration-name: Atlassian Jira
     python-modules:
       - airflow.providers.jira.hooks.jira
+
+hook-class-names:
+  - airflow.providers.jira.hooks.jira.JiraHook
diff --git a/airflow/providers/microsoft/azure/hooks/azure_batch.py b/airflow/providers/microsoft/azure/hooks/azure_batch.py
index 0102eac..2658786 100644
--- a/airflow/providers/microsoft/azure/hooks/azure_batch.py
+++ b/airflow/providers/microsoft/azure/hooks/azure_batch.py
@@ -37,7 +37,11 @@ class AzureBatchHook(BaseHook):
     The account url should be in extra parameter as account_url
     """
 
-    def __init__(self, azure_batch_conn_id: str = 'azure_batch_default') -> None:
+    conn_name_attr = 'azure_batch_conn_id'
+    default_conn_name = 'azure_batch_default'
+    conn_type = 'azure_batch'
+
+    def __init__(self, azure_batch_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.conn_id = azure_batch_conn_id
         self.connection = self.get_conn()
diff --git a/airflow/providers/microsoft/azure/hooks/azure_cosmos.py b/airflow/providers/microsoft/azure/hooks/azure_cosmos.py
index c73e0e4..bcb1de2 100644
--- a/airflow/providers/microsoft/azure/hooks/azure_cosmos.py
+++ b/airflow/providers/microsoft/azure/hooks/azure_cosmos.py
@@ -45,7 +45,11 @@ class AzureCosmosDBHook(BaseHook):
     :type azure_cosmos_conn_id: str
     """
 
-    def __init__(self, azure_cosmos_conn_id: str = 'azure_cosmos_default') -> None:
+    conn_name_attr = 'azure_cosmos_conn_id'
+    default_conn_name = 'azure_cosmos_default'
+    conn_type = 'azure_cosmos'
+
+    def __init__(self, azure_cosmos_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.conn_id = azure_cosmos_conn_id
         self._conn = None
diff --git a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
index 7d1974e..6ff0f3a 100644
--- a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
@@ -43,7 +43,11 @@ class AzureDataLakeHook(BaseHook):
     :type azure_data_lake_conn_id: str
     """
 
-    def __init__(self, azure_data_lake_conn_id: str = 'azure_data_lake_default') -> None:
+    conn_name_attr = 'azure_data_lake_conn_id'
+    default_conn_name = 'azure_data_lake_default'
+    conn_type = 'azure_data_lake'
+
+    def __init__(self, azure_data_lake_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.conn_id = azure_data_lake_conn_id
         self._conn: Optional[core.AzureDLFileSystem] = None
diff --git a/airflow/providers/microsoft/azure/hooks/wasb.py b/airflow/providers/microsoft/azure/hooks/wasb.py
index e72c1ec..c337fce 100644
--- a/airflow/providers/microsoft/azure/hooks/wasb.py
+++ b/airflow/providers/microsoft/azure/hooks/wasb.py
@@ -54,7 +54,11 @@ class WasbHook(BaseHook):
     :type wasb_conn_id: str
     """
 
-    def __init__(self, wasb_conn_id: str = 'wasb_default') -> None:
+    conn_name_attr = 'wasb_conn_id'
+    default_conn_name = 'wasb_default'
+    conn_type = 'wasb'
+
+    def __init__(self, wasb_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.conn_id = wasb_conn_id
         self.connection = self.get_conn()
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index fa2a210..9ceb034 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -124,3 +124,9 @@ transfers:
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: /docs/howto/operator/microsoft/transfer/blob_storage_to_gcs.rst
     python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
+
+hook-class-names:
+  - airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook
+  - airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook
+  - airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook
+  - airflow.providers.microsoft.azure.hooks.wasb.WasbHook
diff --git a/airflow/providers/microsoft/mssql/hooks/mssql.py b/airflow/providers/microsoft/mssql/hooks/mssql.py
index 3e5ec6a..e3f7362 100644
--- a/airflow/providers/microsoft/mssql/hooks/mssql.py
+++ b/airflow/providers/microsoft/mssql/hooks/mssql.py
@@ -17,8 +17,6 @@
 # under the License.
 """Microsoft SQLServer hook module"""
 
-import warnings
-
 import pymssql
 
 from airflow.hooks.dbapi_hook import DbApiHook
@@ -29,18 +27,10 @@ class MsSqlHook(DbApiHook):
 
     conn_name_attr = 'mssql_conn_id'
     default_conn_name = 'mssql_default'
+    conn_type = 'mssql'
     supports_autocommit = True
 
     def __init__(self, *args, **kwargs) -> None:
-        warnings.warn(
-            (
-                "This class is deprecated and will be removed in Airflow 
2.0.\n"
-                "pymssql is discontinued.  See 
https://github.com/pymssql/pymssql/issues/668.\n";
-                "Please use `airflow.providers.odbc.hooks.odbc.OdbcHook`"
-            ),
-            DeprecationWarning,
-            stacklevel=2,
-        )
         super().__init__(*args, **kwargs)
         self.schema = kwargs.pop("schema", None)
 
diff --git a/airflow/providers/microsoft/mssql/provider.yaml b/airflow/providers/microsoft/mssql/provider.yaml
index b6930e3..36dc315 100644
--- a/airflow/providers/microsoft/mssql/provider.yaml
+++ b/airflow/providers/microsoft/mssql/provider.yaml
@@ -37,3 +37,6 @@ hooks:
   - integration-name: Microsoft SQL Server (MSSQL)
     python-modules:
       - airflow.providers.microsoft.mssql.hooks.mssql
+
+hook-class-names:
+  - airflow.providers.microsoft.mssql.hooks.mssql.MsSqlHook
diff --git a/airflow/providers/mongo/hooks/mongo.py b/airflow/providers/mongo/hooks/mongo.py
index d1fcc0b..3eb0636 100644
--- a/airflow/providers/mongo/hooks/mongo.py
+++ b/airflow/providers/mongo/hooks/mongo.py
@@ -40,9 +40,11 @@ class MongoHook(BaseHook):
         {"srv": true, "replicaSet": "test", "ssl": true, "connectTimeoutMS": 
30000}
     """
 
+    conn_name_attr = 'conn_id'
+    default_conn_name = 'mongo_default'
     conn_type = 'mongo'
 
-    def __init__(self, conn_id: str = 'mongo_default', *args, **kwargs) -> None:
+    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
 
         super().__init__()
         self.mongo_conn_id = conn_id
diff --git a/airflow/providers/mongo/provider.yaml b/airflow/providers/mongo/provider.yaml
index b2e5b90..3ae1561 100644
--- a/airflow/providers/mongo/provider.yaml
+++ b/airflow/providers/mongo/provider.yaml
@@ -37,3 +37,6 @@ hooks:
   - integration-name: MongoDB
     python-modules:
       - airflow.providers.mongo.hooks.mongo
+
+hook-class-names:
+  - airflow.providers.mongo.hooks.mongo.MongoHook
diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py
index 5eaa19d..998e1ef 100644
--- a/airflow/providers/mysql/hooks/mysql.py
+++ b/airflow/providers/mysql/hooks/mysql.py
@@ -41,6 +41,7 @@ class MySqlHook(DbApiHook):
 
     conn_name_attr = 'mysql_conn_id'
     default_conn_name = 'mysql_default'
+    conn_type = 'mysql'
     supports_autocommit = True
 
     def __init__(self, *args, **kwargs) -> None:
diff --git a/airflow/providers/mysql/provider.yaml b/airflow/providers/mysql/provider.yaml
index e696187..b5a9d98 100644
--- a/airflow/providers/mysql/provider.yaml
+++ b/airflow/providers/mysql/provider.yaml
@@ -51,3 +51,6 @@ transfers:
   - source-integration-name: Snowflake
     target-integration-name: MySQL
     python-module: airflow.providers.mysql.transfers.presto_to_mysql
+
+hook-class-names:
+  - airflow.providers.mysql.hooks.mysql.MySqlHook
diff --git a/airflow/providers/odbc/hooks/odbc.py b/airflow/providers/odbc/hooks/odbc.py
index 43dee05..89426a5 100644
--- a/airflow/providers/odbc/hooks/odbc.py
+++ b/airflow/providers/odbc/hooks/odbc.py
@@ -35,6 +35,7 @@ class OdbcHook(DbApiHook):
     DEFAULT_SQLALCHEMY_SCHEME = 'mssql+pyodbc'
     conn_name_attr = 'odbc_conn_id'
     default_conn_name = 'odbc_default'
+    conn_type = 'odbc'
     supports_autocommit = True
 
     def __init__(
diff --git a/airflow/providers/odbc/provider.yaml b/airflow/providers/odbc/provider.yaml
index f3e2420..907b0c1 100644
--- a/airflow/providers/odbc/provider.yaml
+++ b/airflow/providers/odbc/provider.yaml
@@ -32,3 +32,6 @@ hooks:
   - integration-name: ODBC
     python-modules:
       - airflow.providers.odbc.hooks.odbc
+
+hook-class-names:
+  - airflow.providers.odbc.hooks.odbc.OdbcHook
diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py
index 1acc284..7a993c4 100644
--- a/airflow/providers/oracle/hooks/oracle.py
+++ b/airflow/providers/oracle/hooks/oracle.py
@@ -30,6 +30,7 @@ class OracleHook(DbApiHook):
 
     conn_name_attr = 'oracle_conn_id'
     default_conn_name = 'oracle_default'
+    conn_type = 'oracle'
     supports_autocommit = False
 
     # pylint: disable=c-extension-no-member
diff --git a/airflow/providers/oracle/provider.yaml b/airflow/providers/oracle/provider.yaml
index f3695fe..11f1124 100644
--- a/airflow/providers/oracle/provider.yaml
+++ b/airflow/providers/oracle/provider.yaml
@@ -32,11 +32,16 @@ operators:
   - integration-name: Oracle
     python-modules:
       - airflow.providers.oracle.operators.oracle
+
 hooks:
   - integration-name: Oracle
     python-modules:
       - airflow.providers.oracle.hooks.oracle
+
 transfers:
   - source-integration-name: Oracle
     target-integration-name: Oracle
     python-module: airflow.providers.oracle.transfers.oracle_to_oracle
+
+hook-class-names:
+  - airflow.providers.oracle.hooks.oracle.OracleHook
diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py
index b207edb..6b5054d 100644
--- a/airflow/providers/postgres/hooks/postgres.py
+++ b/airflow/providers/postgres/hooks/postgres.py
@@ -57,6 +57,7 @@ class PostgresHook(DbApiHook):
 
     conn_name_attr = 'postgres_conn_id'
     default_conn_name = 'postgres_default'
+    conn_type = 'postgres'
     supports_autocommit = True
 
     def __init__(self, *args, **kwargs) -> None:
diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/provider.yaml
index 55485c4..cc59227 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/airflow/providers/postgres/provider.yaml
@@ -38,3 +38,6 @@ hooks:
   - integration-name: PostgreSQL
     python-modules:
       - airflow.providers.postgres.hooks.postgres
+
+hook-class-names:
+  - airflow.providers.postgres.hooks.postgres.PostgresHook
diff --git a/airflow/providers/presto/hooks/presto.py b/airflow/providers/presto/hooks/presto.py
index a5a0576..045b7b5 100644
--- a/airflow/providers/presto/hooks/presto.py
+++ b/airflow/providers/presto/hooks/presto.py
@@ -55,6 +55,7 @@ class PrestoHook(DbApiHook):
 
     conn_name_attr = 'presto_conn_id'
     default_conn_name = 'presto_default'
+    conn_type = 'presto'
 
     def get_conn(self) -> Connection:
         """Returns a connection object"""
diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/provider.yaml
index 8e8dccc..6896087 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/provider.yaml
@@ -32,3 +32,6 @@ hooks:
   - integration-name: Presto
     python-modules:
       - airflow.providers.presto.hooks.presto
+
+hook-class-names:
+  - airflow.providers.presto.hooks.presto.PrestoHook
diff --git a/airflow/providers/redis/hooks/redis.py b/airflow/providers/redis/hooks/redis.py
index 340da60..cc6b351 100644
--- a/airflow/providers/redis/hooks/redis.py
+++ b/airflow/providers/redis/hooks/redis.py
@@ -31,7 +31,11 @@ class RedisHook(BaseHook):
     ``{"ssl": true, "ssl_cert_reqs": "require", "ssl_cert_file": 
"/path/to/cert.pem", etc}``.
     """
 
-    def __init__(self, redis_conn_id: str = 'redis_default') -> None:
+    conn_name_attr = 'redis_conn_id'
+    default_conn_name = 'redis_default'
+    conn_type = 'redis'
+
+    def __init__(self, redis_conn_id: str = default_conn_name) -> None:
         """
         Prepares hook to connect to a Redis database.
 
diff --git a/airflow/providers/redis/provider.yaml b/airflow/providers/redis/provider.yaml
index 2a6df5b..2b74a34 100644
--- a/airflow/providers/redis/provider.yaml
+++ b/airflow/providers/redis/provider.yaml
@@ -44,3 +44,6 @@ hooks:
   - integration-name: Redis
     python-modules:
       - airflow.providers.redis.hooks.redis
+
+hook-class-names:
+  - airflow.providers.redis.hooks.redis.RedisHook
diff --git a/airflow/providers/salesforce/hooks/tableau.py b/airflow/providers/salesforce/hooks/tableau.py
index bd47d10..56d812e 100644
--- a/airflow/providers/salesforce/hooks/tableau.py
+++ b/airflow/providers/salesforce/hooks/tableau.py
@@ -51,7 +51,11 @@ class TableauHook(BaseHook):
     :type tableau_conn_id: str
     """
 
-    def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = 'tableau_default') -> None:
+    conn_name_attr = 'tableau_conn_id'
+    default_conn_name = 'tableau_default'
+    conn_type = 'tableau'
+
+    def __init__(self, site_id: Optional[str] = None, tableau_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.tableau_conn_id = tableau_conn_id
         self.conn = self.get_connection(self.tableau_conn_id)
diff --git a/airflow/providers/salesforce/provider.yaml b/airflow/providers/salesforce/provider.yaml
index 2651c43..ebc907e 100644
--- a/airflow/providers/salesforce/provider.yaml
+++ b/airflow/providers/salesforce/provider.yaml
@@ -44,3 +44,6 @@ hooks:
     python-modules:
       - airflow.providers.salesforce.hooks.salesforce
       - airflow.providers.salesforce.hooks.tableau
+
+hook-class-names:
+  - airflow.providers.salesforce.hooks.tableau.TableauHook
diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py
index 6579f44..33eaf72 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -35,6 +35,7 @@ class SnowflakeHook(DbApiHook):
 
     conn_name_attr = 'snowflake_conn_id'
     default_conn_name = 'snowflake_default'
+    conn_type = 'snowflake'
     supports_autocommit = True
 
     def __init__(self, *args, **kwargs) -> None:
diff --git a/airflow/providers/snowflake/provider.yaml b/airflow/providers/snowflake/provider.yaml
index b306145..5948338 100644
--- a/airflow/providers/snowflake/provider.yaml
+++ b/airflow/providers/snowflake/provider.yaml
@@ -48,3 +48,6 @@ transfers:
   - source-integration-name: Snowflake
     target-integration-name: Slack
     python-module: airflow.providers.snowflake.transfers.snowflake_to_slack
+
+hook-class-names:
+  - airflow.providers.snowflake.hooks.snowflake.SnowflakeHook
diff --git a/airflow/providers/sqlite/hooks/sqlite.py b/airflow/providers/sqlite/hooks/sqlite.py
index 67b9273..c1a41c6 100644
--- a/airflow/providers/sqlite/hooks/sqlite.py
+++ b/airflow/providers/sqlite/hooks/sqlite.py
@@ -26,6 +26,7 @@ class SqliteHook(DbApiHook):
 
     conn_name_attr = 'sqlite_conn_id'
     default_conn_name = 'sqlite_default'
+    conn_type = 'sqlite'
 
     def get_conn(self) -> sqlite3.dbapi2.Connection:
         """Returns a sqlite connection object"""
diff --git a/airflow/providers/sqlite/provider.yaml b/airflow/providers/sqlite/provider.yaml
index 326d8f1..5558620 100644
--- a/airflow/providers/sqlite/provider.yaml
+++ b/airflow/providers/sqlite/provider.yaml
@@ -38,3 +38,6 @@ hooks:
   - integration-name: SQLite
     python-modules:
       - airflow.providers.sqlite.hooks.sqlite
+
+hook-class-names:
+  - airflow.providers.sqlite.hooks.sqlite.SqliteHook
diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py
index acb86a5..071409d 100644
--- a/airflow/providers/vertica/hooks/vertica.py
+++ b/airflow/providers/vertica/hooks/vertica.py
@@ -27,6 +27,7 @@ class VerticaHook(DbApiHook):
 
     conn_name_attr = 'vertica_conn_id'
     default_conn_name = 'vertica_default'
+    conn_type = 'vertica'
     supports_autocommit = True
 
     def get_conn(self) -> connect:
diff --git a/airflow/providers/vertica/provider.yaml b/airflow/providers/vertica/provider.yaml
index 4ddb8c7..a63fdc4 100644
--- a/airflow/providers/vertica/provider.yaml
+++ b/airflow/providers/vertica/provider.yaml
@@ -38,3 +38,6 @@ hooks:
   - integration-name: Vertica
     python-modules:
       - airflow.providers.vertica.hooks.vertica
+
+hook-class-names:
+  - airflow.providers.vertica.hooks.vertica.VerticaHook
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index 44821f7..f30d433 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -17,6 +17,7 @@
 # under the License.
 """Manages all providers."""
 import fnmatch
+import importlib
 import json
 import logging
 import os
@@ -26,6 +27,8 @@ from typing import Dict, Tuple
 import jsonschema
 import yaml
 
+from airflow.utils.entry_points import entry_points_with_dist
+
 try:
     import importlib.resources as importlib_resources
 except ImportError:
@@ -60,8 +63,11 @@ class ProvidersManager:
         return cls._instance
 
     def __init__(self):
-        # Keeps list of providers keyed by module name and value is Tuple: version, provider_info
+        # Keeps dict of providers keyed by module name and value is Tuple: version, provider_info
         self._provider_dict: Dict[str, Tuple[str, Dict]] = {}
+        # Keeps dict of hooks keyed by connection type and value is
+        # Tuple: connection class, connection_id_attribute_name
+        self._hooks_dict: Dict[str, Tuple[str, str]] = {}
         self._validator = _create_validator()
         # Local source folders are loaded first. They should take precedence over the package ones for
         # Development purpose. In production provider.yaml files are not present in the 'airflow" directory
@@ -69,19 +75,9 @@ class ProvidersManager:
         # in case of local development
         self._discover_all_airflow_builtin_providers_from_local_sources()
         self._discover_all_providers_from_packages()
-        self._sort_provider_dictionary()
-
-    def _sort_provider_dictionary(self):
-        """
-        Sort provider_dictionary using OrderedDict.
-
-        The dictionary gets sorted so that when you iterate through it, the providers are by
-        default returned in alphabetical order.
-        """
-        sorted_dict = OrderedDict()
-        for provider_name in sorted(self._provider_dict.keys()):
-            sorted_dict[provider_name] = self._provider_dict[provider_name]
-        self._provider_dict = sorted_dict
+        self._discover_hooks()
+        self._provider_dict = OrderedDict(sorted(self.providers.items()))
+        self._hooks_dict = OrderedDict(sorted(self.hooks.items()))
 
     def _discover_all_providers_from_packages(self) -> None:
         """
@@ -89,9 +85,7 @@ class ProvidersManager:
         via the 'apache_airflow_provider' entrypoint as a dictionary conforming to the
         'airflow/provider.yaml.schema.json' schema.
         """
-        from airflow.plugins_manager import entry_points_with_dist
-
-        for (entry_point, dist) in entry_points_with_dist('apache_airflow_provider'):
+        for entry_point, dist in entry_points_with_dist('apache_airflow_provider'):
             package_name = dist.metadata['name']
             log.debug("Loading %s from package %s", entry_point, package_name)
             version = dist.version
@@ -101,8 +95,7 @@ class ProvidersManager:
             if package_name != provider_info_package_name:
                 raise Exception(
                     f"The package '{package_name}' from setuptools and "
-                    f"{provider_info_package_name} do not match. Please make 
sure they are"
-                    f"aligned"
+                    f"{provider_info_package_name} do not match. Please make 
sure they are aligned"
                 )
             if package_name not in self._provider_dict:
                 self._provider_dict[package_name] = (version, provider_info)
@@ -171,7 +164,72 @@ class ProvidersManager:
         except Exception as e:  # noqa pylint: disable=broad-except
             log.warning("Error when loading '%s': %s", path, e)
 
+    def _discover_hooks(self) -> None:
+        """Retrieves all connections defined in the providers"""
+        for name, provider in self._provider_dict.items():
+            provider_package = name
+            hook_class_names = provider[1].get("hook-class-names")
+            if hook_class_names:
+                for hook_class_name in hook_class_names:
+                    self._add_hook(hook_class_name, provider_package)
+
+    def _add_hook(self, hook_class_name, provider_package) -> None:
+        """
+        Adds hook class name to list of hooks
+
+        :param hook_class_name: name of the Hook class
+        :param provider_package: provider package adding the hook
+        """
+        if provider_package.startswith("apache-airflow"):
+            provider_path = provider_package[len("apache-") :].replace("-", ".")
+            if not hook_class_name.startswith(provider_path):
+                log.warning(
+                    "Sanity check failed when importing '%s' from '%s' 
package. It should start with '%s'",
+                    hook_class_name,
+                    provider_package,
+                    provider_path,
+                )
+                return
+        if hook_class_name in self._hooks_dict:
+            log.warning(
+                "The hook_class '%s' has been already registered.",
+                hook_class_name,
+            )
+            return
+        try:
+            module, class_name = hook_class_name.rsplit('.', maxsplit=1)
+            hook_class = getattr(importlib.import_module(module), class_name)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning(
+                "Exception when importing '%s' from '%s' package: %s",
+                hook_class_name,
+                provider_package,
+                e,
+            )
+            return
+        conn_type = getattr(hook_class, 'conn_type')
+        if not conn_type:
+            log.warning(
+                "The hook_class '%s' is missing connection_type attribute and 
cannot be registered",
+                hook_class,
+            )
+            return
+        connection_id_attribute_name = getattr(hook_class, 'conn_name_attr')
+        if not connection_id_attribute_name:
+            log.warning(
+                "The hook_class '%s' is missing conn_name_attr attribute and 
cannot be registered",
+                hook_class,
+            )
+            return
+
+        self._hooks_dict[conn_type] = (hook_class_name, connection_id_attribute_name)
+
     @property
     def providers(self):
         """Returns information about available providers."""
         return self._provider_dict
+
+    @property
+    def hooks(self):
+        """Returns dictionary of connection_type-to-hook mapping"""
+        return self._hooks_dict
diff --git a/airflow/providers/exasol/provider.yaml b/airflow/utils/entry_points.py
similarity index 55%
copy from airflow/providers/exasol/provider.yaml
copy to airflow/utils/entry_points.py
index ae13837..062aad8 100644
--- a/airflow/providers/exasol/provider.yaml
+++ b/airflow/utils/entry_points.py
@@ -15,26 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
----
-package-name: apache-airflow-providers-exasol
-name: Exasol
-description: |
-    `Exasol <https://docs.exasol.com/home.htm>`__
+import importlib_metadata
 
-versions:
-  - 1.0.0b2
 
-integrations:
-  - integration-name: Exasol
-    external-doc-url: https://docs.exasol.com/home.htm
-    tags: [software]
+def entry_points_with_dist(group: str):
+    """
+    Return EntryPoint objects of the given group, along with the distribution information.
 
-operators:
-  - integration-name: Exasol
-    python-modules:
-      - airflow.providers.exasol.operators.exasol
+    This is like the ``entry_points()`` function from importlib.metadata,
+    except it also returns the distribution the entry_point was loaded from.
 
-hooks:
-  - integration-name: Exasol
-    python-modules:
-      - airflow.providers.exasol.hooks.exasol
+    :param group: FIlter results to only this entrypoint group
+    :return: Generator of (EntryPoint, Distribution) objects for the specified groups
+    """
+    for dist in importlib_metadata.distributions():
+        for e in dist.entry_points:
+            if e.group != group:
+                continue
+            yield e, dist
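
A quick usage sketch (hypothetical; 'apache_airflow_provider' is assumed here
as the entry point group that providers register under):

    from airflow.utils.entry_points import entry_points_with_dist

    # Print which installed distribution ships each entry point in the group.
    for entry_point, dist in entry_points_with_dist('apache_airflow_provider'):
        print(dist.metadata['Name'], '->', entry_point.name)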
diff --git a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
index 63e918a..5e58b75 100755
--- a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
+++ b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
@@ -23,7 +23,7 @@ import textwrap
 from collections import Counter
 from glob import glob
 from itertools import chain, product
-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, List
 
 import jsonschema
 import yaml
@@ -39,6 +39,8 @@ DOCS_DIR = os.path.join(ROOT_DIR, 'docs')
 PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json")
 CORE_INTEGRATIONS = ["SQL", "Local"]
 
+errors = []
+
 
 def _filepath_to_module(filepath: str):
     filepath = os.path.relpath(os.path.abspath(filepath), ROOT_DIR)
@@ -124,21 +126,39 @@ def assert_sets_equal(set1, set2):
     raise AssertionError(standard_msg)
 
 
+def check_if_objects_belongs_to_package(
+    object_names: List[str], provider_package: str, yaml_file_path: str, resource_type: str
+):
+    for object_name in object_names:
+        if not object_name.startswith(provider_package):
+            errors.append(
+                f"The `{object_name}` object in {resource_type} list in 
{yaml_file_path} does not start"
+                f" with the expected {provider_package}."
+            )
+
+
+def parse_module_data(provider_data, resource_type, yaml_file_path):
+    package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path)
+    provider_package = os.path.dirname(yaml_file_path).replace(os.sep, ".")
+    py_files = chain(
+        glob(f"{package_dir}/**/{resource_type}/*.py"), 
glob(f"{package_dir}/{resource_type}/*.py")
+    )
+    expected_modules = {_filepath_to_module(f) for f in py_files if not f.endswith("/__init__.py")}
+    resource_data = provider_data.get(resource_type, [])
+    return expected_modules, provider_package, resource_data
+
+
 def check_completeness_of_list_of_hooks_sensors_hooks(yaml_files: Dict[str, Dict]):
     print("Checking completeness of list of {sensors, hooks, operators}")
-    errors = []
     for (yaml_file_path, provider_data), resource_type in product(
         yaml_files.items(), ["sensors", "operators", "hooks"]
     ):
-        package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path)
-        py_files = chain(
-            glob(f"{package_dir}/**/{resource_type}/*.py"), 
glob(f"{package_dir}/{resource_type}/*.py")
+        expected_modules, provider_package, resource_data = parse_module_data(
+            provider_data, resource_type, yaml_file_path
         )
-        expected_modules = {_filepath_to_module(f) for f in py_files if not f.endswith("/__init__.py")}
-
-        resource_data = provider_data.get(resource_type, [])
 
-        current_modules = {i for r in resource_data for i in r.get('python-modules', [])}
+        current_modules = {str(i) for r in resource_data for i in r.get('python-modules', [])}
+        check_if_objects_belongs_to_package(current_modules, provider_package, yaml_file_path, resource_type)
         try:
             assert_sets_equal(set(expected_modules), set(current_modules))
         except AssertionError as ex:
@@ -147,17 +167,10 @@ def check_completeness_of_list_of_hooks_sensors_hooks(yaml_files: Dict[str, Dict
                 f"Incorrect content of key '{resource_type}/python-modules' "
                 f"in file: {yaml_file_path}\n{nested_error}"
             )
-    if errors:
-        print(f"Found {len(errors)} errors")
-        for error in errors:
-            print(error)
-            print()
-        sys.exit(1)
 
 
 def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files: Dict[str, Dict]):
     print("Checking for duplicates in list of {sensors, hooks, operators}")
-    errors = []
     for (yaml_file_path, provider_data), resource_type in product(
         yaml_files.items(), ["sensors", "operators", "hooks"]
     ):
@@ -171,28 +184,17 @@ def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files
                         f"in file: {yaml_file_path}"
                     )
 
-    if errors:
-        print(f"Found {len(errors)} errors")
-        for error in errors:
-            print(error)
-            print()
-        sys.exit(1)
-
 
 def check_completeness_of_list_of_transfers(yaml_files: Dict[str, Dict]):
     print("Checking completeness of list of transfers")
-    errors = []
     resource_type = 'transfers'
     for yaml_file_path, provider_data in yaml_files.items():
-        package_dir = ROOT_DIR + "/" + os.path.dirname(yaml_file_path)
-        py_files = chain(
-            glob(f"{package_dir}/**/{resource_type}/*.py"), 
glob(f"{package_dir}/{resource_type}/*.py")
+        expected_modules, provider_package, resource_data = parse_module_data(
+            provider_data, resource_type, yaml_file_path
         )
-        expected_modules = {_filepath_to_module(f) for f in py_files if not f.endswith("/__init__.py")}
-
-        resource_data = provider_data.get(resource_type, [])
 
         current_modules = {r.get('python-module') for r in resource_data}
+        check_if_objects_belongs_to_package(current_modules, provider_package, yaml_file_path, resource_type)
         try:
             assert_sets_equal(set(expected_modules), set(current_modules))
         except AssertionError as ex:
@@ -201,12 +203,18 @@ def check_completeness_of_list_of_transfers(yaml_files: Dict[str, Dict]):
                 f"Incorrect content of key '{resource_type}/python-module' "
                 f"in file: {yaml_file_path}\n{nested_error}"
             )
-    if errors:
-        print(f"Found {len(errors)} errors")
-        for error in errors:
-            print(error)
-            print()
-        sys.exit(1)
+
+
+def check_hook_classes(yaml_files: Dict[str, Dict]):
+    print("Checking connection classes belong to package")
+    resource_type = 'hook-class-names'
+    for yaml_file_path, provider_data in yaml_files.items():
+        provider_package = os.path.dirname(yaml_file_path).replace(os.sep, ".")
+        hook_class_names = provider_data.get(resource_type)
+        if hook_class_names:
+            check_if_objects_belongs_to_package(
+                hook_class_names, provider_package, yaml_file_path, resource_type
+            )
 
 
 def check_duplicates_in_list_of_transfers(yaml_files: Dict[str, Dict]):
@@ -230,17 +238,9 @@ def check_duplicates_in_list_of_transfers(yaml_files: Dict[str, Dict]):
                         f"in file: {yaml_file_path}"
                     )
 
-    if errors:
-        print(f"Found {len(errors)} errors")
-        for error in errors:
-            print(error)
-            print()
-        sys.exit(1)
-
 
 def check_invalid_integration(yaml_files: Dict[str, Dict]):
     print("Detect unregistered integrations")
-    errors = []
     all_integration_names = set(get_all_integration_names(yaml_files))
 
     for (yaml_file_path, provider_data), resource_type in product(
@@ -267,13 +267,6 @@ def check_invalid_integration(yaml_files: Dict[str, Dict]):
                 f"Invalid values: {invalid_names}"
             )
 
-    if errors:
-        print(f"Found {len(errors)} errors")
-        for error in errors:
-            print(error)
-            print()
-        sys.exit(1)
-
 
 # TODO: Delete after splitting the documentation for each provider.
 DOC_FILES_EXCLUDE_LIST = {
@@ -335,8 +328,16 @@ if __name__ == '__main__':
 
     check_completeness_of_list_of_transfers(all_parsed_yaml_files)
     check_duplicates_in_list_of_transfers(all_parsed_yaml_files)
+    check_hook_classes(all_parsed_yaml_files)
 
     if all_files_loaded:
         # Only check those if all provider files are loaded
         check_doc_files(all_parsed_yaml_files)
         check_invalid_integration(all_parsed_yaml_files)
+
+    if errors:
+        print(f"Found {len(errors)} errors")
+        for error in errors:
+            print(error)
+            print()
+        sys.exit(1)
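
For reference, the check_hook_classes check above validates the new
hook-class-names section of each provider.yaml against the provider's package.
A hypothetical entry (the concrete lists live in the provider.yaml files
touched earlier in this commit) looks like:

    hook-class-names:
      - airflow.providers.exasol.hooks.exasol.ExasolHook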
diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh
index 8c0a2b2..b151505 100755
--- a/scripts/in_container/run_install_and_test_provider_packages.sh
+++ b/scripts/in_container/run_install_and_test_provider_packages.sh
@@ -93,6 +93,26 @@ function discover_all_provider_packages() {
     fi
 }
 
+function discover_all_hooks() {
+    echo
+    echo Listing available hooks via 'airflow providers hooks'
+    echo
+
+    airflow providers hooks
+
+    local expected_number_of_hooks=33
+    local actual_number_of_hooks
+    actual_number_of_hooks=$(airflow providers hooks --output simple | grep -c conn_id | xargs)
+    if [[ ${actual_number_of_hooks} != "${expected_number_of_hooks}" ]]; then
+        >&2 echo "ERROR! Number of hooks registered is wrong!"
+        >&2 echo "Expected number was '${expected_number_of_hooks}' and got 
'${actual_number_of_hooks}'"
+        >&2 echo
+        >&2 echo "Either increase the number of hooks if you added one or fix 
problem with imports if you see one."
+        >&2 echo
+    fi
+}
+
 if [[ ${BACKPORT_PACKAGES} != "true" ]]; then
     discover_all_provider_packages
+    discover_all_hooks
 fi
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index 8459e77..69e2027 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -82,6 +82,42 @@ ALL_PROVIDERS = [
     'apache-airflow-providers-zendesk',
 ]
 
+CONNECTIONS_LIST = [
+    'azure_batch',
+    'azure_cosmos',
+    'azure_data_lake',
+    'cassandra',
+    'cloudant',
+    'dataprep',
+    'docker',
+    'elasticsearch',
+    'exasol',
+    'gcpcloudsql',
+    'gcpssh',
+    'google_cloud_platform',
+    'grpc',
+    'hive_cli',
+    'hiveserver2',
+    'imap',
+    'jdbc',
+    'jira',
+    'kubernetes',
+    'mongo',
+    'mssql',
+    'mysql',
+    'odbc',
+    'oracle',
+    'pig_cli',
+    'postgres',
+    'presto',
+    'redis',
+    'snowflake',
+    'sqlite',
+    'tableau',
+    'vertica',
+    'wasb',
+]
+
 
 class TestProviderManager(unittest.TestCase):
     def test_providers_are_loaded(self):
@@ -95,3 +131,8 @@ class TestProviderManager(unittest.TestCase):
             self.assertEqual(package_name, provider)
 
         self.assertEqual(ALL_PROVIDERS, provider_list)
+
+    def test_hooks(self):
+        provider_manager = ProvidersManager()
+        connections_list = list(provider_manager.hooks.keys())
+        self.assertEqual(CONNECTIONS_LIST, connections_list)
diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py
index fbd275d..c43c85c 100644
--- a/tests/models/test_connection.py
+++ b/tests/models/test_connection.py
@@ -28,9 +28,7 @@ from parameterized import parameterized
 from airflow import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.models import Connection, crypto
-from airflow.models.connection import CONN_TYPE_TO_HOOK
 from airflow.providers.sqlite.hooks.sqlite import SqliteHook
-from airflow.utils.module_loading import import_string
 from tests.test_utils.config import conf_vars
 
 ConnectionParts = namedtuple("ConnectionParts", ["conn_type", "login", "password", "host", "port", "schema"])
@@ -549,15 +547,3 @@ class TestConnection(unittest.TestCase):
             ),
         ):
             Connection(conn_id="TEST_ID", uri="mysql://", schema="AAA")
-
-
-class TestConnTypeToHook(unittest.TestCase):
-    def test_enforce_alphabetical_order(self):
-        current_keys = list(CONN_TYPE_TO_HOOK.keys())
-        expected_keys = sorted(current_keys)
-
-        self.assertEqual(expected_keys, current_keys)
-
-    def test_hooks_importable(self):
-        for hook_path, _ in CONN_TYPE_TO_HOOK.values():
-            self.assertTrue(issubclass(import_string(hook_path), BaseHook))
