This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 85a8e68ec8eb71c17c8d8f5289b6de66605f787b Author: Kaxil Naik <[email protected]> AuthorDate: Wed Jul 8 13:29:54 2020 +0100 Get Airflow configs with sensitive data from Secret Backends (#9645) (cherry picked from commit 2f31b3060ed8274d5d1b1db7349ce607640b9199) --- airflow/configuration.py | 73 ++++++++++++++++++++++---- airflow/contrib/secrets/aws_secrets_manager.py | 21 +++++++- airflow/contrib/secrets/hashicorp_vault.py | 22 +++++++- airflow/secrets/__init__.py | 43 +++++++++------ airflow/secrets/base_secrets.py | 16 ++++-- airflow/secrets/metastore.py | 5 +- docs/howto/set-config.rst | 29 ++++++++-- tests/test_configuration.py | 50 ++++++++++++++++-- 8 files changed, 215 insertions(+), 44 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 8720d77..290843f 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -102,6 +102,15 @@ def run_command(command): return output +def _get_config_value_from_secret_backend(config_key): + """Get Config option values from Secret Backend""" + from airflow import secrets + secrets_client = secrets.get_custom_secret_backend() + if not secrets_client: + return None + return secrets_client.get_config(config_key) + + def _read_default_config_file(file_name): templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates') file_path = os.path.join(templates_dir, file_name) @@ -136,7 +145,9 @@ class AirflowConfigParser(ConfigParser): # These configuration elements can be fetched as the stdout of commands # following the "{section}__{name}__cmd" pattern, the idea behind this # is to not store password on boxes in text files. 
- as_command_stdout = { + # These configs can also be fetched from Secrets backend + # following the "{section}__{name}__secret" pattern + sensitive_config_values = { ('core', 'sql_alchemy_conn'), ('core', 'fernet_key'), ('celery', 'broker_url'), @@ -275,19 +286,32 @@ class AirflowConfigParser(ConfigParser): env_var_cmd = env_var + '_CMD' if env_var_cmd in os.environ: # if this is a valid command key... - if (section, key) in self.as_command_stdout: + if (section, key) in self.sensitive_config_values: return run_command(os.environ[env_var_cmd]) + # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) + env_var_secret_path = env_var + '_SECRET' + if env_var_secret_path in os.environ: + # if this is a valid secret path... + if (section, key) in self.sensitive_config_values: + return _get_config_value_from_secret_backend(os.environ[env_var_secret_path]) def _get_cmd_option(self, section, key): fallback_key = key + '_cmd' # if this is a valid command key... - if (section, key) in self.as_command_stdout: - if super(AirflowConfigParser, self) \ - .has_option(section, fallback_key): - command = super(AirflowConfigParser, self) \ - .get(section, fallback_key) + if (section, key) in self.sensitive_config_values: + if super(AirflowConfigParser, self).has_option(section, fallback_key): + command = super(AirflowConfigParser, self).get(section, fallback_key) return run_command(command) + def _get_secret_option(self, section, key): + """Get Config option values from Secret Backend""" + fallback_key = key + '_secret' + # if this is a valid secret key... 
+ if (section, key) in self.sensitive_config_values: + if super(AirflowConfigParser, self).has_option(section, fallback_key): + secrets_path = super(AirflowConfigParser, self).get(section, fallback_key) + return _get_config_value_from_secret_backend(secrets_path) + def get(self, section, key, **kwargs): section = str(section).lower() key = str(key).lower() @@ -329,6 +353,16 @@ class AirflowConfigParser(ConfigParser): self._warn_deprecate(section, key, deprecated_name) return option + # ...then from secret backends + option = self._get_secret_option(section, key) + if option: + return option + if deprecated_name: + option = self._get_secret_option(section, deprecated_name) + if option: + self._warn_deprecate(section, key, deprecated_name) + return option + # ...then the default config if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs: return expand_env_var( @@ -466,7 +500,8 @@ class AirflowConfigParser(ConfigParser): def as_dict( self, display_source=False, display_sensitive=False, raw=False, - include_env=True, include_cmds=True): + include_env=True, include_cmds=True, include_secret=True + ): """ Returns the current configuration as an OrderedDict of OrderedDicts. @@ -488,6 +523,12 @@ class AirflowConfigParser(ConfigParser): set (True, default), or should the _cmd options be left as the command to run (False) :type include_cmds: bool + :param include_secret: Should the result of calling any *_secret config be + set (True, default), or should the _secret options be left as the + path to get the secret from (False) + :type include_secret: bool + :return: Dictionary, where the key is the name of the section and the content is + the dictionary with the name of the parameter and its value. 
""" cfg = {} configs = [ @@ -529,7 +570,7 @@ class AirflowConfigParser(ConfigParser): # add bash commands if include_cmds: - for (section, key) in self.as_command_stdout: + for (section, key) in self.sensitive_config_values: opt = self._get_cmd_option(section, key) if opt: if not display_sensitive: @@ -541,6 +582,20 @@ class AirflowConfigParser(ConfigParser): cfg.setdefault(section, OrderedDict()).update({key: opt}) del cfg[section][key + '_cmd'] + # add config from secret backends + if include_secret: + for (section, key) in self.sensitive_config_values: + opt = self._get_secret_option(section, key) + if opt: + if not display_sensitive: + opt = '< hidden >' + if display_source: + opt = (opt, 'secret') + elif raw: + opt = opt.replace('%', '%%') + cfg.setdefault(section, OrderedDict()).update({key: opt}) + del cfg[section][key + '_secret'] + return cfg def load_test_config(self): diff --git a/airflow/contrib/secrets/aws_secrets_manager.py b/airflow/contrib/secrets/aws_secrets_manager.py index 7896dac..4df9bdb 100644 --- a/airflow/contrib/secrets/aws_secrets_manager.py +++ b/airflow/contrib/secrets/aws_secrets_manager.py @@ -42,8 +42,11 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): For example, if secrets prefix is ``airflow/connections/smtp_default``, this would be accessible if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. - And if variables prefix is ``airflow/variables/hello``, this would be accessible + If variables prefix is ``airflow/variables/hello``, this would be accessible if you provide ``{"variables_prefix": "airflow/variables"}`` and request variable key ``hello``. + And if config_prefix is ``airflow/config/sql_alchemy_conn``, this would be accessible + if you provide ``{"config_prefix": "airflow/config"}`` and request config + key ``sql_alchemy_conn``. 
You can also pass additional keyword arguments like ``aws_secret_access_key``, ``aws_access_key_id`` or ``region_name`` to this class and they would be passed on to Boto3 client. @@ -52,6 +55,8 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): :type connections_prefix: str :param variables_prefix: Specifies the prefix of the secret to read to get Variables. :type variables_prefix: str + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. + :type config_prefix: str :param profile_name: The name of a profile to use. If not given, then the default profile is used. :type profile_name: str :param sep: separator used to concatenate secret_prefix and secret_id. Default: "/" @@ -62,6 +67,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): self, connections_prefix='airflow/connections', # type: str variables_prefix='airflow/variables', # type: str + config_prefix='airflow/config', # type: str profile_name=None, # type: Optional[str] sep="/", # type: str **kwargs @@ -69,6 +75,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): super(SecretsManagerBackend, self).__init__(**kwargs) self.connections_prefix = connections_prefix.rstrip("/") self.variables_prefix = variables_prefix.rstrip('/') + self.config_prefix = config_prefix.rstrip('/') self.profile_name = profile_name self.sep = sep self.kwargs = kwargs @@ -96,13 +103,23 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): def get_variable(self, key): # type: (str) -> Optional[str] """ - Get Airflow Variable from Environment Variable + Get Airflow Variable :param key: Variable Key :return: Variable Value """ return self._get_secret(self.variables_prefix, key) + def get_config(self, key): + # type: (str) -> Optional[str] + """ + Get Airflow Configuration + + :param key: Configuration Option Key + :return: Configuration Option Value + """ + return self._get_secret(self.config_prefix, key) + def _get_secret(self, path_prefix, secret_id): 
# type: (str, str) -> Optional[str] """ diff --git a/airflow/contrib/secrets/hashicorp_vault.py b/airflow/contrib/secrets/hashicorp_vault.py index 8f6b53c..536e7f9 100644 --- a/airflow/contrib/secrets/hashicorp_vault.py +++ b/airflow/contrib/secrets/hashicorp_vault.py @@ -55,6 +55,9 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin): :param variables_path: Specifies the path of the secret to read to get Variables. (default: 'variables') :type variables_path: str + :param config_path: Specifies the path of the secret to read Airflow Configurations + (default: 'config'). + :type config_path: str :param url: Base URL for the Vault instance being addressed. :type url: str :param auth_type: Authentication Type for Vault (one of 'token', 'ldap', 'userpass', 'approle', @@ -89,6 +92,7 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin): self, connections_path='connections', # type: str variables_path='variables', # type: str + config_path='config', # type: str url=None, # type: Optional[str] auth_type='token', # type: str mount_point='secret', # type: str @@ -104,9 +108,10 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin): gcp_scopes=None, # type: Optional[str] **kwargs ): - super(VaultBackend, self).__init__(**kwargs) + super(VaultBackend, self).__init__() self.connections_path = connections_path.rstrip('/') self.variables_path = variables_path.rstrip('/') + self.config_path = config_path.rstrip('/') self.url = url self.auth_type = auth_type self.kwargs = kwargs @@ -179,7 +184,7 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin): def get_variable(self, key): # type: (str) -> Optional[str] """ - Get Airflow Variable from Environment Variable + Get Airflow Variable :param key: Variable Key :return: Variable Value @@ -213,3 +218,16 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin): return_data = response["data"] if self.kv_engine_version == 1 else response["data"]["data"] return return_data + + def get_config(self, key): + # type: (str) -> 
Optional[str] + """ + Get Airflow Configuration + + :param key: Configuration Option Key + :type key: str + :rtype: str + :return: Configuration Option Value retrieved from the vault + """ + response = self._get_secret(self.config_path, key) + return response.get("value") if response else None diff --git a/airflow/secrets/__init__.py b/airflow/secrets/__init__.py index 57dde4b..8736ac6 100644 --- a/airflow/secrets/__init__.py +++ b/airflow/secrets/__init__.py @@ -22,17 +22,20 @@ Secrets framework provides means of getting connection objects from various sour * Metatsore database * AWS SSM Parameter store """ -__all__ = ['BaseSecretsBackend', 'get_connections', 'get_variable'] +__all__ = ['BaseSecretsBackend', 'get_connections', 'get_variable', 'get_custom_secret_backend'] import json -from typing import List, Optional +from typing import TYPE_CHECKING, List, Optional from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.models.connection import Connection from airflow.secrets.base_secrets import BaseSecretsBackend from airflow.utils.module_loading import import_string +if TYPE_CHECKING: + from airflow.models.connection import Connection + + CONFIG_SECTION = "secrets" DEFAULT_SECRETS_SEARCH_PATH = [ "airflow.secrets.environment_variables.EnvironmentVariablesBackend", @@ -41,7 +44,7 @@ DEFAULT_SECRETS_SEARCH_PATH = [ def get_connections(conn_id): - # type: (str) -> List[Connection] + # type: (str) -> List['Connection'] """ Get all connections as an iterable. @@ -72,25 +75,35 @@ def get_variable(key): return None +def get_custom_secret_backend(): + # type: (...) 
-> Optional[BaseSecretsBackend] + """Get Secret Backend if defined in airflow.cfg""" + alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='') + + if alternative_secrets_backend: + try: + alternative_secrets_config_dict = json.loads( + conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}') + ) + except ValueError: + alternative_secrets_config_dict = {} + secrets_backend_cls = import_string(alternative_secrets_backend) + return secrets_backend_cls(**alternative_secrets_config_dict) + return None + + def initialize_secrets_backends(): # type: (...) -> List[BaseSecretsBackend] """ * import secrets backend classes * instantiate them and return them in a list """ - alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='') - try: - alternative_secrets_config_dict = json.loads( - conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}') - ) - except ValueError: - alternative_secrets_config_dict = {} - backend_list = [] - if alternative_secrets_backend: - secrets_backend_cls = import_string(alternative_secrets_backend) - backend_list.append(secrets_backend_cls(**alternative_secrets_config_dict)) + custom_secret_backend = get_custom_secret_backend() + + if custom_secret_backend is not None: + backend_list.append(custom_secret_backend) for class_name in DEFAULT_SECRETS_SEARCH_PATH: secrets_backend_cls = import_string(class_name) diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py index 2394f40..a8c0e6b 100644 --- a/airflow/secrets/base_secrets.py +++ b/airflow/secrets/base_secrets.py @@ -16,9 +16,7 @@ # under the License. 
from abc import ABCMeta -from typing import List, Optional - -from airflow.models.connection import Connection +from typing import Optional class BaseSecretsBackend: @@ -56,13 +54,13 @@ class BaseSecretsBackend: raise NotImplementedError() def get_connections(self, conn_id): - # type: (str) -> List[Connection] """ Get connections with a specific ID :param conn_id: connection id :type conn_id: str """ + from airflow.models.connection import Connection conn_uri = self.get_conn_uri(conn_id=conn_id) if not conn_uri: return [] @@ -78,3 +76,13 @@ class BaseSecretsBackend: :return: Variable Value """ raise NotImplementedError() + + def get_config(self, key): # pylint: disable=unused-argument + # type: (str) -> Optional[str] + """ + Return value for Airflow Config Key + + :param key: Config Key + :return: Config Value + """ + return None diff --git a/airflow/secrets/metastore.py b/airflow/secrets/metastore.py index 51d8740..f1412e9 100644 --- a/airflow/secrets/metastore.py +++ b/airflow/secrets/metastore.py @@ -19,9 +19,6 @@ Objects relating to sourcing connections from metastore database """ -from typing import List - -from airflow.models.connection import Connection from airflow.secrets import BaseSecretsBackend from airflow.utils.db import provide_session @@ -34,7 +31,7 @@ class MetastoreBackend(BaseSecretsBackend): # pylint: disable=missing-docstring @provide_session def get_connections(self, conn_id, session=None): - # type: (...) 
-> List[Connection] + from airflow.models.connection import Connection conn_list = session.query(Connection).filter(Connection.conn_id == conn_id).all() session.expunge_all() return conn_list diff --git a/docs/howto/set-config.rst b/docs/howto/set-config.rst index 035bc29..270fd7f 100644 --- a/docs/howto/set-config.rst +++ b/docs/howto/set-config.rst @@ -47,7 +47,21 @@ the key like this: [core] sql_alchemy_conn_cmd = bash_command_to_run -The following config options support this ``_cmd`` version: +You can also derive the connection string at run time by appending ``_secret`` to +the key like this: + +.. code-block:: ini + + [core] + sql_alchemy_conn_secret = sql_alchemy_conn + # You can also add a nested path + # example: + # sql_alchemy_conn_secret = core/sql_alchemy_conn + +This will retrieve the config option from a Secrets Backend, e.g. HashiCorp Vault. See +:ref:`Secrets Backends<secrets_backend_configuration>` for more details. + +The following config options support the ``_cmd`` and ``_secret`` versions: * ``sql_alchemy_conn`` in ``[core]`` section * ``fernet_key`` in ``[core]`` section @@ -66,12 +80,21 @@ the same way the usual config options can. For example: export AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash_command_to_run +Similarly, ``_secret`` config options can also be set using a corresponding environment variable. +For example: + +.. code-block:: bash + + export AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET=sql_alchemy_conn + The idea behind this is to not store passwords on boxes in plain text files. The universal order of precedence for all configuration options is as follows: -#. set as an environment variable -#. set as a command environment variable +#. set as an environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN``) +#. set as a command environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD``) +#. set as a secret environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET``) #. set in ``airflow.cfg`` #. command in ``airflow.cfg`` +#. 
secret key in ``airflow.cfg`` #. Airflow's built in defaults diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 5c40cad..fcf7964 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -191,7 +191,7 @@ key6 = value6 test_conf = AirflowConfigParser( default_config=parameterized_config(TEST_CONFIG_DEFAULT)) test_conf.read_string(TEST_CONFIG) - test_conf.as_command_stdout = test_conf.as_command_stdout | { + test_conf.sensitive_config_values = test_conf.sensitive_config_values | { ('test', 'key2'), ('test', 'key4'), } @@ -225,6 +225,46 @@ key6 = value6 self.assertNotIn('key4', cfg_dict['test']) self.assertEqual('printf key4_result', cfg_dict['test']['key4_cmd']) + @mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac") + @conf_vars({ + ("secrets", "backend"): "airflow.contrib.secrets.hashicorp_vault.VaultBackend", + ("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}', + }) + def test_config_from_secret_backend(self, mock_hvac): + """Get Config Value from a Secret Backend""" + mock_client = mock.MagicMock() + mock_hvac.Client.return_value = mock_client + mock_client.secrets.kv.v2.read_secret_version.return_value = { + 'request_id': '2d48a2ad-6bcb-e5b6-429d-da35fdf31f56', + 'lease_id': '', + 'renewable': False, + 'lease_duration': 0, + 'data': {'data': {'value': 'sqlite:////Users/airflow/airflow/airflow.db'}, + 'metadata': {'created_time': '2020-03-28T02:10:54.301784Z', + 'deletion_time': '', + 'destroyed': False, + 'version': 1}}, + 'wrap_info': None, + 'warnings': None, + 'auth': None + } + + test_config = '''[test] +sql_alchemy_conn_secret = sql_alchemy_conn +''' + test_config_default = '''[test] +sql_alchemy_conn = airflow +''' + + test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default)) + test_conf.read_string(test_config) + test_conf.sensitive_config_values = test_conf.sensitive_config_values | { + ('test', 'sql_alchemy_conn'), + } + + 
self.assertEqual( + 'sqlite:////Users/airflow/airflow/airflow.db', test_conf.get('test', 'sql_alchemy_conn')) + def test_getboolean(self): """Test AirflowConfigParser.getboolean""" TEST_CONFIG = """ @@ -410,7 +450,7 @@ AIRFLOW_HOME = /root/airflow # Guarantee we have a deprecated setting, so we test the deprecation # lookup even if we remove this explicit fallback conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'} - conf.as_command_stdout.add(('celery', 'celery_result_backend')) + conf.sensitive_config_values.add(('celery', 'celery_result_backend')) conf.remove_option('celery', 'result_backend') conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99') @@ -478,13 +518,13 @@ notacommand = OK ''' test_cmdenv_conf = AirflowConfigParser() test_cmdenv_conf.read_string(TEST_CMDENV_CONFIG) - test_cmdenv_conf.as_command_stdout.add(('testcmdenv', 'itsacommand')) + test_cmdenv_conf.sensitive_config_values.add(('testcmdenv', 'itsacommand')) with mock.patch.dict('os.environ'): # AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD maps to ('testcmdenv', 'itsacommand') in - # as_command_stdout and therefore should return 'OK' from the environment variable's + # sensitive_config_values and therefore should return 'OK' from the environment variable's # echo command, and must not return 'NOT OK' from the configuration self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'itsacommand'), 'OK') - # AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in as_command_stdout and therefore + # AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in sensitive_config_values and therefore # the option should return 'OK' from the configuration, and must not return 'NOT OK' from # the environement variable's echo command self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'notacommand'), 'OK')
