This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 888bc2e233 Fix retrieval of deprecated non-config values (#23723)
888bc2e233 is described below

commit 888bc2e233b1672a61433929e26b82210796fd71
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri May 20 16:09:52 2022 +0200

    Fix retrieval of deprecated non-config values (#23723)
    
    It turned out that deprecation of config values did not work as
    intended. While deprecation worked fine when the value was specified
    in configuration value it did not work when `run_as_user` was used.
    
    In those cases the "as_dict" option was used to generate temporary
    configuratin and this temporary configuration contained default value
    for the new configuration value - for example it caused that
    the generated temporary value contained:
    
    ```
    [database]
    sql_alchemy_conn=sqlite:///{AIRFLOW_HOME}/airflow.db
    ```
    
    Even if the deprecated `core/sql_alchemy_conn` was set (and no
    new `database/sql_alchemy_conn` was set at the same time.
    
    This effectively rendered the old installation that did not convert
    to the new "database" configuration not working for run_as_user, because
    the tasks run with "run_as_user" used wrong, empty sqlite database
    instaead of the one configured for Airflow.
    
    Also during adding tests, it turned out that the mechanism was also
    not working as intended before - in case `_CMD` or `_SECRET` were used
    as environment variables rather than configuration. In those cases
    both _CMD and _SECRET should be evaluated during as_dict() evaluation,
    because the "run_as_user" might have not enough permission to run the
    command or retrieve secret. The _cmd and _secret variables were only
    evaluated during as_dict() when they were in the config file (note
    that this only happens when include_cmd, include_env, include_secret
    are set to True).
    
    The changes implemented in this PR fix both problems:
    
    * the _CMD and _SECRET env vars are evaluated during as_dict when the
      respective include_* is set
    * the defaults are only set for the values that have deprecations
      in case the deprecations have no values set in either of the ways:
        * in config file
        * in env variable
        * in _cmd (via config file or env variable)
        * in _secret (via config file or env variable)
    
    Fixes: #23679
---
 airflow/config_templates/config.yml.schema.json |   2 +-
 airflow/configuration.py                        | 180 ++++++-
 tests/config_templates/deprecated.cfg           |  28 +
 tests/config_templates/deprecated_cmd.cfg       |  29 ++
 tests/config_templates/deprecated_secret.cfg    |  29 ++
 tests/config_templates/empty.cfg                |  26 +
 tests/core/test_configuration.py                | 665 ++++++++++++++++++------
 tests/utils/test_config.py                      |  79 +++
 8 files changed, 849 insertions(+), 189 deletions(-)

diff --git a/airflow/config_templates/config.yml.schema.json 
b/airflow/config_templates/config.yml.schema.json
index 9fef382fa4..1b43306090 100644
--- a/airflow/config_templates/config.yml.schema.json
+++ b/airflow/config_templates/config.yml.schema.json
@@ -36,7 +36,7 @@
             },
             "sensitive": {
               "type": "boolean",
-              "description": "When true, this option is sensitive and can be 
specified using AIRFLOW__{section}___{name}__SECRET or 
AIRFLOW__{section}___{name}__CMD environment variables. See: 
airflow.configuration.AirflowConfigParser.sensitive_config_values"
+              "description": "When true, this option is sensitive and can be 
specified using AIRFLOW__{section}___{name}__SECRET or 
AIRFLOW__{section}___{name}_CMD environment variables. See: 
airflow.configuration.AirflowConfigParser.sensitive_config_values"
             }
           },
           "required": [
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 91b8a4e94e..a33b3f3998 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -59,6 +59,8 @@ ConfigOptionsDictType = Dict[str, ConfigType]
 ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
 ConfigSourcesType = Dict[str, ConfigSectionSourcesType]
 
+ENV_VAR_PREFIX = 'AIRFLOW__'
+
 
 def _parse_sqlite_version(s: str) -> Tuple[int, ...]:
     match = _SQLITE3_VERSION_PATTERN.match(s)
@@ -144,7 +146,7 @@ class AirflowConfigParser(ConfigParser):
     """Custom Airflow Configparser supporting defaults and deprecated 
options"""
 
     # These configuration elements can be fetched as the stdout of commands
-    # following the "{section}__{name}__cmd" pattern, the idea behind this
+    # following the "{section}__{name}_cmd" pattern, the idea behind this
     # is to not store password on boxes in text files.
     # These configs can also be fetched from Secrets backend
     # following the "{section}__{name}__secret" pattern
@@ -435,10 +437,8 @@ class AirflowConfigParser(ConfigParser):
             FutureWarning,
         )
 
-    ENV_VAR_PREFIX = 'AIRFLOW__'
-
     def _env_var_name(self, section: str, key: str) -> str:
-        return f'{self.ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'
+        return f'{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'
 
     def _get_env_var_option(self, section: str, key: str):
         # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
@@ -461,23 +461,53 @@ class AirflowConfigParser(ConfigParser):
 
     def _get_cmd_option(self, section: str, key: str):
         fallback_key = key + '_cmd'
-        # if this is a valid command key...
         if (section, key) in self.sensitive_config_values:
             if super().has_option(section, fallback_key):
                 command = super().get(section, fallback_key)
                 return run_command(command)
         return None
 
+    def _get_cmd_option_from_config_sources(
+        self, config_sources: ConfigSourcesType, section: str, key: str
+    ) -> Optional[str]:
+        fallback_key = key + '_cmd'
+        if (section, key) in self.sensitive_config_values:
+            section_dict = config_sources.get(section)
+            if section_dict is not None:
+                command_value = section_dict.get(fallback_key)
+                if command_value is not None:
+                    if isinstance(command_value, str):
+                        command = command_value
+                    else:
+                        command = command_value[0]
+                    return run_command(command)
+        return None
+
     def _get_secret_option(self, section: str, key: str) -> Optional[str]:
         """Get Config option values from Secret Backend"""
         fallback_key = key + '_secret'
-        # if this is a valid secret key...
         if (section, key) in self.sensitive_config_values:
             if super().has_option(section, fallback_key):
                 secrets_path = super().get(section, fallback_key)
                 return _get_config_value_from_secret_backend(secrets_path)
         return None
 
+    def _get_secret_option_from_config_sources(
+        self, config_sources: ConfigSourcesType, section: str, key: str
+    ) -> Optional[str]:
+        fallback_key = key + '_secret'
+        if (section, key) in self.sensitive_config_values:
+            section_dict = config_sources.get(section)
+            if section_dict is not None:
+                secrets_path_value = section_dict.get(fallback_key)
+                if secrets_path_value is not None:
+                    if isinstance(secrets_path_value, str):
+                        secrets_path = secrets_path_value
+                    else:
+                        secrets_path = secrets_path_value[0]
+                    return _get_config_value_from_secret_backend(secrets_path)
+        return None
+
     def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
         value = self.get(section, key, **kwargs)
         if value is None:
@@ -859,7 +889,16 @@ class AirflowConfigParser(ConfigParser):
             ('airflow.cfg', self),
         ]
 
