This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 74faf5eeffd0028d294d6aaca8f2d8f5fc7d61ad Author: Kaxil Naik <kaxiln...@gmail.com> AuthorDate: Sun Sep 13 15:45:21 2020 +0100 Add Secrets backend for Microsoft Azure Key Vault (#10898) (cherry picked from commit f77a11d5b1e9d76b1d57c8a0d653b3ab28f33894) --- airflow/contrib/secrets/azure_key_vault.py | 148 +++++++++++++++++++++++++ docs/howto/use-alternative-secrets-backend.rst | 35 ++++++ setup.py | 6 +- tests/contrib/secrets/test_azure_key_vault.py | 105 ++++++++++++++++++ 4 files changed, 293 insertions(+), 1 deletion(-) diff --git a/airflow/contrib/secrets/azure_key_vault.py b/airflow/contrib/secrets/azure_key_vault.py new file mode 100644 index 0000000..2e48efe --- /dev/null +++ b/airflow/contrib/secrets/azure_key_vault.py @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Optional + +from azure.core.exceptions import ResourceNotFoundError +from azure.identity import DefaultAzureCredential +from azure.keyvault.secrets import SecretClient +from cached_property import cached_property + +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class AzureKeyVaultBackend(BaseSecretsBackend, LoggingMixin): + """ + Retrieves Airflow Connections or Variables from Azure Key Vault secrets. + + The Azure Key Vault can be configured as a secrets backend in the ``airflow.cfg``: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.microsoft.azure.secrets.azure_key_vault.AzureKeyVaultBackend + backend_kwargs = {"connections_prefix": "airflow-connections", "vault_url": "<azure_key_vault_uri>"} + + For example, if the secrets prefix is ``airflow-connections-smtp-default``, this would be accessible + if you provide ``{"connections_prefix": "airflow-connections"}`` and request conn_id ``smtp-default``. + And if variables prefix is ``airflow-variables-hello``, this would be accessible + if you provide ``{"variables_prefix": "airflow-variables"}`` and request variable key ``hello``. + + :param connections_prefix: Specifies the prefix of the secret to read to get Connections + :type connections_prefix: str + :param variables_prefix: Specifies the prefix of the secret to read to get Variables + :type variables_prefix: str + :param config_prefix: Specifies the prefix of the secret to read to get Variables. + :type config_prefix: str + :param vault_url: The URL of an Azure Key Vault to use + :type vault_url: str + :param sep: separator used to concatenate secret_prefix and secret_id. Default: "-" + :type sep: str + """ + + def __init__( + self, + connections_prefix='airflow-connections', # type: str + variables_prefix='airflow-variables', # type: str + config_prefix='airflow-config', # type: str + vault_url='', # type: str + sep='-', # type: str + **kwargs + ): + super(AzureKeyVaultBackend, self).__init__() + self.vault_url = vault_url + self.connections_prefix = connections_prefix.rstrip(sep) + self.variables_prefix = variables_prefix.rstrip(sep) + self.config_prefix = config_prefix.rstrip(sep) + self.sep = sep + self.kwargs = kwargs + + @cached_property + def client(self): + """ + Create a Azure Key Vault client. + """ + credential = DefaultAzureCredential() + client = SecretClient(vault_url=self.vault_url, credential=credential, **self.kwargs) + return client + + def get_conn_uri(self, conn_id): + # type: (str) -> Optional[str] + """ + Get an Airflow Connection URI from an Azure Key Vault secret + + :param conn_id: The Airflow connection id to retrieve + :type conn_id: str + """ + return self._get_secret(self.connections_prefix, conn_id) + + def get_variable(self, key): + # type: (str) -> Optional[str] + """ + Get an Airflow Variable from an Azure Key Vault secret. + + :param key: Variable Key + :type key: str + :return: Variable Value + """ + return self._get_secret(self.variables_prefix, key) + + def get_config(self, key): + # type: (str) -> Optional[str] + """ + Get Airflow Configuration + + :param key: Configuration Option Key + :return: Configuration Option Value + """ + return self._get_secret(self.config_prefix, key) + + @staticmethod + def build_path(path_prefix, secret_id, sep='-'): + # type: (str, str, str) -> str + """ + Given a path_prefix and secret_id, build a valid secret name for the Azure Key Vault Backend. + Also replaces underscore in the path with dashes to support easy switching between + environment variables, so ``connection_default`` becomes ``connection-default``. + + :param path_prefix: The path prefix of the secret to retrieve + :type path_prefix: str + :param secret_id: Name of the secret + :type secret_id: str + :param sep: Separator used to concatenate path_prefix and secret_id + :type sep: str + """ + path = '{}{}{}'.format(path_prefix, sep, secret_id) + return path.replace('_', sep) + + def _get_secret(self, path_prefix, secret_id): + # type: (str, str) -> Optional[str] + """ + Get an Azure Key Vault secret value + + :param path_prefix: Prefix for the Path to get Secret + :type path_prefix: str + :param secret_id: Secret Key + :type secret_id: str + """ + name = self.build_path(path_prefix, secret_id, self.sep) + try: + secret = self.client.get_secret(name=name) + return secret.value + except ResourceNotFoundError as ex: + self.log.debug('Secret %s not found: %s', name, ex) + return None diff --git a/docs/howto/use-alternative-secrets-backend.rst b/docs/howto/use-alternative-secrets-backend.rst index 2fa2894..5639cce 100644 --- a/docs/howto/use-alternative-secrets-backend.rst +++ b/docs/howto/use-alternative-secrets-backend.rst @@ -408,6 +408,41 @@ When ``gcp_key_path`` is not provided, it will use the Application Default Crede The value of the Secrets Manager secret id must be the :ref:`connection URI representation <generating_connection_uri>` of the connection object. +Azure Key Vault Backend +^^^^^^^^^^^^^^^^^^^^^^^ + +To enable the Azure Key Vault as secrets backend, specify +:py:class:`~airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend` +as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``. + +Here is a sample configuration: + +.. code-block:: ini + + [secrets] + backend = airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend + backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables", "vault_url": "https://example-akv-resource-name.vault.azure.net/"} + +For client authentication, the ``DefaultAzureCredential`` from the Azure Python SDK is used as credential provider, +which supports service principal, managed identity and user credentials. + + +Storing and Retrieving Connections +"""""""""""""""""""""""""""""""""" + +If you have set ``connections_prefix`` as ``airflow-connections``, then for a connection id of ``smtp_default``, +you would want to store your connection at ``airflow-connections-smtp-default``. + +The value of the secret must be the :ref:`connection URI representation <generating_connection_uri>` +of the connection object. + +Storing and Retrieving Variables +"""""""""""""""""""""""""""""""" + +If you have set ``variables_prefix`` as ``airflow-variables``, then for an Variable key of ``hello``, +you would want to store your Variable at ``airflow-variables-hello``. + + .. _roll_your_own_secrets_backend: Roll your own secrets backend diff --git a/setup.py b/setup.py index 06892de..1388be9 100644 --- a/setup.py +++ b/setup.py @@ -199,6 +199,10 @@ azure_data_lake = [ 'azure-mgmt-datalake-store>=0.5.0', 'azure-mgmt-resource>=2.2.0', ] +azure_secrets = [ + 'azure-identity>=1.3.1', + 'azure-keyvault>=4.1.0', +] cassandra = [ 'cassandra-driver>=3.13.0,<3.21.0', ] @@ -481,7 +485,7 @@ EXTRAS_REQUIREMENTS = { 'async': async_packages, 'atlas': atlas, 'aws': aws, - 'azure': azure_blob_storage + azure_container_instances + azure_cosmos + azure_data_lake, + 'azure': azure_blob_storage + azure_container_instances + azure_cosmos + azure_data_lake + azure_secrets, 'azure_blob_storage': azure_blob_storage, 'azure_container_instances': azure_container_instances, 'azure_cosmos': azure_cosmos, diff --git a/tests/contrib/secrets/test_azure_key_vault.py b/tests/contrib/secrets/test_azure_key_vault.py new file mode 100644 index 0000000..a8ff9dc --- /dev/null +++ b/tests/contrib/secrets/test_azure_key_vault.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from tests.compat import mock + +from azure.core.exceptions import ResourceNotFoundError + +from airflow.contrib.secrets.azure_key_vault import AzureKeyVaultBackend + + +class TestAzureKeyVaultBackend(unittest.TestCase): + @mock.patch('airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend.get_conn_uri') + def test_get_connections(self, mock_get_uri): + mock_get_uri.return_value = 'scheme://user:pass@host:100' + conn_list = AzureKeyVaultBackend().get_connections('fake_conn') + conn = conn_list[0] + self.assertEqual(conn.host, 'host') + + @mock.patch('airflow.contrib.secrets.azure_key_vault.DefaultAzureCredential') + @mock.patch('airflow.contrib.secrets.azure_key_vault.SecretClient') + def test_get_conn_uri(self, mock_secret_client, mock_azure_cred): + mock_cred = mock.Mock() + mock_sec_client = mock.Mock() + mock_azure_cred.return_value = mock_cred + mock_secret_client.return_value = mock_sec_client + + mock_sec_client.get_secret.return_value = mock.Mock( + value='postgresql://airflow:airflow@host:5432/airflow' + ) + + backend = AzureKeyVaultBackend(vault_url="https://example-akv-resource-name.vault.azure.net/") + returned_uri = backend.get_conn_uri(conn_id='hi') + mock_secret_client.assert_called_once_with( + credential=mock_cred, vault_url='https://example-akv-resource-name.vault.azure.net/' + ) + self.assertEqual(returned_uri, 'postgresql://airflow:airflow@host:5432/airflow') + + @mock.patch('airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend.client') + def test_get_conn_uri_non_existent_key(self, mock_client): + """ + Test that if the key with connection ID is not present, + AzureKeyVaultBackend.get_connections should return None + """ + conn_id = 'test_mysql' + mock_client.get_secret.side_effect = ResourceNotFoundError + backend = AzureKeyVaultBackend(vault_url="https://example-akv-resource-name.vault.azure.net/") + + self.assertIsNone(backend.get_conn_uri(conn_id=conn_id)) + self.assertEqual([], backend.get_connections(conn_id=conn_id)) + + @mock.patch('airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend.client') + def test_get_variable(self, mock_client): + mock_client.get_secret.return_value = mock.Mock(value='world') + backend = AzureKeyVaultBackend() + returned_uri = backend.get_variable('hello') + mock_client.get_secret.assert_called_with(name='airflow-variables-hello') + self.assertEqual('world', returned_uri) + + @mock.patch('airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend.client') + def test_get_variable_non_existent_key(self, mock_client): + """ + Test that if Variable key is not present, + AzureKeyVaultBackend.get_variables should return None + """ + mock_client.get_secret.side_effect = ResourceNotFoundError + backend = AzureKeyVaultBackend() + self.assertIsNone(backend.get_variable('test_mysql')) + + @mock.patch('airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend.client') + def test_get_secret_value_not_found(self, mock_client): + """ + Test that if a non-existent secret returns None + """ + mock_client.get_secret.side_effect = ResourceNotFoundError + backend = AzureKeyVaultBackend() + self.assertIsNone( + backend._get_secret(path_prefix=backend.connections_prefix, secret_id='test_non_existent') + ) + + @mock.patch('airflow.contrib.secrets.azure_key_vault.AzureKeyVaultBackend.client') + def test_get_secret_value(self, mock_client): + """ + Test that get_secret returns the secret value + """ + mock_client.get_secret.return_value = mock.Mock(value='super-secret') + backend = AzureKeyVaultBackend() + secret_val = backend._get_secret('af-secrets', 'test_mysql_password') + mock_client.get_secret.assert_called_with(name='af-secrets-test-mysql-password') + self.assertEqual(secret_val, 'super-secret')