This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new d47e070  Add HashiCorp Vault Hook (split-out from Vault secret 
backend) (#9333)
d47e070 is described below

commit d47e070a79b574cca043ca9c06f91d47eecb3040
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jun 17 18:04:52 2020 +0200

    Add HashiCorp Vault Hook (split-out from Vault secret backend) (#9333)
    
    Split-off vault hook from vault secret backend
---
 airflow/models/connection.py                       |   3 +-
 .../hashicorp/_internal_client/__init__.py         |  16 +
 .../hashicorp/_internal_client/vault_client.py     | 314 +++++++
 airflow/providers/hashicorp/hooks/__init__.py      |  16 +
 airflow/providers/hashicorp/hooks/vault.py         | 274 ++++++
 airflow/providers/hashicorp/secrets/vault.py       | 162 ++--
 docs/autoapi_templates/index.rst                   |   4 +
 docs/conf.py                                       |   4 +-
 docs/operators-and-hooks-ref.rst                   |  10 +-
 .../hashicorp/_internal_client/__init__.py         |  16 +
 .../_internal_client/test_vault_client.py          | 551 ++++++++++++
 tests/providers/hashicorp/hooks/__init__.py        |  16 +
 tests/providers/hashicorp/hooks/test_vault.py      | 962 +++++++++++++++++++++
 tests/providers/hashicorp/secrets/test_vault.py    |  46 +-
 14 files changed, 2243 insertions(+), 151 deletions(-)

diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 8234efb..aec100a 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -173,7 +173,8 @@ class Connection(Base, LoggingMixin):
         ('tableau', 'Tableau'),
         ('kubernetes', 'Kubernetes cluster Connection'),
         ('spark', 'Spark'),
-        ('imap', 'IMAP')
+        ('imap', 'IMAP'),
+        ('vault', 'Hashicorp Vault'),
     ]
 
     def __init__(
diff --git a/airflow/providers/hashicorp/_internal_client/__init__.py 
b/airflow/providers/hashicorp/_internal_client/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/hashicorp/_internal_client/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/hashicorp/_internal_client/vault_client.py 
b/airflow/providers/hashicorp/_internal_client/vault_client.py
new file mode 100644
index 0000000..386199a
--- /dev/null
+++ b/airflow/providers/hashicorp/_internal_client/vault_client.py
@@ -0,0 +1,314 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, Optional
+
+import hvac
+from cached_property import cached_property
+from hvac.exceptions import InvalidPath, VaultError
+from requests import Response
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class _VaultClient(LoggingMixin):  # pylint: 
disable=too-many-instance-attributes
+    """
+    Retrieves Authenticated client from Hashicorp Vault. This is purely 
internal class promoting
+    authentication code reuse between the Hook and the SecretBackend, it 
should not be used directly in
+    Airflow DAGs. Use VaultBackend for backend integration and Hook in case 
you want to communicate
+    with VaultHook using standard Airflow Connection definition.
+
+    :param url: Base URL for the Vault instance being addressed.
+    :type url: str
+    :param auth_type: Authentication Type for Vault. Default is ``token``. 
Available values are:
+        ('approle', 'github', 'gcp', 'kubernetes', 'ldap', 'token', 'userpass')
+    :type auth_type: str
+    :param mount_point: The "path" the secret engine was mounted on. Default 
is "secret". Note that
+         this mount_point is not used for authentication if authentication is 
done via a
+         different engine.
+    :type mount_point: str
+    :param kv_engine_version: Selects the version of the engine to run (``1`` 
or ``2``, default: ``2``).
+    :type kv_engine_version: int
+    :param token: Authentication token to include in requests sent to Vault
+        (for ``token`` and ``github`` auth_type).
+    :type token: str
+    :param username: Username for Authentication (for ``ldap`` and 
``userpass`` auth_types).
+    :type username: str
+    :param password: Password for Authentication (for ``ldap`` and 
``userpass`` auth_types).
+    :type password: str
+    :param secret_id: Secret ID for Authentication (for ``approle`` auth_type).
+    :type secret_id: str
+    :param role_id: Role ID for Authentication (for ``approle`` auth_type).
+    :type role_id: str
+    :param kubernetes_role: Role for Authentication (for ``kubernetes`` 
auth_type).
+    :type kubernetes_role: str
+    :param kubernetes_jwt_path: Path for kubernetes jwt token (for 
``kubernetes`` auth_type, default:
+        ``/var/run/secrets/kubernetes.io/serviceaccount/token``).
+    :type kubernetes_jwt_path: str
+    :param gcp_key_path: Path to GCP Credential JSON file (for ``gcp`` 
auth_type).
+    :type gcp_key_path: str
+    :param gcp_scopes: Comma-separated string containing GCP scopes (for 
``gcp`` auth_type).
+    :type gcp_scopes: str
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        url: Optional[str] = None,
+        auth_type: str = 'token',
+        mount_point: str = "secret",
+        kv_engine_version: Optional[int] = None,
+        token: Optional[str] = None,
+        username: Optional[str] = None,
+        password: Optional[str] = None,
+        secret_id: Optional[str] = None,
+        role_id: Optional[str] = None,
+        kubernetes_role: Optional[str] = None,
+        kubernetes_jwt_path: Optional[str] = 
'/var/run/secrets/kubernetes.io/serviceaccount/token',
+        gcp_key_path: Optional[str] = None,
+        gcp_scopes: Optional[str] = None,
+        **kwargs
+    ):
+        super().__init__()
+        if kv_engine_version and kv_engine_version not in VALID_KV_VERSIONS:
+            raise VaultError(f"The version is not supported: 
{kv_engine_version}. "
+                             f"It should be one of {VALID_KV_VERSIONS}")
+        if auth_type not in VALID_AUTH_TYPES:
+            raise VaultError(f"The auth_type is not supported: {auth_type}. "
+                             f"It should be one of {VALID_AUTH_TYPES}")
+        if auth_type == "token" and not token:
+            raise VaultError("The 'token' authentication type requires 
'token'")
+        if auth_type == "github" and not token:
+            raise VaultError("The 'github' authentication type requires 
'token'")
+        if auth_type == "approle" and not role_id:
+            raise VaultError("The 'approle' authentication type requires 
'role_id'")
+        if auth_type == "kubernetes":
+            if not kubernetes_role:
+                raise VaultError("The 'kubernetes' authentication type 
requires 'kubernetes_role'")
+            if not kubernetes_jwt_path:
+                raise VaultError("The 'kubernetes' authentication type 
requires 'kubernetes_jwt_path'")
+
+        self.kv_engine_version = kv_engine_version if kv_engine_version else 2
+        self.url = url
+        self.auth_type = auth_type
+        self.kwargs = kwargs
+        self.token = token
+        self.mount_point = mount_point
+        self.username = username
+        self.password = password
+        self.secret_id = secret_id
+        self.role_id = role_id
+        self.kubernetes_role = kubernetes_role
+        self.kubernetes_jwt_path = kubernetes_jwt_path
+        self.gcp_key_path = gcp_key_path
+        self.gcp_scopes = gcp_scopes
+
+    @cached_property
+    def client(self) -> hvac.Client:
+        """
+        Return an authenticated Hashicorp Vault client.
+
+        :rtype: hvac.Client
+        :return: Vault Client
+
+        """
+        _client = hvac.Client(url=self.url, **self.kwargs)
+        if self.auth_type == "approle":
+            self._auth_approle(_client)
+        elif self.auth_type == "gcp":
+            self._auth_gcp(_client)
+        elif self.auth_type == "github":
+            self._auth_github(_client)
+        elif self.auth_type == "kubernetes":
+            self._auth_kubernetes(_client)
+        elif self.auth_type == "ldap":
+            self._auth_ldap(_client)
+        elif self.auth_type == "token":
+            _client.token = self.token
+        elif self.auth_type == "userpass":
+            self._auth_userpass(_client)
+        else:
+            raise VaultError(f"Authentication type '{self.auth_type}' not 
supported")
+
+        if _client.is_authenticated():
+            return _client
+        else:
+            raise VaultError("Vault Authentication Error!")
+
+    def _auth_userpass(self, _client: hvac.Client) -> None:
+        _client.auth_userpass(username=self.username, password=self.password)
+
+    def _auth_ldap(self, _client: hvac.Client) -> None:
+        _client.auth.ldap.login(
+            username=self.username, password=self.password)
+
+    def _auth_kubernetes(self, _client: hvac.Client) -> None:
+        if not self.kubernetes_jwt_path:
+            raise VaultError("The kubernetes_jwt_path should be set here. This 
should not happen.")
+        with open(self.kubernetes_jwt_path) as f:
+            jwt = f.read()
+            _client.auth_kubernetes(role=self.kubernetes_role, jwt=jwt)
+
+    def _auth_github(self, _client: hvac.Client) -> None:
+        _client.auth.github.login(token=self.token)
+
+    def _auth_gcp(self, _client: hvac.Client) -> None:
+        # noinspection PyProtectedMember
+        from airflow.providers.google.cloud.utils.credentials_provider import (
+            get_credentials_and_project_id,
+            _get_scopes
+        )
+        scopes = _get_scopes(self.gcp_scopes)
+        credentials, _ = 
get_credentials_and_project_id(key_path=self.gcp_key_path,
+                                                        scopes=scopes)
+        _client.auth.gcp.configure(credentials=credentials)
+
+    def _auth_approle(self, _client: hvac.Client) -> None:
+        _client.auth_approle(role_id=self.role_id, secret_id=self.secret_id)
+
+    def get_secret(self, secret_path: str, secret_version: Optional[int] = 
None) -> Optional[dict]:
+        """
+        Get secret value from the KV engine.
+
+        :param secret_path: The path of the secret.
+        :type secret_path: str
+        :param secret_version: Specifies the version of Secret to return. If 
not set, the latest
+            version is returned. (Can only be used in case of version 2 of KV).
+        :type secret_version: int
+
+        See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v1.html
+        and 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        :return: secret stored in the vault as a dictionary
+        """
+        try:
+            if self.kv_engine_version == 1:
+                if secret_version:
+                    raise VaultError("Secret version can only be used with 
version 2 of the KV engine")
+                response = self.client.secrets.kv.v1.read_secret(
+                    path=secret_path, mount_point=self.mount_point)
+            else:
+                response = self.client.secrets.kv.v2.read_secret_version(
+                    path=secret_path, mount_point=self.mount_point, 
version=secret_version)
+        except InvalidPath:
+            self.log.debug("Secret not found %s with mount point %s", 
secret_path, self.mount_point)
+            return None
+
+        return_data = response["data"] if self.kv_engine_version == 1 else 
response["data"]["data"]
+        return return_data
+
+    def get_secret_metadata(self, secret_path: str) -> Optional[dict]:
+        """
+        Reads secret metadata (including versions) from the engine. It is only 
valid for KV version 2.
+
+        :param secret_path: The path of the secret.
+        :type secret_path: str
+        :rtype: dict
+        :return: secret metadata. This is a Dict containing metadata for the 
secret.
+
+                 See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        """
+        if self.kv_engine_version == 1:
+            raise VaultError("Metadata might only be used with version 2 of 
the KV engine.")
+        try:
+            return self.client.secrets.kv.v2.read_secret_metadata(
+                path=secret_path,
+                mount_point=self.mount_point)
+        except InvalidPath:
+            self.log.debug("Secret not found %s with mount point %s", 
secret_path, self.mount_point)
+            return None
+
+    def get_secret_including_metadata(self,
+                                      secret_path: str,
+                                      secret_version: Optional[int] = None) -> 
Optional[dict]:
+        """
+        Reads secret including metadata. It is only valid for KV version 2.
+
+        See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        :param secret_path: The path of the secret.
+        :type secret_path: str
+        :param secret_version: Specifies the version of Secret to return. If 
not set, the latest
+            version is returned. (Can only be used in case of version 2 of KV).
+        :type secret_version: int
+        :rtype: dict
+        :return: The key info. This is a Dict with "data" mapping keeping 
secret
+                 and "metadata" mapping keeping metadata of the secret.
+        """
+        if self.kv_engine_version == 1:
+            raise VaultError("Metadata might only be used with version 2 of 
the KV engine.")
+        try:
+            return self.client.secrets.kv.v2.read_secret_version(
+                path=secret_path, mount_point=self.mount_point,
+                version=secret_version)
+        except InvalidPath:
+            self.log.debug("Secret not found %s with mount point %s and 
version %s",
+                           secret_path, self.mount_point, secret_version)
+            return None
+
+    def create_or_update_secret(self,
+                                secret_path: str,
+                                secret: dict,
+                                method: Optional[str] = None,
+                                cas: Optional[int] = None) -> Response:
+        """
+        Creates or updates secret.
+
+        :param secret_path: The path of the secret.
+        :type secret_path: str
+        :param secret: Secret to create or update for the path specified
+        :type secret: dict
+        :param method: Optional parameter to explicitly request a POST 
(create) or PUT (update) request to
+            the selected kv secret engine. If no argument is provided for this 
parameter, hvac attempts to
+            intelligently determine which method is appropriate. Only valid 
for KV engine version 1
+        :type method: str
+        :param cas: Set the "cas" value to use a Check-And-Set operation. If 
not set the write will be
+            allowed. If set to 0 a write will only be allowed if the key 
doesn't exist.
+            If the index is non-zero the write will only be allowed if the 
key's current version
+            matches the version specified in the cas parameter. Only valid for 
KV engine version 2.
+        :type cas: int
+        :rtype: requests.Response
+        :return: The response of the create_or_update_secret request.
+
+                 See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v1.html
+                 and 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        """
+        if self.kv_engine_version == 2 and method:
+            raise VaultError("The method parameter is only valid for version 
1")
+        if self.kv_engine_version == 1 and cas:
+            raise VaultError("The cas parameter is only valid for version 2")
+        if self.kv_engine_version == 1:
+            response = self.client.secrets.kv.v1.create_or_update_secret(
+                secret_path=secret_path, secret=secret, 
mount_point=self.mount_point, method=method)
+        else:
+            response = self.client.secrets.kv.v2.create_or_update_secret(
+                secret_path=secret_path, secret=secret, 
mount_point=self.mount_point, cas=cas)
+        return response
+
+
+DEFAULT_KUBERNETES_JWT_PATH = 
'/var/run/secrets/kubernetes.io/serviceaccount/token'
+DEFAULT_KV_ENGINE_VERSION = 2
+VALID_KV_VERSIONS: List[int] = [1, 2]
+VALID_AUTH_TYPES: List[str] = [
+    'approle',
+    'github',
+    'gcp',
+    'kubernetes',
+    'ldap',
+    'token',
+    'userpass'
+]
diff --git a/airflow/providers/hashicorp/hooks/__init__.py 
b/airflow/providers/hashicorp/hooks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/hashicorp/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/hashicorp/hooks/vault.py 
b/airflow/providers/hashicorp/hooks/vault.py
new file mode 100644
index 0000000..ca98396
--- /dev/null
+++ b/airflow/providers/hashicorp/hooks/vault.py
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for HashiCorp Vault"""
+from typing import Optional, Tuple
+
+import hvac
+from hvac.exceptions import VaultError
+from requests import Response
+
+from airflow.hooks.base_hook import BaseHook
+# noinspection PyProtectedMember
+from airflow.providers.hashicorp._internal_client.vault_client import (  # noqa
+    DEFAULT_KUBERNETES_JWT_PATH, DEFAULT_KV_ENGINE_VERSION, _VaultClient,
+)
+
+
+class VaultHook(BaseHook):
+    """
+    Hook to Interact with HashiCorp Vault KeyValue Secret engine.
+
+    HashiCorp hvac documentation:
+       * https://hvac.readthedocs.io/en/stable/
+
+    You connect to the host specified as host in the connection. The 
login/password from the connection
+    are used as credentials usually and you can specify different 
authentication parameters
+    via init params or via corresponding extras in the connection.
+
+    The mount point should be placed as a path in the URL - similarly to 
Vault's URL schema:
+    This indicates the "path" the secret engine is mounted on. Default id not 
specified is "secret".
+    Note that this ``mount_point`` is not used for authentication if 
authentication is done via a
+    different engines. Each engine uses it's own engine-specific 
authentication mount_point.
+
+    The extras in the connection are named the same as the parameters 
('kv_engine_version', 'auth_type', ...).
+
+    The URL schemas supported are "vault", "http" (using http to connect to 
the vault) or
+    "vaults" and "https" (using https to connect to the vault).
+
+    Example URL:
+
+    .. code-block::
+
+        
vault://user:password@host:port/mount_point?kv_engine_version=1&auth_type=github
+
+
+    Login/Password are used as credentials:
+
+        * approle: password -> secret_id
+        * github: password -> token
+        * token: password -> token
+        * ldap: login -> username,   password -> password
+        * userpass: login -> username, password -> password
+
+    :param vault_conn_id: The id of the connection to use
+    :type vault_conn_id: str
+    :param auth_type: Authentication Type for the Vault. Default is ``token``. 
Available values are:
+        ('approle', 'github', 'gcp', 'kubernetes', 'ldap', 'token', 'userpass')
+    :type auth_type: str
+    :param kv_engine_version: Select the version of the engine to run (``1`` 
or ``2``). Defaults to
+          version defined in connection or ``2`` if not defined in connection.
+    :type kv_engine_version: int
+    :param role_id: Role ID for Authentication (for ``approle`` auth_type)
+    :type role_id: str
+    :param kubernetes_role: Role for Authentication (for ``kubernetes`` 
auth_type)
+    :type kubernetes_role: str
+    :param kubernetes_jwt_path: Path for kubernetes jwt token (for 
``kubernetes`` auth_type, default:
+        ``/var/run/secrets/kubernetes.io/serviceaccount/token``)
+    :type kubernetes_jwt_path: str
+    :param gcp_key_path: Path to GCP Credential JSON file (for ``gcp`` 
auth_type)
+    :type gcp_key_path: str
+    :param gcp_scopes: Comma-separated string containing GCP scopes (for 
``gcp`` auth_type)
+    :type gcp_scopes: str
+
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        vault_conn_id: str,
+        auth_type: Optional[str] = None,
+        kv_engine_version: Optional[int] = None,
+        role_id: Optional[str] = None,
+        kubernetes_role: Optional[str] = None,
+        kubernetes_jwt_path: Optional[str] = None,
+        gcp_key_path: Optional[str] = None,
+        gcp_scopes: Optional[str] = None,
+    ):
+        super().__init__()
+        self.connection = self.get_connection(vault_conn_id)
+
+        if not auth_type:
+            auth_type = self.connection.extra_dejson.get('auth_type') or 
"token"
+
+        if not kv_engine_version:
+            conn_version = 
self.connection.extra_dejson.get("kv_engine_version")
+            try:
+                kv_engine_version = int(conn_version) if conn_version else 
DEFAULT_KV_ENGINE_VERSION
+            except ValueError:
+                raise VaultError(f"The version is not an int: {conn_version}. 
")
+
+        if auth_type in ["approle"]:
+            if not role_id:
+                role_id = self.connection.extra_dejson.get('role_id')
+
+        gcp_key_path, gcp_scopes = \
+            self._get_gcp_parameters_from_connection(gcp_key_path, gcp_scopes) 
\
+            if auth_type == 'gcp' else (None, None)
+        kubernetes_jwt_path, kubernetes_role = \
+            
self._get_kubernetes_parameters_from_connection(kubernetes_jwt_path, 
kubernetes_role) \
+            if auth_type == 'kubernetes' else (None, None)
+
+        if self.connection.conn_type == 'vault':
+            conn_protocol = 'http'
+        elif self.connection.conn_type == 'vaults':
+            conn_protocol = 'https'
+        elif self.connection.conn_type == 'http':
+            conn_protocol = 'http'
+        elif self.connection.conn_type == 'https':
+            conn_protocol = 'https'
+        else:
+            raise VaultError("The url schema must be one of ['http', 'https', 
'vault', 'vaults' ]")
+
+        url = f"{conn_protocol}://{self.connection.host}"
+        if self.connection.port:
+            url += f":{self.connection.port}"
+
+        # Schema is really path in the Connection definition. This is pretty 
confusing because of URL schema
+        mount_point = self.connection.schema if self.connection.schema else 
'secret'
+
+        self.vault_client = _VaultClient(
+            url=url,
+            auth_type=auth_type,
+            mount_point=mount_point,
+            kv_engine_version=kv_engine_version,
+            token=self.connection.password,
+            username=self.connection.login,
+            password=self.connection.password,
+            secret_id=self.connection.password,
+            role_id=role_id,
+            kubernetes_role=kubernetes_role,
+            kubernetes_jwt_path=kubernetes_jwt_path,
+            gcp_key_path=gcp_key_path,
+            gcp_scopes=gcp_scopes,
+        )
+
+    def _get_kubernetes_parameters_from_connection(
+        self, kubernetes_jwt_path: Optional[str], kubernetes_role: 
Optional[str]) \
+            -> Tuple[str, Optional[str]]:
+        if not kubernetes_jwt_path:
+            kubernetes_jwt_path = 
self.connection.extra_dejson.get("kubernetes_jwt_path")
+            if not kubernetes_jwt_path:
+                kubernetes_jwt_path = DEFAULT_KUBERNETES_JWT_PATH
+        if not kubernetes_role:
+            kubernetes_role = 
self.connection.extra_dejson.get("kubernetes_role")
+        return kubernetes_jwt_path, kubernetes_role
+
+    def _get_gcp_parameters_from_connection(
+        self,
+        gcp_key_path: Optional[str],
+        gcp_scopes: Optional[str],
+    ) -> Tuple[Optional[str], Optional[str]]:
+        if not gcp_scopes:
+            gcp_scopes = self.connection.extra_dejson.get("gcp_scopes")
+        if not gcp_key_path:
+            gcp_key_path = self.connection.extra_dejson.get("gcp_key_path")
+        return gcp_key_path, gcp_scopes
+
+    def get_conn(self) -> hvac.Client:
+        """
+        Retrieves connection to Vault.
+
+        :rtype: hvac.Client
+        :return: connection used.
+        """
+        return self.vault_client.client
+
+    def get_secret(self, secret_path: str, secret_version: Optional[int] = 
None) -> Optional[dict]:
+        """
+        Get secret value from the engine.
+
+        :param secret_path: Path of the secret
+        :type secret_path: str
+        :param secret_version: Optional version of key to read - can only be 
used in case of version 2 of KV
+        :type secret_version: int
+
+        See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v1.html
+        and 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        :param secret_path: Path of the secret
+        :type secret_path: str
+        :rtype: dict
+        :return: secret stored in the vault as a dictionary
+        """
+        return self.vault_client.get_secret(secret_path=secret_path, 
secret_version=secret_version)
+
+    def get_secret_metadata(self, secret_path: str) -> Optional[dict]:
+        """
+        Reads secret metadata (including versions) from the engine. It is only 
valid for KV version 2.
+
+        :param secret_path: Path to read from
+        :type secret_path: str
+        :rtype: dict
+        :return: secret metadata. This is a Dict containing metadata for the 
secret.
+
+        See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        """
+        return self.vault_client.get_secret_metadata(secret_path=secret_path)
+
+    def get_secret_including_metadata(self,
+                                      secret_path: str,
+                                      secret_version: Optional[int] = None) -> 
Optional[dict]:
+        """
+        Reads secret including metadata. It is only valid for KV version 2.
+
+        See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        :param secret_path: Path of the secret
+        :type secret_path: str
+        :param secret_version: Optional version of key to read - can only be 
used in case of version 2 of KV
+        :type secret_version: int
+        :rtype: dict
+        :return: key info. This is a Dict with "data" mapping keeping secret
+            and "metadata" mapping keeping metadata of the secret.
+
+        """
+        return self.vault_client.get_secret_including_metadata(
+            secret_path=secret_path, secret_version=secret_version)
+
+    def create_or_update_secret(self,
+                                secret_path: str,
+                                secret: dict,
+                                method: Optional[str] = None,
+                                cas: Optional[int] = None) -> Response:
+        """
+        Creates or updates secret.
+
+        :param secret_path: Path to read from
+        :type secret_path: str
+        :param secret: Secret to create or update for the path specified
+        :type secret: dict
+        :param method: Optional parameter to explicitly request a POST 
(create) or PUT (update) request to
+            the selected kv secret engine. If no argument is provided for this 
parameter, hvac attempts to
+            intelligently determine which method is appropriate. Only valid 
for KV engine version 1
+        :type method: str
+        :param cas: Set the "cas" value to use a Check-And-Set operation. If 
not set the write will be
+            allowed. If set to 0 a write will only be allowed if the key 
doesn't exist.
+            If the index is non-zero the write will only be allowed if the 
key's current version
+            matches the version specified in the cas parameter. Only valid for 
KV engine version 2.
+        :type cas: int
+        :rtype: requests.Response
+        :return: The response of the create_or_update_secret request.
+
+        See 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v1.html
+        and 
https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for 
details.
+
+        """
+        return self.vault_client.create_or_update_secret(
+            secret_path=secret_path,
+            secret=secret,
+            method=method,
+            cas=cas)
diff --git a/airflow/providers/hashicorp/secrets/vault.py 
b/airflow/providers/hashicorp/secrets/vault.py
index 27058e9..15510f3 100644
--- a/airflow/providers/hashicorp/secrets/vault.py
+++ b/airflow/providers/hashicorp/secrets/vault.py
@@ -20,19 +20,15 @@ Objects relating to sourcing connections & variables from 
Hashicorp Vault
 """
 from typing import Optional
 
-import hvac
-from cached_property import cached_property
-from hvac.exceptions import InvalidPath, VaultError
-
-from airflow.exceptions import AirflowException
+from airflow.providers.hashicorp._internal_client.vault_client import 
_VaultClient  # noqa
 from airflow.secrets import BaseSecretsBackend
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-# pylint: disable=too-many-instance-attributes
+# pylint: disable=too-many-instance-attributes,too-many-locals
 class VaultBackend(BaseSecretsBackend, LoggingMixin):
     """
-    Retrieves Connections and Variables from Hashicorp Vault
+    Retrieves Connections and Variables from Hashicorp Vault.
 
     Configurable via ``airflow.cfg`` as follows:
 
@@ -50,40 +46,42 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
     would be accessible if you provide ``{"connections_path": "connections"}`` 
and request
     conn_id ``smtp_default``.
 
-    :param connections_path: Specifies the path of the secret to read to get 
Connections.
-        (default: 'connections')
+    :param connections_path: Specifies the path of the secret to read to get 
Connections
+        (default: 'connections').
     :type connections_path: str
-    :param variables_path: Specifies the path of the secret to read to get 
Variables.
-        (default: 'variables')
+    :param variables_path: Specifies the path of the secret to read to get 
Variables
+        (default: 'variables').
     :type variables_path: str
     :param url: Base URL for the Vault instance being addressed.
     :type url: str
-    :param auth_type: Authentication Type for Vault (one of 'token', 'ldap', 
'userpass', 'approle',
-        'github', 'gcp', 'kubernetes'). Default is ``token``.
+    :param auth_type: Authentication Type for Vault. Default is ``token``. 
Available values are:
+        ('approle', 'github', 'gcp', 'kubernetes', 'ldap', 'token', 'userpass')
     :type auth_type: str
-    :param mount_point: The "path" the secret engine was mounted on. (Default: 
``secret``)
+    :param mount_point: The "path" the secret engine was mounted on. Default 
is "secret". Note that
+         this mount_point is not used for authentication if authentication is 
done via a
+         different engine.
     :type mount_point: str
+    :param kv_engine_version: Select the version of the engine to run (``1`` 
or ``2``, default: ``2``).
+    :type kv_engine_version: int
     :param token: Authentication token to include in requests sent to Vault.
         (for ``token`` and ``github`` auth_type)
     :type token: str
-    :param kv_engine_version: Select the version of the engine to run (``1`` 
or ``2``, default: ``2``)
-    :type kv_engine_version: int
-    :param username: Username for Authentication (for ``ldap`` and 
``userpass`` auth_type)
+    :param username: Username for Authentication (for ``ldap`` and 
``userpass`` auth_type).
     :type username: str
-    :param password: Password for Authentication (for ``ldap`` and 
``userpass`` auth_type)
+    :param password: Password for Authentication (for ``ldap`` and 
``userpass`` auth_type).
     :type password: str
-    :param role_id: Role ID for Authentication (for ``approle`` auth_type)
+    :param secret_id: Secret ID for Authentication (for ``approle`` auth_type).
+    :type secret_id: str
+    :param role_id: Role ID for Authentication (for ``approle`` auth_type).
     :type role_id: str
-    :param kubernetes_role: Role for Authentication (for ``kubernetes`` 
auth_type)
+    :param kubernetes_role: Role for Authentication (for ``kubernetes`` 
auth_type).
     :type kubernetes_role: str
-    :param kubernetes_jwt_path: Path for kubernetes jwt token (for 
``kubernetes`` auth_type, deafult:
-        ``/var/run/secrets/kubernetes.io/serviceaccount/token``)
+    :param kubernetes_jwt_path: Path for kubernetes jwt token (for 
``kubernetes`` auth_type, default:
+        ``/var/run/secrets/kubernetes.io/serviceaccount/token``).
     :type kubernetes_jwt_path: str
-    :param secret_id: Secret ID for Authentication (for ``approle`` auth_type)
-    :type secret_id: str
-    :param gcp_key_path: Path to GCP Credential JSON file (for ``gcp`` 
auth_type)
+    :param gcp_key_path: Path to GCP Credential JSON file (for ``gcp`` 
auth_type).
     :type gcp_key_path: str
-    :param gcp_scopes: Comma-separated string containing GCP scopes (for 
``gcp`` auth_type)
+    :param gcp_scopes: Comma-separated string containing GCP scopes (for 
``gcp`` auth_type).
     :type gcp_scopes: str
     """
     def __init__(  # pylint: disable=too-many-arguments
@@ -97,10 +95,10 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
         token: Optional[str] = None,
         username: Optional[str] = None,
         password: Optional[str] = None,
+        secret_id: Optional[str] = None,
         role_id: Optional[str] = None,
         kubernetes_role: Optional[str] = None,
         kubernetes_jwt_path: str = 
'/var/run/secrets/kubernetes.io/serviceaccount/token',
-        secret_id: Optional[str] = None,
         gcp_key_path: Optional[str] = None,
         gcp_scopes: Optional[str] = None,
         **kwargs
@@ -108,71 +106,36 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
         super().__init__(**kwargs)
         self.connections_path = connections_path.rstrip('/')
         self.variables_path = variables_path.rstrip('/')
-        self.url = url
-        self.auth_type = auth_type
-        self.kwargs = kwargs
-        self.token = token
-        self.username = username
-        self.password = password
-        self.role_id = role_id
-        self.kubernetes_role = kubernetes_role
-        self.kubernetes_jwt_path = kubernetes_jwt_path
-        self.secret_id = secret_id
         self.mount_point = mount_point
         self.kv_engine_version = kv_engine_version
-        self.gcp_key_path = gcp_key_path
-        self.gcp_scopes = gcp_scopes
-
-    @cached_property
-    def client(self) -> hvac.Client:
-        """
-        Return an authenticated Hashicorp Vault client
-        """
-
-        _client = hvac.Client(url=self.url, **self.kwargs)
-        if self.auth_type == "token":
-            if not self.token:
-                raise VaultError("token cannot be None for auth_type='token'")
-            _client.token = self.token
-        elif self.auth_type == "ldap":
-            _client.auth.ldap.login(
-                username=self.username, password=self.password)
-        elif self.auth_type == "userpass":
-            _client.auth_userpass(username=self.username, 
password=self.password)
-        elif self.auth_type == "approle":
-            _client.auth_approle(role_id=self.role_id, 
secret_id=self.secret_id)
-        elif self.auth_type == "kubernetes":
-            if not self.kubernetes_role:
-                raise VaultError("kubernetes_role cannot be None for 
auth_type='kubernetes'")
-            with open(self.kubernetes_jwt_path) as f:
-                jwt = f.read()
-                _client.auth_kubernetes(role=self.kubernetes_role, jwt=jwt)
-        elif self.auth_type == "github":
-            _client.auth.github.login(token=self.token)
-        elif self.auth_type == "gcp":
-            from airflow.providers.google.cloud.utils.credentials_provider 
import (
-                get_credentials_and_project_id,
-                _get_scopes
-            )
-            scopes = _get_scopes(self.gcp_scopes)
-            credentials, _ = 
get_credentials_and_project_id(key_path=self.gcp_key_path, scopes=scopes)
-            _client.auth.gcp.configure(credentials=credentials)
-        else:
-            raise AirflowException(f"Authentication type '{self.auth_type}' 
not supported")
-
-        if _client.is_authenticated():
-            return _client
-        else:
-            raise VaultError("Vault Authentication Error!")
+        self.vault_client = _VaultClient(
+            url=url,
+            auth_type=auth_type,
+            mount_point=mount_point,
+            kv_engine_version=kv_engine_version,
+            token=token,
+            username=username,
+            password=password,
+            secret_id=secret_id,
+            role_id=role_id,
+            kubernetes_role=kubernetes_role,
+            kubernetes_jwt_path=kubernetes_jwt_path,
+            gcp_key_path=gcp_key_path,
+            gcp_scopes=gcp_scopes,
+            **kwargs
+        )
 
     def get_conn_uri(self, conn_id: str) -> Optional[str]:
         """
         Get secret value from Vault. Store the secret in the form of URI
 
-        :param conn_id: connection id
+        :param conn_id: The connection id
         :type conn_id: str
+        :rtype: str
+        :return: The connection uri retrieved from the secret
         """
-        response = self._get_secret(self.connections_path, conn_id)
+        secret_path = self.build_path(self.connections_path, conn_id)
+        response = self.vault_client.get_secret(secret_path=secret_path)
         return response.get("conn_uri") if response else None
 
     def get_variable(self, key: str) -> Optional[str]:
@@ -180,33 +143,10 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
         Get Airflow Variable from Environment Variable
 
         :param key: Variable Key
-        :return: Variable Value
+        :type key: str
+        :rtype: str
+        :return: Variable Value retrieved from the vault
         """
-        response = self._get_secret(self.variables_path, key)
+        secret_path = self.build_path(self.variables_path, key)
+        response = self.vault_client.get_secret(secret_path=secret_path)
         return response.get("value") if response else None
-
-    def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[dict]:
-        """
-        Get secret value from Vault.
-
-        :param path_prefix: Prefix for the Path to get Secret
-        :type path_prefix: str
-        :param secret_id: Secret Key
-        :type secret_id: str
-        """
-        secret_path = self.build_path(path_prefix, secret_id)
-
-        try:
-            if self.kv_engine_version == 1:
-                response = self.client.secrets.kv.v1.read_secret(
-                    path=secret_path, mount_point=self.mount_point
-                )
-            else:
-                response = self.client.secrets.kv.v2.read_secret_version(
-                    path=secret_path, mount_point=self.mount_point)
-        except InvalidPath:
-            self.log.debug("Secret %s not found in Path: %s", secret_id, 
secret_path)
-            return None
-
-        return_data = response["data"] if self.kv_engine_version == 1 else 
response["data"]["data"]
-        return return_data
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index d3d102a..a6d2ee1 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -140,6 +140,8 @@ All operators are in the following packages:
 
   airflow/providers/grpc/operators/index
 
+  airflow/providers/hashicorp/hooks/index
+
   airflow/providers/http/operators/index
 
   airflow/providers/http/sensors/index
@@ -289,6 +291,8 @@ All hooks are in the following packages:
 
   airflow/providers/grpc/hooks/index
 
+  airflow/providers/hashicorp/hooks/index
+
   airflow/providers/http/hooks/index
 
   airflow/providers/imap/hooks/index
diff --git a/docs/conf.py b/docs/conf.py
index 2aafb30..69a1667 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -203,6 +203,8 @@ exclude_patterns: List[str] = [
     "_api/airflow/providers/cncf/index.rst",
     # Utils for internal use
     '_api/airflow/providers/google/cloud/utils',
+    # Internal client for hashicorp
+    '_api/airflow/providers/hashicorp/_internal_client',
     # Templates or partials
     'autoapi_templates',
     'howto/operator/gcp/_partials',
@@ -498,7 +500,7 @@ autoapi_ignore = [
     '*/airflow/contrib/sensors/*',
     '*/airflow/contrib/hooks/*',
     '*/airflow/contrib/operators/*',
-
+    '*/_internal*',
     '*/node_modules/*',
     '*/migrations/*',
 ]
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 01b1401..53cdd4d 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -195,7 +195,6 @@ Foundation.
      -
      - :mod:`airflow.providers.apache.hdfs.sensors.web_hdfs`
 
-
 Transfer operators and hooks
 ''''''''''''''''''''''''''''
 
@@ -318,7 +317,6 @@ These integrations allow you to perform various operations 
within the Microsoft
      -
      -
 
-
 Transfer operators and hooks
 ''''''''''''''''''''''''''''
 
@@ -830,7 +828,6 @@ These integrations allow you to perform various operations 
within the Google Clo
      - :mod:`airflow.providers.google.cloud.operators.vision`
      -
 
-
 Transfer operators and hooks
 """"""""""""""""""""""""""""
 
@@ -1245,6 +1242,7 @@ These integrations allow you to perform various 
operations within various servic
      -
      -
 
+
 Transfer operators and hooks
 ''''''''''''''''''''''''''''
 
@@ -1323,6 +1321,12 @@ These integrations allow you to perform various 
operations using various softwar
      - :mod:`airflow.operators.bash`
      - :mod:`airflow.sensors.bash`
 
+   * - `Hashicorp Vault <https://www.vaultproject.io/>`__
+     -
+     - :mod:`airflow.providers.hashicorp.hooks.vault`
+     -
+     -
+
    * - `Kubernetes <https://kubernetes.io/>`__
      - :doc:`How to use <howto/operator/kubernetes>`
      - :mod:`airflow.providers.cncf.kubernetes.hooks.kubernetes`
diff --git a/tests/providers/hashicorp/_internal_client/__init__.py 
b/tests/providers/hashicorp/_internal_client/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/hashicorp/_internal_client/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/hashicorp/_internal_client/test_vault_client.py 
b/tests/providers/hashicorp/_internal_client/test_vault_client.py
new file mode 100644
index 0000000..471ae0e
--- /dev/null
+++ b/tests/providers/hashicorp/_internal_client/test_vault_client.py
@@ -0,0 +1,551 @@
+# pylint: disable=no-member
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+from unittest.case import TestCase
+
+from hvac.exceptions import InvalidPath, VaultError
+from mock import mock_open, patch
+
+from airflow.providers.hashicorp._internal_client.vault_client import 
_VaultClient  # noqa
+
+
+# noinspection DuplicatedCode,PyUnresolvedReferences
+class TestVaultClient(TestCase):
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_version_wrong(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        with self.assertRaisesRegex(VaultError, 'The version is not supported: 
4'):
+            _VaultClient(auth_type="approle", kv_engine_version=4)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_custom_mount_point(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="userpass", mount_point="custom")
+        self.assertEqual("custom", vault_client.mount_point)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_version_one_init(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="userpass", kv_engine_version=1)
+        self.assertEqual(1, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_approle(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth_approle.assert_called_with(role_id="role", 
secret_id="pass")
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_approle_missing_role(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        with self.assertRaisesRegex(VaultError, "requires 'role_id'"):
+            _VaultClient(
+                auth_type="approle",
+                url="http://localhost:8180";,
+                secret_id="pass")
+
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._get_scopes")
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider.get_credentials_and_project_id")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_gcp(self, mock_hvac, mock_get_credentials, mock_get_scopes):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_get_scopes.return_value = ['scope1', 'scope2']
+        mock_get_credentials.return_value = ("credentials", "project_id")
+        vault_client = _VaultClient(auth_type="gcp", gcp_key_path="path.json", 
gcp_scopes="scope1,scope2",
+                                    url="http://localhost:8180";)
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        mock_get_scopes.assert_called_with("scope1,scope2")
+        mock_get_credentials.assert_called_with(
+            key_path="path.json",
+            scopes=['scope1', 'scope2']
+        )
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth.gcp.configure.assert_called_with(
+            credentials="credentials",
+        )
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_github(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="github",
+                                    token="s.7AU0I51yv1Q1lxOIg1F3ZRAS",
+                                    url="http://localhost:8180";)
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth.github.login.assert_called_with(
+            token="s.7AU0I51yv1Q1lxOIg1F3ZRAS")
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_github_missing_token(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        with self.assertRaisesRegex(VaultError, "'github' authentication type 
requires 'token'"):
+            _VaultClient(auth_type="github", url="http://localhost:8180";)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes_default_path(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="kubernetes",
+                                    kubernetes_role="kube_role",
+                                    url="http://localhost:8180";)
+        with patch("builtins.open", mock_open(read_data="data")) as mock_file:
+            client = vault_client.client
+        
mock_file.assert_called_with("/var/run/secrets/kubernetes.io/serviceaccount/token")
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth_kubernetes.assert_called_with(
+            role="kube_role", jwt="data")
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="kubernetes",
+                                    kubernetes_role="kube_role",
+                                    kubernetes_jwt_path="path",
+                                    url="http://localhost:8180";)
+        with patch("builtins.open", mock_open(read_data="data")) as mock_file:
+            client = vault_client.client
+        mock_file.assert_called_with("path")
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth_kubernetes.assert_called_with(
+            role="kube_role", jwt="data")
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes_missing_role(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        with self.assertRaisesRegex(VaultError, "requires 'kubernetes_role'"):
+            _VaultClient(auth_type="kubernetes",
+                         kubernetes_jwt_path="path",
+                         url="http://localhost:8180";)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes_kubernetes_jwt_path_none(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        with self.assertRaisesRegex(VaultError, "requires 
'kubernetes_jwt_path'"):
+            _VaultClient(auth_type="kubernetes",
+                         kubernetes_role='kube_role',
+                         kubernetes_jwt_path=None,
+                         url="http://localhost:8180";)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_ldap(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="ldap",
+                                    username="user",
+                                    password="pass",
+                                    url="http://localhost:8180";)
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth.ldap.login.assert_called_with(
+            username="user", password="pass")
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_token_missing_token(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        with self.assertRaisesRegex(VaultError, "'token' authentication type 
requires 'token'"):
+            _VaultClient(auth_type="token", url="http://localhost:8180";)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_token(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="token",
+                                    token="s.7AU0I51yv1Q1lxOIg1F3ZRAS", 
url="http://localhost:8180";)
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.is_authenticated.assert_called_with()
+        self.assertEqual("s.7AU0I51yv1Q1lxOIg1F3ZRAS", client.token)
+        self.assertEqual(2, vault_client.kv_engine_version)
+        self.assertEqual("secret", vault_client.mount_point)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_default_auth_type(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(token="s.7AU0I51yv1Q1lxOIg1F3ZRAS", 
url="http://localhost:8180";)
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.is_authenticated.assert_called_with()
+        self.assertEqual("s.7AU0I51yv1Q1lxOIg1F3ZRAS", client.token)
+        self.assertEqual("token", vault_client.auth_type)
+        self.assertEqual(2, vault_client.kv_engine_version)
+        self.assertEqual("secret", vault_client.mount_point)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_userpass(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="userpass",
+                                    username="user", password="pass", 
url="http://localhost:8180";)
+        client = vault_client.client
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        client.auth_userpass.assert_called_with(
+            username="user", password="pass")
+        client.is_authenticated.assert_called_with()
+        self.assertEqual(2, vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_non_existing_key_v2(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        # Response does not contain the requested key
+        mock_client.secrets.kv.v2.read_secret_version.side_effect = 
InvalidPath()
+        vault_client = _VaultClient(auth_type="token", 
token="s.7AU0I51yv1Q1lxOIg1F3ZRAS",
+                                    url="http://localhost:8180";)
+        secret = vault_client.get_secret(secret_path="missing")
+        self.assertIsNone(secret)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=None)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_non_existing_key_v2_different_auth(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        # Response does not contain the requested key
+        mock_client.secrets.kv.v2.read_secret_version.side_effect = 
InvalidPath()
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        secret = vault_client.get_secret(secret_path="missing")
+        self.assertIsNone(secret)
+        self.assertEqual("secret", vault_client.mount_point)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=None)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_non_existing_key_v1(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        # Response does not contain the requested key
+        mock_client.secrets.kv.v1.read_secret.side_effect = InvalidPath()
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        secret = vault_client.get_secret(secret_path="missing")
+        self.assertIsNone(secret)
+        mock_client.secrets.kv.v1.read_secret.assert_called_once_with(
+            mount_point='secret', path='missing')
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v2(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {
+                'data': {'secret_key': 'secret_value'},
+                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                             'deletion_time': '',
+                             'destroyed': False,
+                             'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        secret = vault_client.get_secret(secret_path="missing")
+        self.assertEqual({'secret_key': 'secret_value'}, secret)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=None)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v2_version(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {
+                'data': {'secret_key': 'secret_value'},
+                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                             'deletion_time': '',
+                             'destroyed': False,
+                             'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        secret = vault_client.get_secret(secret_path="missing", 
secret_version=1)
+        self.assertEqual({'secret_key': 'secret_value'}, secret)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=1)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v1(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        mock_client.secrets.kv.v1.read_secret.return_value = {
+            'request_id': '182d0673-618c-9889-4cba-4e1f4cfe4b4b',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 2764800,
+            'data': {'value': 'world'},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None}
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        secret = vault_client.get_secret(secret_path="missing")
+        self.assertEqual({'value': 'world'}, secret)
+        mock_client.secrets.kv.v1.read_secret.assert_called_once_with(
+            mount_point='secret', path='missing')
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v1_version(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        vault_client = _VaultClient(auth_type="token", 
token="s.7AU0I51yv1Q1lxOIg1F3ZRAS",
+                                    url="http://localhost:8180";, 
kv_engine_version=1)
+        with self.assertRaisesRegex(VaultError, "Secret version"):
+            vault_client.get_secret(secret_path="missing", secret_version=1)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_secret_metadata_v2(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_client.secrets.kv.v2.read_secret_metadata.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'metadata': [
+                {'created_time': '2020-03-16T21:01:43.331126Z',
+                 'deletion_time': '',
+                 'destroyed': False,
+                 'version': 1},
+                {'created_time': '2020-03-16T21:01:43.331126Z',
+                 'deletion_time': '',
+                 'destroyed': False,
+                 'version': 2},
+            ]
+        }
+        vault_client = _VaultClient(auth_type="token", 
token="s.7AU0I51yv1Q1lxOIg1F3ZRAS",
+                                    url="http://localhost:8180";)
+        metadata = vault_client.get_secret_metadata(secret_path="missing")
+        self.assertEqual(
+            {
+                'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+                'lease_id': '',
+                'renewable': False,
+                'lease_duration': 0,
+                'metadata': [
+                    {'created_time': '2020-03-16T21:01:43.331126Z',
+                     'deletion_time': '',
+                     'destroyed': False,
+                     'version': 1},
+                    {'created_time': '2020-03-16T21:01:43.331126Z',
+                     'deletion_time': '',
+                     'destroyed': False,
+                     'version': 2},
+                ]
+            }, metadata)
+        mock_client.secrets.kv.v2.read_secret_metadata.assert_called_once_with(
+            mount_point='secret', path='missing')
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_secret_metadata_v1(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        with self.assertRaisesRegex(VaultError, "Metadata might only be used 
with"
+                                                " version 2 of the KV 
engine."):
+            vault_client.get_secret_metadata(secret_path="missing")
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_secret_including_metadata_v2(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {
+                'data': {'secret_key': 'secret_value'},
+                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                             'deletion_time': '',
+                             'destroyed': False,
+                             'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        metadata = 
vault_client.get_secret_including_metadata(secret_path="missing")
+        self.assertEqual(
+            {
+                'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+                'lease_id': '',
+                'renewable': False,
+                'lease_duration': 0,
+                'data': {
+                    'data': {'secret_key': 'secret_value'},
+                    'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                                 'deletion_time': '',
+                                 'destroyed': False,
+                                 'version': 1}},
+                'wrap_info': None,
+                'warnings': None,
+                'auth': None
+            }, metadata)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=None)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_secret_including_metadata_v1(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        with self.assertRaisesRegex(VaultError, "Metadata might only be used 
with"
+                                                " version 2 of the KV 
engine."):
+            vault_client.get_secret_including_metadata(secret_path="missing")
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v2(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        vault_client.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'}
+        )
+        
mock_client.secrets.kv.v2.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
cas=None)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v2_method(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        with self.assertRaisesRegex(VaultError, "The method parameter is only 
valid for version 1"):
+            vault_client.create_or_update_secret(
+                secret_path="path",
+                secret={'key': 'value'},
+                method="post"
+            )
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v2_cas(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass")
+        vault_client.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'},
+            cas=10
+        )
+        
mock_client.secrets.kv.v2.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
cas=10)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v1(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        vault_client.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'}
+        )
+        
mock_client.secrets.kv.v1.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
method=None)
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v1_cas(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        with self.assertRaisesRegex(VaultError, "The cas parameter is only 
valid for version 2"):
+            vault_client.create_or_update_secret(
+                secret_path="path",
+                secret={'key': 'value'},
+                cas=10
+            )
+
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v1_post(self, mock_hvac):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        vault_client = _VaultClient(auth_type="approle", role_id="role", 
url="http://localhost:8180";,
+                                    secret_id="pass", kv_engine_version=1)
+        vault_client.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'},
+            method="post"
+        )
+        
mock_client.secrets.kv.v1.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
method="post")
diff --git a/tests/providers/hashicorp/hooks/__init__.py 
b/tests/providers/hashicorp/hooks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/hashicorp/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/hashicorp/hooks/test_vault.py 
b/tests/providers/hashicorp/hooks/test_vault.py
new file mode 100644
index 0000000..bebdd0d
--- /dev/null
+++ b/tests/providers/hashicorp/hooks/test_vault.py
@@ -0,0 +1,962 @@
+# pylint: disable=no-member
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+from unittest.case import TestCase
+
+from hvac.exceptions import VaultError
+from mock import PropertyMock, mock_open, patch
+
+from airflow.providers.hashicorp.hooks.vault import VaultHook
+
+
+# noinspection DuplicatedCode,PyUnresolvedReferences
+class TestVaultHook(TestCase):
+
+    @staticmethod
+    def get_mock_connection(conn_type="vault",
+                            schema="secret",
+                            host="localhost",
+                            port=8180,
+                            user="user",
+                            password="pass"):
+        mock_connection = mock.MagicMock()
+        type(mock_connection).conn_type = PropertyMock(return_value=conn_type)
+        type(mock_connection).host = PropertyMock(return_value=host)
+        type(mock_connection).port = PropertyMock(return_value=port)
+        type(mock_connection).login = PropertyMock(return_value=user)
+        type(mock_connection).password = PropertyMock(return_value=password)
+        type(mock_connection).schema = PropertyMock(return_value=schema)
+        return mock_connection
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_version_not_int(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "userpass",
+            "kv_engine_version": "text"
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+        with self.assertRaisesRegex(VaultError, 'The version is not an int: 
text'):
+            VaultHook(**kwargs)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_version_as_string(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "userpass",
+            "kv_engine_version": "2"
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+        test_hook = VaultHook(**kwargs)
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_custom_mount_point_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection(schema='custom')
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "userpass",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+        test_hook = VaultHook(**kwargs)
+        self.assertEqual("custom", test_hook.vault_client.mount_point)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_version_one_init(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "userpass",
+            "kv_engine_version": 1
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+        test_hook = VaultHook(**kwargs)
+        self.assertEqual(1, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_version_one_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "userpass",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "kv_engine_version": 1,
+            "vault_conn_id": "vault_conn_id",
+        }
+        test_hook = VaultHook(**kwargs)
+        self.assertEqual(1, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_vaults_protocol(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection(conn_type='vaults')
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "approle",
+            "role_id": "role",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='https://localhost:8180')
+        test_client.auth_approle.assert_called_with(role_id="role", 
secret_id="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_http_protocol(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection(conn_type='http')
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "approle",
+            "role_id": "role",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_approle.assert_called_with(role_id="role", 
secret_id="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_https_protocol(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection(conn_type='https')
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "approle",
+            "role_id": "role",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='https://localhost:8180')
+        test_client.auth_approle.assert_called_with(role_id="role", 
secret_id="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_approle_init_params(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "approle",
+            "role_id": "role",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_approle.assert_called_with(role_id="role", 
secret_id="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_approle_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "approle",
+            'role_id': "role",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_approle.assert_called_with(role_id="role", 
secret_id="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._get_scopes")
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider.get_credentials_and_project_id")
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_gcp_init_params(self, mock_hvac, mock_get_connection,
+                             mock_get_credentials, mock_get_scopes):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_get_scopes.return_value = ['scope1', 'scope2']
+        mock_get_credentials.return_value = ("credentials", "project_id")
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "gcp",
+            "gcp_key_path": "path.json",
+            "gcp_scopes": "scope1,scope2",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        test_client = test_hook.get_conn()
+        mock_get_connection.assert_called_with("vault_conn_id")
+        mock_get_scopes.assert_called_with("scope1,scope2")
+        mock_get_credentials.assert_called_with(
+            key_path="path.json",
+            scopes=['scope1', 'scope2']
+        )
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth.gcp.configure.assert_called_with(
+            credentials="credentials",
+        )
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._get_scopes")
+    
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider.get_credentials_and_project_id")
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_gcp_dejson(self, mock_hvac, mock_get_connection,
+                        mock_get_credentials, mock_get_scopes):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_get_scopes.return_value = ['scope1', 'scope2']
+        mock_get_credentials.return_value = ("credentials", "project_id")
+
+        connection_dict = {
+            "auth_type": "gcp",
+            "gcp_key_path": "path.json",
+            "gcp_scopes": "scope1,scope2",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        test_client = test_hook.get_conn()
+        mock_get_connection.assert_called_with("vault_conn_id")
+        mock_get_scopes.assert_called_with("scope1,scope2")
+        mock_get_credentials.assert_called_with(
+            key_path="path.json",
+            scopes=['scope1', 'scope2']
+        )
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth.gcp.configure.assert_called_with(
+            credentials="credentials",
+        )
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_github_init_params(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "auth_type": "github",
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth.github.login.assert_called_with(
+            token="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_github_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "github",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth.github.login.assert_called_with(
+            token="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes_default_path(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "auth_type": "kubernetes",
+            "kubernetes_role": "kube_role",
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        with patch("builtins.open", mock_open(read_data="data")) as mock_file:
+            test_hook = VaultHook(**kwargs)
+            test_client = test_hook.get_conn()
+        mock_get_connection.assert_called_with("vault_conn_id")
+        
mock_file.assert_called_with("/var/run/secrets/kubernetes.io/serviceaccount/token")
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_kubernetes.assert_called_with(
+            role="kube_role", jwt="data")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes_init_params(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "kubernetes_role": "kube_role",
+            "kubernetes_jwt_path": "path",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "auth_type": "kubernetes",
+            "vault_conn_id": "vault_conn_id",
+        }
+        with patch("builtins.open", mock_open(read_data="data")) as mock_file:
+            test_hook = VaultHook(**kwargs)
+            test_client = test_hook.get_conn()
+        mock_get_connection.assert_called_with("vault_conn_id")
+        mock_file.assert_called_with("path")
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_kubernetes.assert_called_with(
+            role="kube_role", jwt="data")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_kubernetes_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "kubernetes_role": "kube_role",
+            "kubernetes_jwt_path": "path",
+            "auth_type": "kubernetes",
+            "vault_conn_id": "vault_conn_id",
+        }
+        with patch("builtins.open", mock_open(read_data="data")) as mock_file:
+            test_hook = VaultHook(**kwargs)
+            test_client = test_hook.get_conn()
+        mock_get_connection.assert_called_with("vault_conn_id")
+        mock_file.assert_called_with("path")
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_kubernetes.assert_called_with(
+            role="kube_role", jwt="data")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_ldap_init_params(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "auth_type": "ldap",
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth.ldap.login.assert_called_with(
+            username="user", password="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_ldap_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "ldap",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth.ldap.login.assert_called_with(
+            username="user", password="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_token_init_params(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        connection_dict = {}
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual("pass", test_client.token)
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+        self.assertEqual("secret", test_hook.vault_client.mount_point)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_token_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "token",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual("pass", test_client.token)
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_userpass_init_params(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "userpass",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_userpass.assert_called_with(
+            username="user", password="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_userpass_dejson(self, mock_hvac, mock_get_connection):
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+
+        connection_dict = {
+            "auth_type": "userpass",
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+        }
+
+        test_hook = VaultHook(**kwargs)
+        mock_get_connection.assert_called_with("vault_conn_id")
+        test_client = test_hook.get_conn()
+        mock_hvac.Client.assert_called_with(url='http://localhost:8180')
+        test_client.auth_userpass.assert_called_with(
+            username="user", password="pass")
+        test_client.is_authenticated.assert_called_with()
+        self.assertEqual(2, test_hook.vault_client.kv_engine_version)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v2(self, mock_hvac, mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {
+                'data': {'secret_key': 'secret_value'},
+                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                             'deletion_time': '',
+                             'destroyed': False,
+                             'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        secret = test_hook.get_secret(secret_path="missing")
+        self.assertEqual({'secret_key': 'secret_value'}, secret)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=None)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v2_version(self, mock_hvac, mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {
+                'data': {'secret_key': 'secret_value'},
+                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                             'deletion_time': '',
+                             'destroyed': False,
+                             'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        secret = test_hook.get_secret(secret_path="missing", secret_version=1)
+        self.assertEqual({'secret_key': 'secret_value'}, secret)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=1)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_existing_key_v1(self, mock_hvac, mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_client.secrets.kv.v1.read_secret.return_value = {
+            'request_id': '182d0673-618c-9889-4cba-4e1f4cfe4b4b',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 2764800,
+            'data': {'value': 'world'},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 1
+        }
+
+        test_hook = VaultHook(**kwargs)
+        secret = test_hook.get_secret(secret_path="missing")
+        self.assertEqual({'value': 'world'}, secret)
+        mock_client.secrets.kv.v1.read_secret.assert_called_once_with(
+            mount_point='secret', path='missing')
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_secret_metadata_v2(self, mock_hvac, mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_client.secrets.kv.v2.read_secret_metadata.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'metadata': [
+                {'created_time': '2020-03-16T21:01:43.331126Z',
+                 'deletion_time': '',
+                 'destroyed': False,
+                 'version': 1},
+                {'created_time': '2020-03-16T21:01:43.331126Z',
+                 'deletion_time': '',
+                 'destroyed': False,
+                 'version': 2},
+            ]
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        metadata = test_hook.get_secret_metadata(secret_path="missing")
+        self.assertEqual(
+            {
+                'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+                'lease_id': '',
+                'renewable': False,
+                'lease_duration': 0,
+                'metadata': [
+                    {'created_time': '2020-03-16T21:01:43.331126Z',
+                     'deletion_time': '',
+                     'destroyed': False,
+                     'version': 1},
+                    {'created_time': '2020-03-16T21:01:43.331126Z',
+                     'deletion_time': '',
+                     'destroyed': False,
+                     'version': 2},
+                ]
+            }, metadata)
+        mock_client.secrets.kv.v2.read_secret_metadata.assert_called_once_with(
+            mount_point='secret', path='missing')
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_get_secret_including_metadata_v2(self, mock_hvac, 
mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {
+                'data': {'secret_key': 'secret_value'},
+                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                             'deletion_time': '',
+                             'destroyed': False,
+                             'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        metadata = 
test_hook.get_secret_including_metadata(secret_path="missing")
+        self.assertEqual(
+            {
+                'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
+                'lease_id': '',
+                'renewable': False,
+                'lease_duration': 0,
+                'data': {
+                    'data': {'secret_key': 'secret_value'},
+                    'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
+                                 'deletion_time': '',
+                                 'destroyed': False,
+                                 'version': 1}},
+                'wrap_info': None,
+                'warnings': None,
+                'auth': None
+            }, metadata)
+        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
+            mount_point='secret', path='missing', version=None)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v2(self, mock_hvac, mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        test_hook.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'}
+        )
+        
mock_client.secrets.kv.v2.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
cas=None)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v2_cas(self, mock_hvac, 
mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 2
+        }
+
+        test_hook = VaultHook(**kwargs)
+        test_hook.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'},
+            cas=10
+        )
+        
mock_client.secrets.kv.v2.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
cas=10)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v1(self, mock_hvac, mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 1
+        }
+
+        test_hook = VaultHook(**kwargs)
+        test_hook.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'}
+        )
+        
mock_client.secrets.kv.v1.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
method=None)
+
+    
@mock.patch("airflow.providers.hashicorp.hooks.vault.VaultHook.get_connection")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    def test_create_or_update_secret_v1_post(self, mock_hvac, 
mock_get_connection):
+        mock_connection = self.get_mock_connection()
+        mock_get_connection.return_value = mock_connection
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        connection_dict = {}
+
+        mock_connection.extra_dejson.get.side_effect = connection_dict.get
+        kwargs = {
+            "vault_conn_id": "vault_conn_id",
+            "auth_type": "token",
+            "kv_engine_version": 1
+        }
+
+        test_hook = VaultHook(**kwargs)
+        test_hook.create_or_update_secret(
+            secret_path="path",
+            secret={'key': 'value'},
+            method="post"
+        )
+        
mock_client.secrets.kv.v1.create_or_update_secret.assert_called_once_with(
+            mount_point='secret', secret_path='path', secret={'key': 'value'}, 
method="post")
diff --git a/tests/providers/hashicorp/secrets/test_vault.py 
b/tests/providers/hashicorp/secrets/test_vault.py
index eb576a6..c5d5cf4 100644
--- a/tests/providers/hashicorp/secrets/test_vault.py
+++ b/tests/providers/hashicorp/secrets/test_vault.py
@@ -24,7 +24,7 @@ from airflow.providers.hashicorp.secrets.vault import 
VaultBackend
 
 class TestVaultSecrets(TestCase):
 
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_get_conn_uri(self, mock_hvac):
         mock_client = mock.MagicMock()
         mock_hvac.Client.return_value = mock_client
@@ -56,7 +56,7 @@ class TestVaultSecrets(TestCase):
         returned_uri = test_client.get_conn_uri(conn_id="test_postgres")
         self.assertEqual('postgresql://airflow:airflow@host:5432/airflow', 
returned_uri)
 
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_get_conn_uri_engine_version_1(self, mock_hvac):
         mock_client = mock.MagicMock()
         mock_hvac.Client.return_value = mock_client
@@ -88,10 +88,10 @@ class TestVaultSecrets(TestCase):
     @mock.patch.dict('os.environ', {
         'AIRFLOW_CONN_TEST_MYSQL': 'mysql://airflow:airflow@host:5432/airflow',
     })
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_get_conn_uri_non_existent_key(self, mock_hvac):
         """
-        Test that if the key with connection ID is not present in Vault, 
VaultClient.get_connections
+        Test that if the key with connection ID is not present in Vault, 
_VaultClient.get_connections
         should return None
         """
         mock_client = mock.MagicMock()
@@ -110,10 +110,10 @@ class TestVaultSecrets(TestCase):
         test_client = VaultBackend(**kwargs)
         self.assertIsNone(test_client.get_conn_uri(conn_id="test_mysql"))
         mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
-            mount_point='airflow', path='connections/test_mysql')
+            mount_point='airflow', path='connections/test_mysql', version=None)
         self.assertEqual([], test_client.get_connections(conn_id="test_mysql"))
 
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_get_variable_value(self, mock_hvac):
         mock_client = mock.MagicMock()
         mock_hvac.Client.return_value = mock_client
@@ -144,7 +144,7 @@ class TestVaultSecrets(TestCase):
         returned_uri = test_client.get_variable("hello")
         self.assertEqual('world', returned_uri)
 
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_get_variable_value_engine_version_1(self, mock_hvac):
         mock_client = mock.MagicMock()
         mock_hvac.Client.return_value = mock_client
@@ -176,10 +176,10 @@ class TestVaultSecrets(TestCase):
     @mock.patch.dict('os.environ', {
         'AIRFLOW_VAR_HELLO': 'world',
     })
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_get_variable_value_non_existent_key(self, mock_hvac):
         """
-        Test that if the key with connection ID is not present in Vault, 
VaultClient.get_connections
+        Test that if the key with connection ID is not present in Vault, 
_VaultClient.get_connections
         should return None
         """
         mock_client = mock.MagicMock()
@@ -198,10 +198,10 @@ class TestVaultSecrets(TestCase):
         test_client = VaultBackend(**kwargs)
         self.assertIsNone(test_client.get_variable("hello"))
         mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
-            mount_point='airflow', path='variables/hello')
+            mount_point='airflow', path='variables/hello', version=None)
         self.assertIsNone(test_client.get_variable("hello"))
 
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
+    
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
     def test_auth_failure_raises_error(self, mock_hvac):
         mock_client = mock.MagicMock()
         mock_hvac.Client.return_value = mock_client
@@ -218,30 +218,6 @@ class TestVaultSecrets(TestCase):
         with self.assertRaisesRegex(VaultError, "Vault Authentication Error!"):
             VaultBackend(**kwargs).get_connections(conn_id='test')
 
-    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
-    def test_empty_token_raises_error(self, mock_hvac):
-        mock_client = mock.MagicMock()
-        mock_hvac.Client.return_value = mock_client
-
-        kwargs = {
-            "connections_path": "connections",
-            "mount_point": "airflow",
-            "auth_type": "token",
-            "url": "http://127.0.0.1:8200";,
-        }
-
-        with self.assertRaisesRegex(VaultError, "token cannot be None for 
auth_type='token'"):
-            VaultBackend(**kwargs).get_connections(conn_id='test')
-
-    def test_auth_type_kubernetes_without_role_raises_error(self):
-        kwargs = {
-            "auth_type": "kubernetes",
-            "url": "http://127.0.0.1:8200";,
-        }
-
-        with self.assertRaisesRegex(VaultError, "kubernetes_role cannot be 
None for auth_type='kubernetes'"):
-            VaultBackend(**kwargs).get_connections(conn_id='test')
-
     def test_auth_type_kubernetes_with_unreadable_jwt_raises_error(self):
         path = "/var/tmp/this_does_not_exist/334e918ef11987d3ef2f9553458ea09f"
         kwargs = {

Reply via email to