-        self._replace_config_with_display_sources(config_sources, configs, 
display_source, raw)
+        self._replace_config_with_display_sources(
+            config_sources,
+            configs,
+            display_source,
+            raw,
+            self.deprecated_options,
+            include_cmds=include_cmds,
+            include_env=include_env,
+            include_secret=include_secret,
+        )
 
         # add env vars and overwrite because they have priority
         if include_env:
@@ -889,7 +928,7 @@ class AirflowConfigParser(ConfigParser):
         raw: bool,
     ):
         for (section, key) in self.sensitive_config_values:
-            value: Optional[str] = self._get_secret_option(section, key)
+            value: Optional[str] = 
self._get_secret_option_from_config_sources(config_sources, section, key)
             if value:
                 if not display_sensitive:
                     value = '< hidden >'
@@ -910,17 +949,20 @@ class AirflowConfigParser(ConfigParser):
         raw: bool,
     ):
         for (section, key) in self.sensitive_config_values:
-            opt = self._get_cmd_option(section, key)
+            opt = self._get_cmd_option_from_config_sources(config_sources, 
section, key)
             if not opt:
                 continue
+            opt_to_set: Union[str, Tuple[str, str], None] = opt
             if not display_sensitive:
-                opt = '< hidden >'
+                opt_to_set = '< hidden >'
             if display_source:
-                opt = (opt, 'cmd')
+                opt_to_set = (str(opt_to_set), 'cmd')
             elif raw:
-                opt = opt.replace('%', '%%')
-            config_sources.setdefault(section, OrderedDict()).update({key: 
opt})
-            del config_sources[section][key + '_cmd']
+                opt_to_set = str(opt_to_set).replace('%', '%%')
+            if opt_to_set is not None:
+                dict_to_update: Dict[str, Union[str, Tuple[str, str]]] = {key: 
opt_to_set}
+                config_sources.setdefault(section, 
OrderedDict()).update(dict_to_update)
+                del config_sources[section][key + '_cmd']
 
     def _include_envs(
         self,
@@ -930,7 +972,7 @@ class AirflowConfigParser(ConfigParser):
         raw: bool,
     ):
         for env_var in [
-            os_environment for os_environment in os.environ if 
os_environment.startswith(self.ENV_VAR_PREFIX)
+            os_environment for os_environment in os.environ if 
os_environment.startswith(ENV_VAR_PREFIX)
         ]:
             try:
                 _, section, key = env_var.split('__', 2)
@@ -1010,13 +1052,82 @@ class AirflowConfigParser(ConfigParser):
         configs: Iterable[Tuple[str, ConfigParser]],
         display_source: bool,
         raw: bool,
+        deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
+        include_env: bool,
+        include_cmds: bool,
+        include_secret: bool,
     ):
         for (source_name, config) in configs:
             for section in config.sections():
                 
AirflowConfigParser._replace_section_config_with_display_sources(
-                    config, config_sources, display_source, raw, section, 
source_name
+                    config,
+                    config_sources,
+                    display_source,
+                    raw,
+                    section,
+                    source_name,
+                    deprecated_options,
+                    configs,
+                    include_env=include_env,
+                    include_cmds=include_cmds,
+                    include_secret=include_secret,
                 )
 
+    @staticmethod
+    def _deprecated_value_is_set_in_config(
+        deprecated_section: str,
+        deprecated_key: str,
+        configs: Iterable[Tuple[str, ConfigParser]],
+    ) -> bool:
+        for config_type, config in configs:
+            if config_type == 'default':
+                continue
+            try:
+                deprecated_section_array = 
config.items(section=deprecated_section, raw=True)
+                for (key_candidate, _) in deprecated_section_array:
+                    if key_candidate == deprecated_key:
+                        return True
+            except NoSectionError:
+                pass
+        return False
+
+    @staticmethod
+    def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: 
str) -> bool:
+        return (
+            
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}')
+            is not None
+        )
+
+    @staticmethod
+    def _deprecated_command_is_set_in_config(
+        deprecated_section: str, deprecated_key: str, configs: 
Iterable[Tuple[str, ConfigParser]]
+    ) -> bool:
+        return AirflowConfigParser._deprecated_value_is_set_in_config(
+            deprecated_section=deprecated_section, 
deprecated_key=deprecated_key + "_cmd", configs=configs
+        )
+
+    @staticmethod
+    def _deprecated_variable_command_is_set(deprecated_section: str, 
deprecated_key: str) -> bool:
+        return (
+            
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD')
+            is not None
+        )
+
+    @staticmethod
+    def _deprecated_secret_is_set_in_config(
+        deprecated_section: str, deprecated_key: str, configs: 
Iterable[Tuple[str, ConfigParser]]
+    ) -> bool:
+        return AirflowConfigParser._deprecated_value_is_set_in_config(
+            deprecated_section=deprecated_section, 
deprecated_key=deprecated_key + "_secret", configs=configs
+        )
+
+    @staticmethod
+    def _deprecated_variable_secret_is_set(deprecated_section: str, 
deprecated_key: str) -> bool:
+        return (
+            
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET')
+            is not None
+        )
+
     @staticmethod
     def _replace_section_config_with_display_sources(
         config: ConfigParser,
@@ -1025,9 +1136,46 @@ class AirflowConfigParser(ConfigParser):
         raw: bool,
         section: str,
         source_name: str,
+        deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
+        configs: Iterable[Tuple[str, ConfigParser]],
+        include_env: bool,
+        include_cmds: bool,
+        include_secret: bool,
     ):
         sect = config_sources.setdefault(section, OrderedDict())
         for (k, val) in config.items(section=section, raw=raw):
+            deprecated_section, deprecated_key, _ = 
deprecated_options.get((section, k), (None, None, None))
+            if deprecated_section and deprecated_key:
+                if source_name == 'default':
+                    # If deprecated entry has some non-default value set for 
any of the sources requested,
+                    # We should NOT set default for the new entry (because it 
will override anything
+                    # coming from the deprecated ones)
+                    if AirflowConfigParser._deprecated_value_is_set_in_config(
+                        deprecated_section, deprecated_key, configs
+                    ):
+                        continue
+                    if include_env and 
AirflowConfigParser._deprecated_variable_is_set(
+                        deprecated_section, deprecated_key
+                    ):
+                        continue
+                    if include_cmds and (
+                        
AirflowConfigParser._deprecated_variable_command_is_set(
+                            deprecated_section, deprecated_key
+                        )
+                        or 
AirflowConfigParser._deprecated_command_is_set_in_config(
+                            deprecated_section, deprecated_key, configs
+                        )
+                    ):
+                        continue
+                    if include_secret and (
+                        AirflowConfigParser._deprecated_variable_secret_is_set(
+                            deprecated_section, deprecated_key
+                        )
+                        or 
AirflowConfigParser._deprecated_secret_is_set_in_config(
+                            deprecated_section, deprecated_key, configs
+                        )
+                    ):
+                        continue
             if display_source:
                 sect[k] = (val, source_name)
             else:
diff --git a/tests/config_templates/deprecated.cfg 
b/tests/config_templates/deprecated.cfg
new file mode 100644
index 0000000000..cab0b6e7fa
--- /dev/null
+++ b/tests/config_templates/deprecated.cfg
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# This is the template for Airflow's unit test configuration. When Airflow runs
+# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
+# If it doesn't exist, Airflow uses this template to generate it by replacing
+# variables in curly braces with their global values from configuration.py.
+
+# Users should not modify this file; they should customize the generated
+# unittests.cfg instead.
+[core]
+sql_alchemy_conn = mysql://
diff --git a/tests/config_templates/deprecated_cmd.cfg 
b/tests/config_templates/deprecated_cmd.cfg
new file mode 100644
index 0000000000..ff2a9268dd
--- /dev/null
+++ b/tests/config_templates/deprecated_cmd.cfg
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# This is the template for Airflow's unit test configuration. When Airflow runs
+# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
+# If it doesn't exist, Airflow uses this template to generate it by replacing
+# variables in curly braces with their global values from configuration.py.
+
+# Users should not modify this file; they should customize the generated
+# unittests.cfg instead.
+
+[core]
+sql_alchemy_conn_cmd = echo -n "postgresql://"
diff --git a/tests/config_templates/deprecated_secret.cfg 
b/tests/config_templates/deprecated_secret.cfg
new file mode 100644
index 0000000000..6c4cda091a
--- /dev/null
+++ b/tests/config_templates/deprecated_secret.cfg
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# This is the template for Airflow's unit test configuration. When Airflow runs
+# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
+# If it doesn't exist, Airflow uses this template to generate it by replacing
+# variables in curly braces with their global values from configuration.py.
+
+# Users should not modify this file; they should customize the generated
+# unittests.cfg instead.
+
+[core]
+sql_alchemy_conn_secret = secret_path
diff --git a/tests/config_templates/empty.cfg b/tests/config_templates/empty.cfg
new file mode 100644
index 0000000000..36906c07cf
--- /dev/null
+++ b/tests/config_templates/empty.cfg
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# This is the template for Airflow's unit test configuration. When Airflow runs
+# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
+# If it doesn't exist, Airflow uses this template to generate it by replacing
+# variables in curly braces with their global values from configuration.py.
+
+# Users should not modify this file; they should customize the generated
+# unittests.cfg instead.
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index 70a51806c4..991695a969 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -42,6 +42,14 @@ from airflow.configuration import (
 )
 from tests.test_utils.config import conf_vars
 from tests.test_utils.reset_warning_registry import reset_warning_registry
+from tests.utils.test_config import (
+    remove_all_configurations,
+    set_deprecated_options,
+    set_sensitive_config_values,
+    use_config,
+)
+
+HOME_DIR = os.path.expanduser('~')
 
 
 @unittest.mock.patch.dict(
@@ -511,89 +519,6 @@ AIRFLOW_HOME = /root/airflow
         assert isinstance(section_dict['_test_only_float'], float)
         assert isinstance(section_dict['_test_only_string'], str)
 
-    @conf_vars(
-        {
-            ("celery", "worker_concurrency"): None,
-            ("celery", "celeryd_concurrency"): None,
-        }
-    )
-    def test_deprecated_options(self):
-        # Guarantee we have a deprecated setting, so we test the deprecation
-        # lookup even if we remove this explicit fallback
-        conf.deprecated_options = {
-            ('celery', 'worker_concurrency'): ('celery', 
'celeryd_concurrency', '2.0.0'),
-        }
-
-        # Remove it so we are sure we use the right setting
-        conf.remove_option('celery', 'worker_concurrency')
-
-        with pytest.warns(DeprecationWarning):
-            with mock.patch.dict('os.environ', 
AIRFLOW__CELERY__CELERYD_CONCURRENCY="99"):
-                assert conf.getint('celery', 'worker_concurrency') == 99
-
-        with pytest.warns(DeprecationWarning), conf_vars({('celery', 
'celeryd_concurrency'): '99'}):
-            assert conf.getint('celery', 'worker_concurrency') == 99
-
-    @conf_vars(
-        {
-            ('logging', 'logging_level'): None,
-            ('core', 'logging_level'): None,
-        }
-    )
-    def test_deprecated_options_with_new_section(self):
-        # Guarantee we have a deprecated setting, so we test the deprecation
-        # lookup even if we remove this explicit fallback
-        conf.deprecated_options = {
-            ('logging', 'logging_level'): ('core', 'logging_level', '2.0.0'),
-        }
-
-        # Remove it so we are sure we use the right setting
-        conf.remove_option('core', 'logging_level')
-        conf.remove_option('logging', 'logging_level')
-
-        with pytest.warns(DeprecationWarning):
-            with mock.patch.dict('os.environ', 
AIRFLOW__CORE__LOGGING_LEVEL="VALUE"):
-                assert conf.get('logging', 'logging_level') == "VALUE"
-
-        with pytest.warns(DeprecationWarning), conf_vars({('core', 
'logging_level'): 'VALUE'}):
-            assert conf.get('logging', 'logging_level') == "VALUE"
-
-    @conf_vars(
-        {
-            ("celery", "result_backend"): None,
-            ("celery", "celery_result_backend"): None,
-            ("celery", "celery_result_backend_cmd"): None,
-        }
-    )
-    def test_deprecated_options_cmd(self):
-        # Guarantee we have a deprecated setting, so we test the deprecation
-        # lookup even if we remove this explicit fallback
-        conf.deprecated_options[('celery', "result_backend")] = 'celery', 
'celery_result_backend', '2.0.0'
-        conf.sensitive_config_values.add(('celery', 'celery_result_backend'))
-
-        conf.remove_option('celery', 'result_backend')
-        with conf_vars({('celery', 'celery_result_backend_cmd'): '/bin/echo 
99'}):
-            with pytest.warns(DeprecationWarning):
-                tmp = None
-                if 'AIRFLOW__CELERY__RESULT_BACKEND' in os.environ:
-                    tmp = os.environ.pop('AIRFLOW__CELERY__RESULT_BACKEND')
-                assert conf.getint('celery', 'result_backend') == 99
-                if tmp:
-                    os.environ['AIRFLOW__CELERY__RESULT_BACKEND'] = tmp
-
-    def test_deprecated_values_from_conf(self):
-        test_conf = AirflowConfigParser(default_config='')
-        # Guarantee we have deprecated settings, so we test the deprecation
-        # lookup even if we remove this explicit fallback
-        test_conf.deprecated_values = {
-            'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')},
-        }
-        test_conf.read_dict({'core': {'hostname_callable': 'socket:getfqdn'}})
-
-        with pytest.warns(FutureWarning):
-            test_conf.validate()
-            assert test_conf.get('core', 'hostname_callable') == 
'socket.getfqdn'
-
     def test_auth_backends_adds_session(self):
         test_conf = AirflowConfigParser(default_config='')
         # Guarantee we have deprecated settings, so we test the deprecation
@@ -616,95 +541,6 @@ AIRFLOW_HOME = /root/airflow
                 == 
'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
             )
 
-    @pytest.mark.parametrize(
-        "old, new",
-        [
-            (
-                ("api", "auth_backend", "airflow.api.auth.backend.basic_auth"),
-                (
-                    "api",
-                    "auth_backends",
-                    
"airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session",
-                ),
-            ),
-            (
-                ("core", "sql_alchemy_conn", 
"postgres+psycopg2://localhost/postgres"),
-                ("database", "sql_alchemy_conn", 
"postgresql://localhost/postgres"),
-            ),
-        ],
-    )
-    def test_deprecated_env_vars_upgraded_and_removed(self, old, new):
-        test_conf = AirflowConfigParser(default_config='')
-        old_section, old_key, old_value = old
-        new_section, new_key, new_value = new
-        old_env_var = test_conf._env_var_name(old_section, old_key)
-        new_env_var = test_conf._env_var_name(new_section, new_key)
-
-        with pytest.warns(FutureWarning):
-            with unittest.mock.patch.dict('os.environ', **{old_env_var: 
old_value}):
-                # Can't start with the new env var existing...
-                os.environ.pop(new_env_var, None)
-
-                test_conf.validate()
-                assert test_conf.get(new_section, new_key) == new_value
-                # We also need to make sure the deprecated env var is removed
-                # so that any subprocesses don't use it in place of our updated
-                # value.
-                assert old_env_var not in os.environ
-                # and make sure we track the old value as well, under the new 
section/key
-                assert test_conf.upgraded_values[(new_section, new_key)] == 
old_value
-
-    @pytest.mark.parametrize(
-        "conf_dict",
-        [
-            {},  # Even if the section is absent from config file, environ 
still needs replacing.
-            {'core': {'hostname_callable': 'socket:getfqdn'}},
-        ],
-    )
-    def test_deprecated_values_from_environ(self, conf_dict):
-        def make_config():
-            test_conf = AirflowConfigParser(default_config='')
-            # Guarantee we have a deprecated setting, so we test the 
deprecation
-            # lookup even if we remove this explicit fallback
-            test_conf.deprecated_values = {
-                'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')},
-            }
-            test_conf.read_dict(conf_dict)
-            test_conf.validate()
-            return test_conf
-
-        with pytest.warns(FutureWarning):
-            with unittest.mock.patch.dict('os.environ', 
AIRFLOW__CORE__HOSTNAME_CALLABLE='socket:getfqdn'):
-                test_conf = make_config()
-                assert test_conf.get('core', 'hostname_callable') == 
'socket.getfqdn'
-
-        with reset_warning_registry():
-            with warnings.catch_warnings(record=True) as warning:
-                with unittest.mock.patch.dict(
-                    'os.environ',
-                    AIRFLOW__CORE__HOSTNAME_CALLABLE='CarrierPigeon',
-                ):
-                    test_conf = make_config()
-                    assert test_conf.get('core', 'hostname_callable') == 
'CarrierPigeon'
-                    assert [] == warning
-
-    def test_deprecated_funcs(self):
-        for func in [
-            'load_test_config',
-            'get',
-            'getboolean',
-            'getfloat',
-            'getint',
-            'has_option',
-            'remove_option',
-            'as_dict',
-            'set',
-        ]:
-            with mock.patch(f'airflow.configuration.conf.{func}') as 
mock_method:
-                with pytest.warns(DeprecationWarning):
-                    getattr(configuration, func)()
-                mock_method.assert_called_once()
-
     def test_command_from_env(self):
         test_cmdenv_config = '''[testcmdenv]
 itsacommand = NOT OK
@@ -877,6 +713,20 @@ notacommand = OK
             assert 'sql_alchemy_conn' in conf_maintain_cmds['database']
             assert conf_maintain_cmds['database']['sql_alchemy_conn'] == 
conf_conn
 
+    @mock.patch.dict(
+        'os.environ', {"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 
'postgresql://'"}, clear=True
+    )
+    def test_as_dict_respects_sensitive_cmds_from_env(self):
+        test_conf = copy.deepcopy(conf)
+        test_conf.read_string("")
+
+        conf_materialize_cmds = test_conf.as_dict(display_sensitive=True, 
raw=True, include_cmds=True)
+
+        assert 'sql_alchemy_conn' in conf_materialize_cmds['database']
+        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['database']
+
+        assert conf_materialize_cmds['database']['sql_alchemy_conn'] == 
'postgresql://'
+
     def test_gettimedelta(self):
         test_config = '''
 [invalid]
@@ -940,3 +790,474 @@ key7 =
         assert test_conf.gettimedelta('valid', 'key6') == 
datetime.timedelta(seconds=300)
         assert isinstance(test_conf.gettimedelta('default', 'key7'), 
type(None))
         assert test_conf.gettimedelta('default', 'key7') is None
+
+
+class TestDeprecatedConf:
+    @conf_vars(
+        {
+            ("celery", "worker_concurrency"): None,
+            ("celery", "celeryd_concurrency"): None,
+        }
+    )
+    def test_deprecated_options(self):
+        # Guarantee we have a deprecated setting, so we test the deprecation
+        # lookup even if we remove this explicit fallback
+        with set_deprecated_options(
+            deprecated_options={('celery', 'worker_concurrency'): ('celery', 
'celeryd_concurrency', '2.0.0')}
+        ):
+            # Remove it so we are sure we use the right setting
+            conf.remove_option('celery', 'worker_concurrency')
+
+            with pytest.warns(DeprecationWarning):
+                with mock.patch.dict('os.environ', 
AIRFLOW__CELERY__CELERYD_CONCURRENCY="99"):
+                    assert conf.getint('celery', 'worker_concurrency') == 99
+
+            with pytest.warns(DeprecationWarning), conf_vars({('celery', 
'celeryd_concurrency'): '99'}):
+                assert conf.getint('celery', 'worker_concurrency') == 99
+
+    @conf_vars(
+        {
+            ('logging', 'logging_level'): None,
+            ('core', 'logging_level'): None,
+        }
+    )
+    def test_deprecated_options_with_new_section(self):
+        # Guarantee we have a deprecated setting, so we test the deprecation
+        # lookup even if we remove this explicit fallback
+        with set_deprecated_options(
+            deprecated_options={('logging', 'logging_level'): ('core', 
'logging_level', '2.0.0')}
+        ):
+            # Remove it so we are sure we use the right setting
+            conf.remove_option('core', 'logging_level')
+            conf.remove_option('logging', 'logging_level')
+
+            with pytest.warns(DeprecationWarning):
+                with mock.patch.dict('os.environ', 
AIRFLOW__CORE__LOGGING_LEVEL="VALUE"):
+                    assert conf.get('logging', 'logging_level') == "VALUE"
+
+            with pytest.warns(DeprecationWarning), conf_vars({('core', 
'logging_level'): 'VALUE'}):
+                assert conf.get('logging', 'logging_level') == "VALUE"
+
+    @conf_vars(
+        {
+            ("celery", "result_backend"): None,
+            ("celery", "celery_result_backend"): None,
+            ("celery", "celery_result_backend_cmd"): None,
+        }
+    )
+    def test_deprecated_options_cmd(self):
+        # Guarantee we have a deprecated setting, so we test the deprecation
+        # lookup even if we remove this explicit fallback
+        with set_deprecated_options(
+            deprecated_options={('celery', "result_backend"): ('celery', 
'celery_result_backend', '2.0.0')}
+        ), set_sensitive_config_values(sensitive_config_values={('celery', 
'celery_result_backend')}):
+            conf.remove_option('celery', 'result_backend')
+            with conf_vars({('celery', 'celery_result_backend_cmd'): 
'/bin/echo 99'}):
+                with pytest.warns(DeprecationWarning):
+                    tmp = None
+                    if 'AIRFLOW__CELERY__RESULT_BACKEND' in os.environ:
+                        tmp = os.environ.pop('AIRFLOW__CELERY__RESULT_BACKEND')
+                    assert conf.getint('celery', 'result_backend') == 99
+                    if tmp:
+                        os.environ['AIRFLOW__CELERY__RESULT_BACKEND'] = tmp
+
+    def test_deprecated_values_from_conf(self):
+        test_conf = AirflowConfigParser(
+            default_config="""
+[core]
+executor=SequentialExecutor
+[database]
+sql_alchemy_conn=sqlite://test
+"""
+        )
+        # Guarantee we have deprecated settings, so we test the deprecation
+        # lookup even if we remove this explicit fallback
+        test_conf.deprecated_values = {
+            'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')},
+        }
+        test_conf.read_dict({'core': {'hostname_callable': 'socket:getfqdn'}})
+
+        with pytest.warns(FutureWarning):
+            test_conf.validate()
+            assert test_conf.get('core', 'hostname_callable') == 
'socket.getfqdn'
+
+    @pytest.mark.parametrize(
+        "old, new",
+        [
+            (
+                ("api", "auth_backend", "airflow.api.auth.backend.basic_auth"),
+                (
+                    "api",
+                    "auth_backends",
+                    
"airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session",
+                ),
+            ),
+            (
+                ("core", "sql_alchemy_conn", 
"postgres+psycopg2://localhost/postgres"),
+                ("database", "sql_alchemy_conn", 
"postgresql://localhost/postgres"),
+            ),
+        ],
+    )
+    def test_deprecated_env_vars_upgraded_and_removed(self, old, new):
+        test_conf = AirflowConfigParser(
+            default_config="""
+[core]
+executor=SequentialExecutor
+[database]
+sql_alchemy_conn=sqlite://test
+"""
+        )
+        old_section, old_key, old_value = old
+        new_section, new_key, new_value = new
+        old_env_var = test_conf._env_var_name(old_section, old_key)
+        new_env_var = test_conf._env_var_name(new_section, new_key)
+
+        with pytest.warns(FutureWarning):
+            with unittest.mock.patch.dict('os.environ', **{old_env_var: 
old_value}):
+                # Can't start with the new env var existing...
+                os.environ.pop(new_env_var, None)
+
+                test_conf.validate()
+                assert test_conf.get(new_section, new_key) == new_value
+                # We also need to make sure the deprecated env var is removed
+                # so that any subprocesses don't use it in place of our updated
+                # value.
+                assert old_env_var not in os.environ
+                # and make sure we track the old value as well, under the new 
section/key
+                assert test_conf.upgraded_values[(new_section, new_key)] == 
old_value
+
+    @pytest.mark.parametrize(
+        "conf_dict",
+        [
+            {},  # Even if the section is absent from config file, environ 
still needs replacing.
+            {'core': {'hostname_callable': 'socket:getfqdn'}},
+        ],
+    )
+    def test_deprecated_values_from_environ(self, conf_dict):
+        def make_config():
+            test_conf = AirflowConfigParser(
+                default_config="""
+[core]
+executor=SequentialExecutor
+[database]
+sql_alchemy_conn=sqlite://test
+"""
+            )
+            # Guarantee we have a deprecated setting, so we test the 
deprecation
+            # lookup even if we remove this explicit fallback
+            test_conf.deprecated_values = {
+                'core': {'hostname_callable': (re.compile(r':'), r'.', '2.1')},
+            }
+            test_conf.read_dict(conf_dict)
+            test_conf.validate()
+            return test_conf
+
+        with pytest.warns(FutureWarning):
+            with unittest.mock.patch.dict('os.environ', 
AIRFLOW__CORE__HOSTNAME_CALLABLE='socket:getfqdn'):
+                test_conf = make_config()
+                assert test_conf.get('core', 'hostname_callable') == 
'socket.getfqdn'
+
+        with reset_warning_registry():
+            with warnings.catch_warnings(record=True) as warning:
+                with unittest.mock.patch.dict(
+                    'os.environ',
+                    AIRFLOW__CORE__HOSTNAME_CALLABLE='CarrierPigeon',
+                ):
+                    test_conf = make_config()
+                    assert test_conf.get('core', 'hostname_callable') == 
'CarrierPigeon'
+                    assert [] == warning
+
+    def test_deprecated_funcs(self):
+        for func in [
+            'load_test_config',
+            'get',
+            'getboolean',
+            'getfloat',
+            'getint',
+            'has_option',
+            'remove_option',
+            'as_dict',
+            'set',
+        ]:
+            with mock.patch(f'airflow.configuration.conf.{func}') as 
mock_method:
+                with pytest.warns(DeprecationWarning):
+                    getattr(configuration, func)()
+                mock_method.assert_called_once()
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {}, clear=True)
+    def test_conf_as_dict_when_deprecated_value_in_config(self, 
display_source: bool):
+        with use_config(config="deprecated.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=False,
+                include_cmds=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('mysql://', "airflow.cfg") if display_source else 'mysql://'
+            )
+            # database should be None because the deprecated value is set in 
config
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 'mysql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": 
"postgresql://"}, clear=True)
+    def test_conf_as_dict_when_deprecated_value_in_both_env_and_config(self, 
display_source: bool):
+        with use_config(config="deprecated.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_cmds=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('postgresql://', "env var") if display_source else 
'postgresql://'
+            )
+            # database should be None because the deprecated value is set in 
env value
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
'postgresql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": 
"postgresql://"}, clear=True)
+    def 
test_conf_as_dict_when_deprecated_value_in_both_env_and_config_exclude_env(
+        self, display_source: bool
+    ):
+        with use_config(config="deprecated.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=False,
+                include_cmds=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('mysql://', "airflow.cfg") if display_source else 'mysql://'
+            )
+            # database should be None because the deprecated value is set in 
env value
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 'mysql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": 
"postgresql://"}, clear=True)
+    def test_conf_as_dict_when_deprecated_value_in_env(self, display_source: 
bool):
+        with use_config(config="empty.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source, raw=True, 
display_sensitive=True, include_env=True
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('postgresql://', "env var") if display_source else 
'postgresql://'
+            )
+            # database should be None because the deprecated value is set in 
env value
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
'postgresql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {}, clear=True)
+    def test_conf_as_dict_when_both_conf_and_env_are_empty(self, 
display_source: bool):
+        with use_config(config="empty.cfg"):
+            cfg_dict = conf.as_dict(display_source=display_source, raw=True, 
display_sensitive=True)
+            assert cfg_dict['core'].get('sql_alchemy_conn') is None
+            # database should be taken from default because the deprecated 
value is missing in config
+            assert cfg_dict['database'].get('sql_alchemy_conn') == (
+                (f'sqlite:///{HOME_DIR}/airflow/airflow.db', "default")
+                if display_source
+                else f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+            )
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {}, clear=True)
+    def test_conf_as_dict_when_deprecated_value_in_cmd_config(self, 
display_source: bool):
+        with use_config(config="deprecated_cmd.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_cmds=True,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('postgresql://', "cmd") if display_source else 'postgresql://'
+            )
+            # database should be None because the deprecated value is set in 
env value
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
'postgresql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict(
+        'os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD": "echo -n 
'postgresql://'"}, clear=True
+    )
+    def test_conf_as_dict_when_deprecated_value_in_cmd_env(self, 
display_source: bool):
+        with use_config(config="empty.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_cmds=True,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('postgresql://', "cmd") if display_source else 'postgresql://'
+            )
+            # database should be None because the deprecated value is set in 
env value
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
'postgresql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict(
+        'os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD": "echo -n 
'postgresql://'"}, clear=True
+    )
+    def test_conf_as_dict_when_deprecated_value_in_cmd_disabled_env(self, 
display_source: bool):
+        with use_config(config="empty.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_cmds=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') is None
+            assert cfg_dict['database'].get('sql_alchemy_conn') == (
+                (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default')
+                if display_source
+                else f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+            )
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {}, clear=True)
+    def test_conf_as_dict_when_deprecated_value_in_cmd_disabled_config(self, 
display_source: bool):
+        with use_config(config="deprecated_cmd.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_cmds=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') is None
+            assert cfg_dict['database'].get('sql_alchemy_conn') == (
+                (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default')
+                if display_source
+                else f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+            )
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": 
"secret_path'"}, clear=True)
+    @mock.patch("airflow.configuration.get_custom_secret_backend")
+    def test_conf_as_dict_when_deprecated_value_in_secrets(
+        self, get_custom_secret_backend, display_source: bool
+    ):
+        get_custom_secret_backend.return_value.get_config.return_value = 
"postgresql://"
+        with use_config(config="empty.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_secret=True,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') == (
+                ('postgresql://', "secret") if display_source else 
'postgresql://'
+            )
+            # database should be None because the deprecated value is set in 
env value
+            assert cfg_dict['database'].get('sql_alchemy_conn') is None
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
'postgresql://'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch.dict('os.environ', {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": 
"secret_path'"}, clear=True)
+    @mock.patch("airflow.configuration.get_custom_secret_backend")
+    def test_conf_as_dict_when_deprecated_value_in_secrets_disabled_env(
+        self, get_custom_secret_backend, display_source: bool
+    ):
+        get_custom_secret_backend.return_value.get_config.return_value = 
"postgresql://"
+        with use_config(config="empty.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_secret=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') is None
+            assert cfg_dict['database'].get('sql_alchemy_conn') == (
+                (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default')
+                if display_source
+                else f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+            )
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+
+    @pytest.mark.parametrize("display_source", [True, False])
+    @mock.patch("airflow.configuration.get_custom_secret_backend")
+    @mock.patch.dict('os.environ', {}, clear=True)
+    def test_conf_as_dict_when_deprecated_value_in_secrets_disabled_config(
+        self, get_custom_secret_backend, display_source: bool
+    ):
+        get_custom_secret_backend.return_value.get_config.return_value = 
"postgresql://"
+        with use_config(config="deprecated_secret.cfg"):
+            cfg_dict = conf.as_dict(
+                display_source=display_source,
+                raw=True,
+                display_sensitive=True,
+                include_env=True,
+                include_secret=False,
+            )
+            assert cfg_dict['core'].get('sql_alchemy_conn') is None
+            assert cfg_dict['database'].get('sql_alchemy_conn') == (
+                (f'sqlite:///{HOME_DIR}/airflow/airflow.db', 'default')
+                if display_source
+                else f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+            )
+            if not display_source:
+                remove_all_configurations()
+                conf.read_dict(dictionary=cfg_dict)
+                os.environ.clear()
+                assert conf.get('database', 'sql_alchemy_conn') == 
f'sqlite:///{HOME_DIR}/airflow/airflow.db'
diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py
new file mode 100644
index 0000000000..4728d33b4b
--- /dev/null
+++ b/tests/utils/test_config.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Dict, Set, Tuple
+
+from airflow.configuration import conf
+
+log = logging.getLogger(__name__)
+
+# TODO(potiuk) change the tests use better approach sing Pytest fixtures 
rather than
+#              `unit_test_mode` parameter. It's potentially disruptive so we 
should not do it **JUST** yet
+
+
+def remove_all_configurations():
+    old_sections, old_proxies = (conf._sections, conf._proxies)
+    conf._sections = {}
+    conf._proxies = {}
+    return old_sections, old_proxies
+
+
+def restore_all_configurations(sections: Dict, proxies: Dict):
+    conf._sections = sections  # type: ignore
+    conf._proxies = proxies  # type: ignore
+
+
+@contextmanager
+def use_config(config: str):
+    """
+    Temporary load the deprecated test configuration.
+    """
+    sections, proxies = remove_all_configurations()
+    conf.read(str(Path(__file__).parents[1] / "config_templates" / config))
+    try:
+        yield
+    finally:
+        restore_all_configurations(sections, proxies)
+
+
+@contextmanager
+def set_deprecated_options(deprecated_options: Dict[Tuple[str, str], 
Tuple[str, str, str]]):
+    """
+    Temporary replaces deprecated options with the ones provided.
+    """
+    old_deprecated_options = conf.deprecated_options
+    conf.deprecated_options = deprecated_options
+    try:
+        yield
+    finally:
+        conf.deprecated_options = old_deprecated_options
+
+
+@contextmanager
+def set_sensitive_config_values(sensitive_config_values: Set[Tuple[str, str]]):
+    """
+    Temporary replaces sensitive values with the ones provided.
+    """
+    old_sensitive_config_values = conf.sensitive_config_values
+    conf.sensitive_config_values = sensitive_config_values
+    try:
+        yield
+    finally:
+        conf.sensitive_config_values = old_sensitive_config_values

Reply via email to