Repository: incubator-airflow
Updated Branches:
  refs/heads/master c7a472ed6 -> 32c5f445e


[AIRFLOW-610] Respect _cmd option in config before defaults

The command versions of config parameters were
overridden by the
default config. E.g. sql_alchemy_conn got the
default value even
when sql_alchemy_conn_cmd was specified.

Closes #3029 from cjgu/airflow-610


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/32c5f445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/32c5f445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/32c5f445

Branch: refs/heads/master
Commit: 32c5f445e450f63fa488973e221100512c32ca1b
Parents: c7a472e
Author: Carl Johan Gustavsson <carl.jo...@tictail.com>
Authored: Mon Apr 16 10:12:08 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Apr 16 10:12:08 2018 +0200

----------------------------------------------------------------------
 airflow/bin/airflow                             |   6 +-
 airflow/config_templates/default_celery.py      |  22 ++-
 airflow/configuration.py                        | 194 +++++++++----------
 .../auth/backends/github_enterprise_auth.py     |   2 +-
 airflow/contrib/auth/backends/google_auth.py    |   2 +-
 airflow/contrib/auth/backends/kerberos_auth.py  |   7 +-
 airflow/contrib/auth/backends/ldap_auth.py      |  63 +++---
 airflow/contrib/executors/mesos_executor.py     |  37 ++--
 airflow/contrib/hooks/qubole_hook.py            |   6 +-
 airflow/contrib/operators/ssh_operator.py       |   5 +-
 airflow/executors/__init__.py                   |   2 +-
 airflow/executors/base_executor.py              |   2 +-
 airflow/executors/celery_executor.py            |   8 +-
 airflow/executors/dask_executor.py              |   2 +-
 airflow/hooks/hdfs_hook.py                      |   2 +-
 airflow/hooks/hive_hooks.py                     |   9 +-
 airflow/hooks/webhdfs_hook.py                   |   2 +-
 airflow/models.py                               |  44 +++--
 airflow/plugins_manager.py                      |   4 +-
 airflow/security/kerberos.py                    |  39 ++--
 airflow/task/task_runner/__init__.py            |   2 +-
 airflow/utils/email.py                          |  16 +-
 airflow/utils/helpers.py                        |   4 +-
 airflow/utils/log/gcs_task_handler.py           |   2 +-
 airflow/utils/log/s3_task_handler.py            |   4 +-
 airflow/utils/operator_resources.py             |   8 +-
 airflow/www/app.py                              |   7 +-
 airflow/www/utils.py                            |   4 +-
 tests/configuration.py                          |  87 ++++++++-
 tests/contrib/operators/test_sftp_operator.py   |   8 +-
 tests/contrib/operators/test_ssh_operator.py    |   8 +-
 tests/core.py                                   |  74 +++----
 tests/executors/dask_executor.py                |   2 +-
 tests/jobs.py                                   |   8 +-
 tests/operators/hive_operator.py                |   6 +-
 tests/utils.py                                  |  12 +-
 .../api/experimental/test_password_endpoints.py |   7 +-
 37 files changed, 417 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/bin/airflow
----------------------------------------------------------------------
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index fff7aab..d0b7db3 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -23,9 +23,9 @@ from airflow.bin.cli import CLIFactory
 
 if __name__ == '__main__':
 
-    if configuration.get("core", "security") == 'kerberos':
-        os.environ['KRB5CCNAME'] = configuration.get('kerberos', 'ccache')
-        os.environ['KRB5_KTNAME'] = configuration.get('kerberos', 'keytab')
+    if configuration.conf.get("core", "security") == 'kerberos':
+        os.environ['KRB5CCNAME'] = configuration.conf.get('kerberos', 'ccache')
+        os.environ['KRB5_KTNAME'] = configuration.conf.get('kerberos', 
'keytab')
 
     parser = CLIFactory.get_parser()
     args = parser.parse_args()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py 
b/airflow/config_templates/default_celery.py
index b249943..728e4c5 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -25,7 +25,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 
 log = LoggingMixin().log
 
-broker_transport_options = 
configuration.getsection('celery_broker_transport_options')
+broker_transport_options = configuration.conf.getsection(
+    'celery_broker_transport_options'
+)
 if broker_transport_options is None:
     broker_transport_options = {'visibility_timeout': 21600}
 
@@ -34,25 +36,25 @@ DEFAULT_CELERY_CONFIG = {
     'event_serializer': 'json',
     'worker_prefetch_multiplier': 1,
     'task_acks_late': True,
-    'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
-    'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
-    'broker_url': configuration.get('celery', 'BROKER_URL'),
+    'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
+    'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
+    'broker_url': configuration.conf.get('celery', 'BROKER_URL'),
     'broker_transport_options': broker_transport_options,
-    'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
-    'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
+    'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'),
+    'worker_concurrency': configuration.conf.getint('celery', 
'WORKER_CONCURRENCY'),
 }
 
 celery_ssl_active = False
 try:
-    celery_ssl_active = configuration.getboolean('celery', 'SSL_ACTIVE')
+    celery_ssl_active = configuration.conf.getboolean('celery', 'SSL_ACTIVE')
 except AirflowConfigException as e:
     log.warning("Celery Executor will run without SSL")
 
 try:
     if celery_ssl_active:
-        broker_use_ssl = {'keyfile': configuration.get('celery', 'SSL_KEY'),
-                          'certfile': configuration.get('celery', 'SSL_CERT'),
-                          'ca_certs': configuration.get('celery', 
'SSL_CACERT'),
+        broker_use_ssl = {'keyfile': configuration.conf.get('celery', 
'SSL_KEY'),
+                          'certfile': configuration.conf.get('celery', 
'SSL_CERT'),
+                          'ca_certs': configuration.conf.get('celery', 
'SSL_CACERT'),
                           'cert_reqs': ssl.CERT_REQUIRED}
         DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl
 except AirflowConfigException as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index e0d53b9..130356c 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -25,7 +25,6 @@ from __future__ import unicode_literals
 import copy
 import errno
 import os
-import six
 import subprocess
 import warnings
 import shlex
@@ -33,7 +32,10 @@ import sys
 
 from future import standard_library
 
+import six
 from six import iteritems
+from backports.configparser import ConfigParser
+from zope.deprecation import deprecated as _deprecated
 
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -41,10 +43,10 @@ standard_library.install_aliases()
 
 from builtins import str
 from collections import OrderedDict
-from six.moves import configparser
 
 from airflow.exceptions import AirflowConfigException
 
+
 log = LoggingMixin().log
 
 # show Airflow's deprecation warnings
@@ -53,11 +55,6 @@ warnings.filterwarnings(
 warnings.filterwarnings(
     action='default', category=PendingDeprecationWarning, module='airflow')
 
-if six.PY3:
-    ConfigParser = configparser.ConfigParser
-else:
-    ConfigParser = configparser.SafeConfigParser
-
 
 def generate_fernet_key():
     try:
@@ -107,11 +104,16 @@ def run_command(command):
 
     return output
 
+
 _templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
 with open(os.path.join(_templates_dir, 'default_airflow.cfg')) as f:
     DEFAULT_CONFIG = f.read()
+    if six.PY2:
+        DEFAULT_CONFIG = DEFAULT_CONFIG.decode('utf-8')
 with open(os.path.join(_templates_dir, 'default_test.cfg')) as f:
     TEST_CONFIG = f.read()
+    if six.PY2:
+        TEST_CONFIG = TEST_CONFIG.decode('utf-8')
 
 
 class AirflowConfigParser(ConfigParser):
@@ -126,25 +128,14 @@ class AirflowConfigParser(ConfigParser):
         ('celery', 'result_backend')
     }
 
-    def __init__(self, *args, **kwargs):
-        ConfigParser.__init__(self, *args, **kwargs)
-        self.read_string(parameterized_config(DEFAULT_CONFIG))
-        self.is_validated = False
+    def __init__(self, default_config=None, *args, **kwargs):
+        super(AirflowConfigParser, self).__init__(*args, **kwargs)
 
-    def read_string(self, string, source='<string>'):
-        """
-        Read configuration from a string.
+        self.defaults = ConfigParser(*args, **kwargs)
+        if default_config is not None:
+            self.defaults.read_string(default_config)
 
-        A backwards-compatible version of the ConfigParser.read_string()
-        method that was introduced in Python 3.
-        """
-        # Python 3 added read_string() method
-        if six.PY3:
-            ConfigParser.read_string(self, string, source=source)
-        # Python 2 requires StringIO buffer
-        else:
-            import StringIO
-            self.readfp(StringIO.StringIO(string))
+        self.is_validated = False
 
     def _validate(self):
         if (
@@ -183,13 +174,11 @@ class AirflowConfigParser(ConfigParser):
     def _get_cmd_option(self, section, key):
         fallback_key = key + '_cmd'
         # if this is a valid command key...
-        if (section, key) in AirflowConfigParser.as_command_stdout:
-            # if the original key is present, return it no matter what
-            if self.has_option(section, key):
-                return ConfigParser.get(self, section, key)
-            # otherwise, execute the fallback key
-            elif self.has_option(section, fallback_key):
-                command = self.get(section, fallback_key)
+        if (section, key) in self.as_command_stdout:
+            if super(AirflowConfigParser, self) \
+                    .has_option(section, fallback_key):
+                command = super(AirflowConfigParser, self) \
+                    .get(section, fallback_key)
                 return run_command(command)
 
     def get(self, section, key, **kwargs):
@@ -202,15 +191,22 @@ class AirflowConfigParser(ConfigParser):
             return option
 
         # ...then the config file
-        if self.has_option(section, key):
+        if super(AirflowConfigParser, self).has_option(section, key):
+            # Use the parent's methods to get the actual config here to be 
able to
+            # separate the config from default config.
             return expand_env_var(
-                ConfigParser.get(self, section, key, **kwargs))
+                super(AirflowConfigParser, self).get(section, key, **kwargs))
 
         # ...then commands
         option = self._get_cmd_option(section, key)
         if option:
             return option
 
+        # ...then the default config
+        if self.defaults.has_option(section, key):
+            return expand_env_var(
+                self.defaults.get(section, key, **kwargs))
+
         else:
             log.warning(
                 "section/key [{section}/{key}] not found in 
config".format(**locals())
@@ -240,9 +236,30 @@ class AirflowConfigParser(ConfigParser):
         return float(self.get(section, key))
 
     def read(self, filenames):
-        ConfigParser.read(self, filenames)
+        super(AirflowConfigParser, self).read(filenames)
         self._validate()
 
+    def has_option(self, section, option):
+        try:
+            # Using self.get() to avoid reimplementing the priority order
+            # of config variables (env, config, cmd, defaults)
+            self.get(section, option)
+            return True
+        except AirflowConfigException:
+            return False
+
+    def remove_option(self, section, option, remove_default=True):
+        """
+        Remove an option if it exists in config from a file or
+        default config. If both of config have the same option, this removes
+        the option in both configs unless remove_default=False.
+        """
+        if super(AirflowConfigParser, self).has_option(section, option):
+            super(AirflowConfigParser, self).remove_option(section, option)
+
+        if self.defaults.has_option(section, option) and remove_default:
+            self.defaults.remove_option(section, option)
+
     def getsection(self, section):
         """
         Returns the section as a dict. Values are converted to int, float, bool
@@ -250,23 +267,27 @@ class AirflowConfigParser(ConfigParser):
         :param section: section from the config
         :return: dict
         """
+        if section not in self._sections and section not in 
self.defaults._sections:
+            return None
+
+        _section = copy.deepcopy(self.defaults._sections[section])
+
         if section in self._sections:
-            _section = self._sections[section]
-            for key, val in iteritems(self._sections[section]):
+            _section.update(copy.deepcopy(self._sections[section]))
+
+        for key, val in iteritems(_section):
+            try:
+                val = int(val)
+            except ValueError:
                 try:
-                    val = int(val)
+                    val = float(val)
                 except ValueError:
-                    try:
-                        val = float(val)
-                    except ValueError:
-                        if val.lower() in ('t', 'true'):
-                            val = True
-                        elif val.lower() in ('f', 'false'):
-                            val = False
-                _section[key] = val
-            return _section
-
-        return None
+                    if val.lower() in ('t', 'true'):
+                        val = True
+                    elif val.lower() in ('f', 'false'):
+                        val = False
+            _section[key] = val
+        return _section
 
     def as_dict(self, display_source=False, display_sensitive=False):
         """
@@ -280,7 +301,8 @@ class AirflowConfigParser(ConfigParser):
             are shown as '< hidden >'
         :type display_sensitive: bool
         """
-        cfg = copy.deepcopy(self._sections)
+        cfg = copy.deepcopy(self.defaults._sections)
+        cfg.update(copy.deepcopy(self._sections))
 
         # remove __name__ (affects Python 2 only)
         for options in cfg.values():
@@ -310,7 +332,7 @@ class AirflowConfigParser(ConfigParser):
                     {key.lower(): opt})
 
         # add bash commands
-        for (section, key) in AirflowConfigParser.as_command_stdout:
+        for (section, key) in self.as_command_stdout:
             opt = self._get_cmd_option(section, key)
             if opt:
                 if not display_sensitive:
@@ -328,7 +350,7 @@ class AirflowConfigParser(ConfigParser):
         Note: this is not reversible.
         """
         # override any custom settings with defaults
-        self.read_string(parameterized_config(DEFAULT_CONFIG))
+        self.defaults.read_string(parameterized_config(DEFAULT_CONFIG))
         # then read test config
         self.read_string(parameterized_config(TEST_CONFIG))
         # then read any "custom" test settings
@@ -424,7 +446,7 @@ if not os.path.isfile(AIRFLOW_CONFIG):
 
 log.info("Reading the config from %s", AIRFLOW_CONFIG)
 
-conf = AirflowConfigParser()
+conf = AirflowConfigParser(default_config=parameterized_config(DEFAULT_CONFIG))
 conf.read(AIRFLOW_CONFIG)
 
 if conf.getboolean('webserver', 'rbac'):
@@ -438,51 +460,27 @@ if conf.getboolean('webserver', 'rbac'):
         with open(WEBSERVER_CONFIG, 'w') as f:
             f.write(DEFAULT_WEBSERVER_CONFIG)
 
-def load_test_config():
-    """
-    Load the unit test configuration.
-
-    Note: this is not reversible.
-    """
-    conf.load_test_config()
-
 if conf.getboolean('core', 'unit_test_mode'):
-    load_test_config()
-
-
-def get(section, key, **kwargs):
-    return conf.get(section, key, **kwargs)
-
-
-def getboolean(section, key):
-    return conf.getboolean(section, key)
-
-
-def getfloat(section, key):
-    return conf.getfloat(section, key)
-
-
-def getint(section, key):
-    return conf.getint(section, key)
-
-
-def getsection(section):
-    return conf.getsection(section)
-
-
-def has_option(section, key):
-    return conf.has_option(section, key)
-
-
-def remove_option(section, option):
-    return conf.remove_option(section, option)
-
-
-def as_dict(display_source=False, display_sensitive=False):
-    return conf.as_dict(
-        display_source=display_source, display_sensitive=display_sensitive)
-as_dict.__doc__ = conf.as_dict.__doc__
-
+    conf.load_test_config()
 
-def set(section, option, value):  # noqa
-    return conf.set(section, option, value)
+# Historical convenience functions to access config entries
+
+load_test_config = conf.load_test_config
+get = conf.get
+getboolean = conf.getboolean
+getfloat = conf.getfloat
+getint = conf.getint
+getsection = conf.getsection
+has_option = conf.has_option
+remove_option = conf.remove_option
+as_dict = conf.as_dict
+set = conf.set # noqa
+
+for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
+             remove_option, as_dict, set]:
+    _deprecated(
+        func,
+        "Accessing configuration method '{f.__name__}' directly from "
+        "the configuration module is deprecated. Please access the "
+        "configuration from the 'configuration.conf' object via "
+        "'conf.{f.__name__}'".format(f=func))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py 
b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 2ae737f..e131ec4 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -39,7 +39,7 @@ log = LoggingMixin().log
 
 
 def get_config_param(param):
-    return str(configuration.get('github_enterprise', param))
+    return str(configuration.conf.get('github_enterprise', param))
 
 
 class GHEUser(models.User):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py 
b/airflow/contrib/auth/backends/google_auth.py
index feccbdf..259874f 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -38,7 +38,7 @@ log = LoggingMixin().log
 
 
 def get_config_param(param):
-    return str(configuration.get('google', param))
+    return str(configuration.conf.get('google', param))
 
 
 class GoogleUser(models.User):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py 
b/airflow/contrib/auth/backends/kerberos_auth.py
index 245af32..f6f9d69 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -52,8 +52,11 @@ class KerberosUser(models.User, LoggingMixin):
 
     @staticmethod
     def authenticate(username, password):
-        service_principal = "%s/%s" % (configuration.get('kerberos', 
'principal'), utils.get_fqdn())
-        realm = configuration.get("kerberos", "default_realm")
+        service_principal = "%s/%s" % (
+            configuration.conf.get('kerberos', 'principal'),
+            utils.get_fqdn()
+        )
+        realm = configuration.conf.get("kerberos", "default_realm")
         user_principal = utils.principal_from_username(username)
 
         try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py 
b/airflow/contrib/auth/backends/ldap_auth.py
index e3d8019..c887ffb 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -59,13 +59,13 @@ def get_ldap_connection(dn=None, password=None):
     tls_configuration = None
     use_ssl = False
     try:
-        cacert = configuration.get("ldap", "cacert")
+        cacert = configuration.conf.get("ldap", "cacert")
         tls_configuration = Tls(validate=ssl.CERT_REQUIRED, 
ca_certs_file=cacert)
         use_ssl = True
     except:
         pass
 
-    server = Server(configuration.get("ldap", "uri"), use_ssl, 
tls_configuration)
+    server = Server(configuration.conf.get("ldap", "uri"), use_ssl, 
tls_configuration)
     conn = Connection(server, native(dn), native(password))
 
     if not conn.bind():
@@ -91,7 +91,7 @@ def group_contains_user(conn, search_base, group_filter, 
user_name_attr, usernam
 def groups_user(conn, search_base, user_filter, user_name_att, username):
     search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, 
username)
     try:
-        memberof_attr = configuration.get("ldap", "group_member_attr")
+        memberof_attr = configuration.conf.get("ldap", "group_member_attr")
     except:
         memberof_attr = "memberOf"
     res = conn.search(native(search_base), native(search_filter),
@@ -127,13 +127,13 @@ class LdapUser(models.User):
         self.ldap_groups = []
 
         # Load and cache superuser and data_profiler settings.
-        conn = get_ldap_connection(configuration.get("ldap", "bind_user"),
-                                   configuration.get("ldap", "bind_password"))
+        conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
+                                   configuration.conf.get("ldap", 
"bind_password"))
 
         superuser_filter = None
         data_profiler_filter = None
         try:
-            superuser_filter = configuration.get("ldap", "superuser_filter")
+            superuser_filter = configuration.conf.get("ldap", 
"superuser_filter")
         except AirflowConfigException:
             pass
 
@@ -142,14 +142,14 @@ class LdapUser(models.User):
             log.debug("Missing configuration for superuser settings or empty. 
Skipping.")
         else:
             self.superuser = group_contains_user(conn,
-                                                 configuration.get("ldap", 
"basedn"),
+                                                 
configuration.conf.get("ldap", "basedn"),
                                                  superuser_filter,
-                                                 configuration.get("ldap",
-                                                                   
"user_name_attr"),
+                                                 configuration.conf.get("ldap",
+                                                                        
"user_name_attr"),
                                                  user.username)
 
         try:
-            data_profiler_filter = configuration.get("ldap", 
"data_profiler_filter")
+            data_profiler_filter = configuration.conf.get("ldap", 
"data_profiler_filter")
         except AirflowConfigException:
             pass
 
@@ -158,31 +158,35 @@ class LdapUser(models.User):
             log.debug("Missing configuration for data profiler settings or 
empty. "
                       "Skipping.")
         else:
-            self.data_profiler = group_contains_user(conn,
-                                                     configuration.get("ldap", 
"basedn"),
-                                                     data_profiler_filter,
-                                                     configuration.get("ldap",
-                                                                       
"user_name_attr"),
-                                                     user.username)
+            self.data_profiler = group_contains_user(
+                conn,
+                configuration.conf.get("ldap", "basedn"),
+                data_profiler_filter,
+                configuration.conf.get("ldap",
+                                       "user_name_attr"),
+                user.username
+            )
 
         # Load the ldap group(s) a user belongs to
         try:
-            self.ldap_groups = groups_user(conn,
-                                           configuration.get("ldap", "basedn"),
-                                           configuration.get("ldap", 
"user_filter"),
-                                           configuration.get("ldap", 
"user_name_attr"),
-                                           user.username)
+            self.ldap_groups = groups_user(
+                conn,
+                configuration.conf.get("ldap", "basedn"),
+                configuration.conf.get("ldap", "user_filter"),
+                configuration.conf.get("ldap", "user_name_attr"),
+                user.username
+            )
         except AirflowConfigException:
             log.debug("Missing configuration for ldap settings. Skipping")
 
     @staticmethod
     def try_login(username, password):
-        conn = get_ldap_connection(configuration.get("ldap", "bind_user"),
-                                   configuration.get("ldap", "bind_password"))
+        conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
+                                   configuration.conf.get("ldap", 
"bind_password"))
 
         search_filter = "(&({0})({1}={2}))".format(
-            configuration.get("ldap", "user_filter"),
-            configuration.get("ldap", "user_name_attr"),
+            configuration.conf.get("ldap", "user_filter"),
+            configuration.conf.get("ldap", "user_name_attr"),
             username
         )
 
@@ -193,12 +197,15 @@ class LdapUser(models.User):
         }
 
         search_scope = LEVEL
-        if configuration.has_option("ldap", "search_scope"):
-            search_scope = SUBTREE if configuration.get("ldap", 
"search_scope") == "SUBTREE" else LEVEL
+        if configuration.conf.has_option("ldap", "search_scope"):
+            if configuration.conf.get("ldap", "search_scope") == "SUBTREE":
+                search_scope = SUBTREE
+            else:
+                search_scope = LEVEL
 
         # todo: BASE or ONELEVEL?
 
-        res = conn.search(native(configuration.get("ldap", "basedn")),
+        res = conn.search(native(configuration.conf.get("ldap", "basedn")),
                           native(search_filter),
                           search_scope=native(search_scope))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py 
b/airflow/contrib/executors/mesos_executor.py
index c4874da..c022f6a 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -42,9 +42,9 @@ FRAMEWORK_CONNID_PREFIX = 'mesos_framework_'
 
 
 def get_framework_name():
-    if not configuration.get('mesos', 'FRAMEWORK_NAME'):
+    if not configuration.conf.get('mesos', 'FRAMEWORK_NAME'):
         return DEFAULT_FRAMEWORK_NAME
-    return configuration.get('mesos', 'FRAMEWORK_NAME')
+    return configuration.conf.get('mesos', 'FRAMEWORK_NAME')
 
 
 # AirflowMesosScheduler, implements Mesos Scheduler interface
@@ -72,7 +72,8 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, 
LoggingMixin):
     def registered(self, driver, frameworkId, masterInfo):
         self.log.info("AirflowScheduler registered to Mesos with framework ID 
%s", frameworkId.value)
 
-        if configuration.getboolean('mesos', 'CHECKPOINT') and 
configuration.get('mesos', 'FAILOVER_TIMEOUT'):
+        if configuration.conf.getboolean('mesos', 'CHECKPOINT') and \
+                configuration.conf.get('mesos', 'FAILOVER_TIMEOUT'):
             # Import here to work around a circular import error
             from airflow.models import Connection
 
@@ -207,28 +208,28 @@ class MesosExecutor(BaseExecutor, LoginMixin):
         framework = mesos_pb2.FrameworkInfo()
         framework.user = ''
 
-        if not configuration.get('mesos', 'MASTER'):
+        if not configuration.conf.get('mesos', 'MASTER'):
             self.log.error("Expecting mesos master URL for mesos executor")
             raise AirflowException("mesos.master not provided for mesos 
executor")
 
-        master = configuration.get('mesos', 'MASTER')
+        master = configuration.conf.get('mesos', 'MASTER')
 
         framework.name = get_framework_name()
 
-        if not configuration.get('mesos', 'TASK_CPU'):
+        if not configuration.conf.get('mesos', 'TASK_CPU'):
             task_cpu = 1
         else:
-            task_cpu = configuration.getint('mesos', 'TASK_CPU')
+            task_cpu = configuration.conf.getint('mesos', 'TASK_CPU')
 
-        if not configuration.get('mesos', 'TASK_MEMORY'):
+        if not configuration.conf.get('mesos', 'TASK_MEMORY'):
             task_memory = 256
         else:
-            task_memory = configuration.getint('mesos', 'TASK_MEMORY')
+            task_memory = configuration.conf.getint('mesos', 'TASK_MEMORY')
 
-        if configuration.getboolean('mesos', 'CHECKPOINT'):
+        if configuration.conf.getboolean('mesos', 'CHECKPOINT'):
             framework.checkpoint = True
 
-            if configuration.get('mesos', 'FAILOVER_TIMEOUT'):
+            if configuration.conf.get('mesos', 'FAILOVER_TIMEOUT'):
                 # Import here to work around a circular import error
                 from airflow.models import Connection
 
@@ -240,7 +241,9 @@ class MesosExecutor(BaseExecutor, LoginMixin):
                     # Set the Framework ID to let the scheduler reconnect with 
running tasks.
                     framework.id.value = connection.extra
 
-                framework.failover_timeout = configuration.getint('mesos', 
'FAILOVER_TIMEOUT')
+                framework.failover_timeout = configuration.conf.getint(
+                    'mesos', 'FAILOVER_TIMEOUT'
+                )
         else:
             framework.checkpoint = False
 
@@ -251,17 +254,17 @@ class MesosExecutor(BaseExecutor, LoginMixin):
 
         implicit_acknowledgements = 1
 
-        if configuration.getboolean('mesos', 'AUTHENTICATE'):
-            if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
+        if configuration.conf.getboolean('mesos', 'AUTHENTICATE'):
+            if not configuration.conf.get('mesos', 'DEFAULT_PRINCIPAL'):
                 self.log.error("Expecting authentication principal in the 
environment")
                 raise AirflowException("mesos.default_principal not provided 
in authenticated mode")
-            if not configuration.get('mesos', 'DEFAULT_SECRET'):
+            if not configuration.conf.get('mesos', 'DEFAULT_SECRET'):
                 self.log.error("Expecting authentication secret in the 
environment")
                 raise AirflowException("mesos.default_secret not provided in 
authenticated mode")
 
             credential = mesos_pb2.Credential()
-            credential.principal = configuration.get('mesos', 
'DEFAULT_PRINCIPAL')
-            credential.secret = configuration.get('mesos', 'DEFAULT_SECRET')
+            credential.principal = configuration.conf.get('mesos', 
'DEFAULT_PRINCIPAL')
+            credential.secret = configuration.conf.get('mesos', 
'DEFAULT_SECRET')
 
             framework.principal = credential.principal
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py 
b/airflow/contrib/hooks/qubole_hook.py
index 20e6ae1..d59fcdf 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -40,7 +40,7 @@ COMMAND_CLASSES = {
     "prestocmd": PrestoCommand,
     "hadoopcmd": HadoopCommand,
     "shellcmd": ShellCommand,
-    "pigcmd":  PigCommand,
+    "pigcmd": PigCommand,
     "sparkcmd": SparkCommand,
     "dbtapquerycmd": DbTapQueryCommand,
     "dbexportcmd": DbExportCommand,
@@ -146,7 +146,9 @@ class QuboleHook(BaseHook, LoggingMixin):
         """
         if fp is None:
             iso = datetime.datetime.utcnow().isoformat()
-            logpath = os.path.expanduser(configuration.get('core', 
'BASE_LOG_FOLDER'))
+            logpath = os.path.expanduser(
+                configuration.conf.get('core', 'BASE_LOG_FOLDER')
+            )
             resultpath = logpath + '/' + self.dag_id + '/' + self.task_id + 
'/results'
             configuration.mkdir_p(resultpath)
             fp = open(resultpath + '/' + iso, 'wb')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py 
b/airflow/contrib/operators/ssh_operator.py
index b8e320a..0ee0b71 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -136,8 +136,9 @@ class SSHOperator(BaseOperator):
             if exit_status is 0:
                 # returning output if do_xcom_push is set
                 if self.do_xcom_push:
-                    enable_pickling = configuration.getboolean('core',
-                                                               
'enable_xcom_pickling')
+                    enable_pickling = configuration.conf.getboolean(
+                        'core', 'enable_xcom_pickling'
+                    )
                     if enable_pickling:
                         return agg_stdout
                     else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index befff72..8aa15e4 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -42,7 +42,7 @@ def GetDefaultExecutor():
     if DEFAULT_EXECUTOR is not None:
         return DEFAULT_EXECUTOR
 
-    executor_name = configuration.get('core', 'EXECUTOR')
+    executor_name = configuration.conf.get('core', 'EXECUTOR')
 
     DEFAULT_EXECUTOR = _get_executor(executor_name)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index fd53294..30ecee0 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -22,7 +22,7 @@ from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
-PARALLELISM = configuration.getint('core', 'PARALLELISM')
+PARALLELISM = configuration.conf.getint('core', 'PARALLELISM')
 
 
 class BaseExecutor(LoggingMixin):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py 
b/airflow/executors/celery_executor.py
index f4ef3ed..efabca5 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -31,22 +31,22 @@ from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string
 
-PARALLELISM = configuration.get('core', 'PARALLELISM')
+PARALLELISM = configuration.conf.get('core', 'PARALLELISM')
 
 '''
 To start the celery worker, run the command:
 airflow worker
 '''
 
-if configuration.has_option('celery', 'celery_config_options'):
+if configuration.conf.has_option('celery', 'celery_config_options'):
     celery_configuration = import_string(
-        configuration.get('celery', 'celery_config_options')
+        configuration.conf.get('celery', 'celery_config_options')
     )
 else:
     celery_configuration = DEFAULT_CELERY_CONFIG
 
 app = Celery(
-    configuration.get('celery', 'CELERY_APP_NAME'),
+    configuration.conf.get('celery', 'CELERY_APP_NAME'),
     config_source=celery_configuration)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py 
b/airflow/executors/dask_executor.py
index 5b0ca29..0d914ba 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -31,7 +31,7 @@ class DaskExecutor(BaseExecutor):
     """
     def __init__(self, cluster_address=None):
         if cluster_address is None:
-            cluster_address = configuration.get('dask', 'cluster_address')
+            cluster_address = configuration.conf.get('dask', 'cluster_address')
         if not cluster_address:
             raise ValueError(
                 'Please provide a Dask cluster address in airflow.cfg')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index b1dff9c..c6e68c8 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -64,7 +64,7 @@ class HDFSHook(BaseHook):
         # take the first.
         effective_user = self.proxy_user
         autoconfig = self.autoconfig
-        use_sasl = configuration.get('core', 'security') == 'kerberos'
+        use_sasl = configuration.conf.get('core', 'security') == 'kerberos'
 
         try:
             connections = self.get_connections(self.hdfs_conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index deff994..d278483 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -104,7 +104,7 @@ class HiveCliHook(BaseHook):
         if self.use_beeline:
             hive_bin = 'beeline'
             jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
-            if configuration.get('core', 'security') == 'kerberos':
+            if configuration.conf.get('core', 'security') == 'kerberos':
                 template = conn.extra_dejson.get(
                     'principal', "hive/_h...@example.com")
                 if "_HOST" in template:
@@ -463,12 +463,13 @@ class HiveMetastoreHook(BaseHook):
         from hive_service import ThriftHive
         ms = self.metastore_conn
         auth_mechanism = ms.extra_dejson.get('authMechanism', 'NOSASL')
-        if configuration.get('core', 'security') == 'kerberos':
+        if configuration.conf.get('core', 'security') == 'kerberos':
             auth_mechanism = ms.extra_dejson.get('authMechanism', 'GSSAPI')
             kerberos_service_name = 
ms.extra_dejson.get('kerberos_service_name', 'hive')
 
         socket = TSocket.TSocket(ms.host, ms.port)
-        if configuration.get('core', 'security') == 'kerberos' and 
auth_mechanism == 'GSSAPI':
+        if configuration.conf.get('core', 'security') == 'kerberos' \
+                and auth_mechanism == 'GSSAPI':
             try:
                 import saslwrapper as sasl
             except ImportError:
@@ -740,7 +741,7 @@ class HiveServer2Hook(BaseHook):
         db = self.get_connection(self.hiveserver2_conn_id)
         auth_mechanism = db.extra_dejson.get('authMechanism', 'PLAIN')
         kerberos_service_name = None
-        if configuration.get('core', 'security') == 'kerberos':
+        if configuration.conf.get('core', 'security') == 'kerberos':
             auth_mechanism = db.extra_dejson.get('authMechanism', 'GSSAPI')
             kerberos_service_name = 
db.extra_dejson.get('kerberos_service_name', 'hive')
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index a2d62e2..6445e2b 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -24,7 +24,7 @@ from hdfs import InsecureClient, HdfsError
 
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-_kerberos_security_mode = configuration.get("core", "security") == "kerberos"
+_kerberos_security_mode = configuration.conf.get("core", "security") == 
"kerberos"
 if _kerberos_security_mode:
     try:
         from hdfs.ext.kerberos import KerberosClient

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 12f7a38..9635460 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -116,7 +116,7 @@ def get_fernet():
     except:
         raise AirflowException('Failed to import Fernet, it may not be 
installed')
     try:
-        return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
+        return Fernet(configuration.conf.get('core', 
'FERNET_KEY').encode('utf-8'))
     except (ValueError, TypeError) as ve:
         raise AirflowException("Could not create Fernet object: {}".format(ve))
 
@@ -193,7 +193,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             self,
             dag_folder=None,
             executor=None,
-            include_examples=configuration.getboolean('core', 
'LOAD_EXAMPLES')):
+            include_examples=configuration.conf.getboolean('core', 
'LOAD_EXAMPLES')):
 
         # do not use default arg in signature, to fix import cycle on plugin 
load
         if executor is None:
@@ -295,7 +295,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             if mod_name in sys.modules:
                 del sys.modules[mod_name]
 
-            with timeout(configuration.getint('core', 
"DAGBAG_IMPORT_TIMEOUT")):
+            with timeout(configuration.conf.getint('core', 
"DAGBAG_IMPORT_TIMEOUT")):
                 try:
                     m = imp.load_source(mod_name, filepath)
                     mods.append(m)
@@ -364,7 +364,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         from airflow.jobs import LocalTaskJob as LJ
         self.log.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
-        secs = configuration.getint('scheduler', 
'scheduler_zombie_task_threshold')
+        secs = configuration.conf.getint('scheduler', 
'scheduler_zombie_task_threshold')
         limit_dttm = timezone.utcnow() - timedelta(seconds=secs)
         self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
 
@@ -1042,14 +1042,14 @@ class TaskInstance(Base, LoggingMixin):
     @property
     def log_filepath(self):
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
+        log = os.path.expanduser(configuration.conf.get('core', 
'BASE_LOG_FOLDER'))
         return (
             "{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
 
     @property
     def log_url(self):
         iso = quote(self.execution_date.isoformat())
-        BASE_URL = configuration.get('webserver', 'BASE_URL')
+        BASE_URL = configuration.conf.get('webserver', 'BASE_URL')
         if settings.RBAC:
             return BASE_URL + (
                 "/log/list/"
@@ -1068,7 +1068,7 @@ class TaskInstance(Base, LoggingMixin):
     @property
     def mark_success_url(self):
         iso = quote(self.execution_date.isoformat())
-        BASE_URL = configuration.get('webserver', 'BASE_URL')
+        BASE_URL = configuration.conf.get('webserver', 'BASE_URL')
         if settings.RBAC:
             return BASE_URL + (
                 "/success"
@@ -2231,7 +2231,7 @@ class BaseOperator(LoggingMixin):
     def __init__(
             self,
             task_id,
-            owner=configuration.get('operators', 'DEFAULT_OWNER'),
+            owner=configuration.conf.get('operators', 'DEFAULT_OWNER'),
             email=None,
             email_on_retry=True,
             email_on_failure=True,
@@ -2250,7 +2250,7 @@ class BaseOperator(LoggingMixin):
             adhoc=False,
             priority_weight=1,
             weight_rule=WeightRule.DOWNSTREAM,
-            queue=configuration.get('celery', 'default_queue'),
+            queue=configuration.conf.get('celery', 'default_queue'),
             pool=None,
             sla=None,
             execution_timeout=None,
@@ -2910,7 +2910,7 @@ class DagModel(Base):
     dag_id = Column(String(ID_LEN), primary_key=True)
     # A DAG can be paused from the UI / DB
     # Set this default value of is_paused based on a configuration value!
-    is_paused_at_creation = configuration.getboolean('core',
+    is_paused_at_creation = configuration.conf.getboolean('core',
                                                      
'dags_are_paused_at_creation')
     is_paused = Column(Boolean, default=is_paused_at_creation)
     # Whether the DAG is a subdag
@@ -3036,14 +3036,14 @@ class DAG(BaseDag, LoggingMixin):
             user_defined_macros=None,
             user_defined_filters=None,
             default_args=None,
-            concurrency=configuration.getint('core', 'dag_concurrency'),
-            max_active_runs=configuration.getint(
+            concurrency=configuration.conf.getint('core', 'dag_concurrency'),
+            max_active_runs=configuration.conf.getint(
                 'core', 'max_active_runs_per_dag'),
             dagrun_timeout=None,
             sla_miss_callback=None,
-            default_view=configuration.get('webserver', 
'dag_default_view').lower(),
-            orientation=configuration.get('webserver', 'dag_orientation'),
-            catchup=configuration.getboolean('scheduler', 
'catchup_by_default'),
+            default_view=configuration.conf.get('webserver', 
'dag_default_view').lower(),
+            orientation=configuration.conf.get('webserver', 'dag_orientation'),
+            catchup=configuration.conf.getboolean('scheduler', 
'catchup_by_default'),
             on_success_callback=None, on_failure_callback=None,
             params=None):
 
@@ -3892,7 +3892,7 @@ class DAG(BaseDag, LoggingMixin):
             include_adhoc=False,
             local=False,
             executor=None,
-            donot_pickle=configuration.getboolean('core', 'donot_pickle'),
+            donot_pickle=configuration.conf.getboolean('core', 'donot_pickle'),
             ignore_task_deps=False,
             ignore_first_depends_on_past=False,
             pool=None,
@@ -4355,7 +4355,9 @@ class XCom(Base, LoggingMixin):
         session.expunge_all()
 
         if enable_pickling is None:
-            enable_pickling = configuration.getboolean('core', 
'enable_xcom_pickling')
+            enable_pickling = configuration.conf.getboolean(
+                'core', 'enable_xcom_pickling'
+            )
 
         if enable_pickling:
             value = pickle.dumps(value)
@@ -4427,7 +4429,9 @@ class XCom(Base, LoggingMixin):
         result = query.first()
         if result:
             if enable_pickling is None:
-                enable_pickling = configuration.getboolean('core', 
'enable_xcom_pickling')
+                enable_pickling = configuration.conf.getboolean(
+                    'core', 'enable_xcom_pickling'
+                )
 
             if enable_pickling:
                 return pickle.loads(result.value)
@@ -4477,7 +4481,9 @@ class XCom(Base, LoggingMixin):
                 .limit(limit))
         results = query.all()
         if enable_pickling is None:
-            enable_pickling = configuration.getboolean('core', 
'enable_xcom_pickling')
+            enable_pickling = configuration.conf.getboolean(
+                'core', 'enable_xcom_pickling'
+            )
         for result in results:
             if enable_pickling:
                 result.value = pickle.loads(result.value)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 5b13e50..735f2de 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -55,9 +55,9 @@ class AirflowPlugin(object):
             raise AirflowPluginException("Your plugin needs a name.")
 
 
-plugins_folder = configuration.get('core', 'plugins_folder')
+plugins_folder = configuration.conf.get('core', 'plugins_folder')
 if not plugins_folder:
-    plugins_folder = configuration.get('core', 'airflow_home') + '/plugins'
+    plugins_folder = configuration.conf.get('core', 'airflow_home') + 
'/plugins'
 plugins_folder = os.path.expanduser(plugins_folder)
 
 if plugins_folder not in sys.path:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 3af21e8..43c9fcc 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -30,15 +30,19 @@ log = LoggingMixin().log
 def renew_from_kt():
     # The config is specified in seconds. But we ask for that same amount in
     # minutes to give ourselves a large renewal buffer.
-    renewal_lifetime = "%sm" % configuration.getint('kerberos', 
'reinit_frequency')
-    principal = configuration.get('kerberos', 'principal').replace("_HOST", 
socket.getfqdn())
-
-    cmdv = [configuration.get('kerberos', 'kinit_path'),
-            "-r", renewal_lifetime,
-            "-k",  # host ticket
-            "-t", configuration.get('kerberos', 'keytab'),  # specify keytab
-            "-c", configuration.get('kerberos', 'ccache'),  # specify 
credentials cache
-            principal]
+    renewal_lifetime = "%sm" % configuration.conf.getint('kerberos', 
'reinit_frequency')
+    principal = configuration.conf.get('kerberos', 'principal').replace(
+        "_HOST", socket.getfqdn()
+    )
+
+    cmdv = [
+        configuration.conf.get('kerberos', 'kinit_path'),
+        "-r", renewal_lifetime,
+        "-k",  # host ticket
+        "-t", configuration.conf.get('kerberos', 'keytab'),  # specify keytab
+        "-c", configuration.conf.get('kerberos', 'ccache'),  # specify 
credentials cache
+        principal
+    ]
     log.info("Reinitting kerberos from keytab: " + " ".join(cmdv))
 
     subp = subprocess.Popen(cmdv,
@@ -66,8 +70,8 @@ def renew_from_kt():
 
 
 def perform_krb181_workaround():
-    cmdv = [configuration.get('kerberos', 'kinit_path'),
-            "-c", configuration.get('kerberos', 'ccache'),
+    cmdv = [configuration.conf.get('kerberos', 'kinit_path'),
+            "-c", configuration.conf.get('kerberos', 'ccache'),
             "-R"]  # Renew ticket_cache
 
     log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
@@ -76,9 +80,12 @@ def perform_krb181_workaround():
     ret = subprocess.call(cmdv, close_fds=True)
 
     if ret != 0:
-        principal = "%s/%s" % (configuration.get('kerberos', 'principal'), 
socket.getfqdn())
+        principal = "%s/%s" % (
+            configuration.conf.get('kerberos', 'principal'),
+            socket.getfqdn()
+        )
         fmt_dict = dict(princ=principal,
-                        ccache=configuration.get('kerberos', 'principal'))
+                        ccache=configuration.conf.get('kerberos', 'principal'))
         log.error("Couldn't renew kerberos ticket in order to work around "
                   "Kerberos 1.8.1 issue. Please check that the ticket for "
                   "'%(princ)s' is still renewable:\n"
@@ -96,7 +103,7 @@ def detect_conf_var():
     Sun Java Krb5LoginModule in Java6, so we need to take an action to work
     around it.
     """
-    ticket_cache = configuration.get('kerberos', 'ccache')
+    ticket_cache = configuration.conf.get('kerberos', 'ccache')
 
     with open(ticket_cache, 'rb') as f:
         # Note: this file is binary, so we check against a bytearray.
@@ -104,10 +111,10 @@ def detect_conf_var():
 
 
 def run():
-    if configuration.get('kerberos', 'keytab') is None:
+    if configuration.conf.get('kerberos', 'keytab') is None:
         log.debug("Keytab renewer not starting, no keytab configured")
         sys.exit(0)
 
     while True:
         renew_from_kt()
-        time.sleep(configuration.getint('kerberos', 'reinit_frequency'))
+        time.sleep(configuration.conf.getint('kerberos', 'reinit_frequency'))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/task/task_runner/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/task/task_runner/__init__.py 
b/airflow/task/task_runner/__init__.py
index f66ff58..f2820c7 100644
--- a/airflow/task/task_runner/__init__.py
+++ b/airflow/task/task_runner/__init__.py
@@ -21,7 +21,7 @@ from airflow import configuration
 from airflow.task.task_runner.bash_task_runner import BashTaskRunner
 from airflow.exceptions import AirflowException
 
-_TASK_RUNNER = configuration.get('core', 'TASK_RUNNER')
+_TASK_RUNNER = configuration.conf.get('core', 'TASK_RUNNER')
 
 
 def get_task_runner(local_task_job):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/utils/email.py
----------------------------------------------------------------------
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index c5c2ec9..afe3f28 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -45,7 +45,7 @@ def send_email(to, subject, html_content, files=None,
     """
     Send email using backend specified in EMAIL_BACKEND.
     """
-    path, attr = configuration.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
+    path, attr = configuration.conf.get('email', 'EMAIL_BACKEND').rsplit('.', 
1)
     module = importlib.import_module(path)
     backend = getattr(module, attr)
     return backend(to, subject, html_content, files=files,
@@ -61,7 +61,7 @@ def send_email_smtp(to, subject, html_content, files=None,
 
     >>> send_email('t...@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], 
dryrun=True)
     """
-    SMTP_MAIL_FROM = configuration.get('smtp', 'SMTP_MAIL_FROM')
+    SMTP_MAIL_FROM = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
 
     to = get_email_address_list(to)
 
@@ -101,16 +101,16 @@ def send_email_smtp(to, subject, html_content, files=None,
 def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
     log = LoggingMixin().log
 
-    SMTP_HOST = configuration.get('smtp', 'SMTP_HOST')
-    SMTP_PORT = configuration.getint('smtp', 'SMTP_PORT')
-    SMTP_STARTTLS = configuration.getboolean('smtp', 'SMTP_STARTTLS')
-    SMTP_SSL = configuration.getboolean('smtp', 'SMTP_SSL')
+    SMTP_HOST = configuration.conf.get('smtp', 'SMTP_HOST')
+    SMTP_PORT = configuration.conf.getint('smtp', 'SMTP_PORT')
+    SMTP_STARTTLS = configuration.conf.getboolean('smtp', 'SMTP_STARTTLS')
+    SMTP_SSL = configuration.conf.getboolean('smtp', 'SMTP_SSL')
     SMTP_USER = None
     SMTP_PASSWORD = None
 
     try:
-        SMTP_USER = configuration.get('smtp', 'SMTP_USER')
-        SMTP_PASSWORD = configuration.get('smtp', 'SMTP_PASSWORD')
+        SMTP_USER = configuration.conf.get('smtp', 'SMTP_USER')
+        SMTP_PASSWORD = configuration.conf.get('smtp', 'SMTP_PASSWORD')
     except AirflowConfigException:
         log.debug("No user/password found for SMTP, so logging in with no 
authentication.")
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 5efa760..d2affe5 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -43,7 +43,9 @@ from airflow.exceptions import AirflowException
 
 # When killing processes, time to wait after issuing a SIGTERM before issuing a
 # SIGKILL.
-DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = configuration.getint('core', 
'KILLED_TASK_CLEANUP_TIME')
+DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = configuration.conf.getint(
+    'core', 'KILLED_TASK_CLEANUP_TIME'
+)
 
 
 def validate_key(k, max_length=250):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py 
b/airflow/utils/log/gcs_task_handler.py
index 0cc9fa2..d4a9871 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -40,7 +40,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         self.upload_on_close = True
 
     def _build_hook(self):
-        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
+        remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID')
         try:
             from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
             return GoogleCloudStorageHook(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py 
b/airflow/utils/log/s3_task_handler.py
index f998176..f29a92f 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -38,7 +38,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         self.upload_on_close = True
 
     def _build_hook(self):
-        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
+        remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID')
         try:
             from airflow.hooks.S3_hook import S3Hook
             return S3Hook(remote_conn_id)
@@ -168,7 +168,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
                 log,
                 key=remote_log_location,
                 replace=True,
-                encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
+                encrypt=configuration.conf.getboolean('core', 
'ENCRYPT_S3_LOGS'),
             )
         except:
             self.log.exception('Could not write logs to %s', 
remote_log_location)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/utils/operator_resources.py
----------------------------------------------------------------------
diff --git a/airflow/utils/operator_resources.py 
b/airflow/utils/operator_resources.py
index 6f0b785..47ff336 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -105,10 +105,10 @@ class Resources(object):
     :type gpus: long
     """
     def __init__(self,
-                 cpus=configuration.getint('operators', 'default_cpus'),
-                 ram=configuration.getint('operators', 'default_ram'),
-                 disk=configuration.getint('operators', 'default_disk'),
-                 gpus=configuration.getint('operators', 'default_gpus')
+                 cpus=configuration.conf.getint('operators', 'default_cpus'),
+                 ram=configuration.conf.getint('operators', 'default_ram'),
+                 disk=configuration.conf.getint('operators', 'default_disk'),
+                 gpus=configuration.conf.getint('operators', 'default_gpus')
                  ):
         self.cpus = CpuResource(cpus)
         self.ram = RamResource(ram)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index fa7949b..ca0589d 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -43,8 +43,9 @@ csrf = CSRFProtect()
 
 def create_app(config=None, testing=False):
     app = Flask(__name__)
-    app.secret_key = configuration.get('webserver', 'SECRET_KEY')
-    app.config['LOGIN_DISABLED'] = not configuration.getboolean('webserver', 
'AUTHENTICATE')
+    app.secret_key = configuration.conf.get('webserver', 'SECRET_KEY')
+    app.config['LOGIN_DISABLED'] = not configuration.conf.getboolean(
+        'webserver', 'AUTHENTICATE')
 
     csrf.init_app(app)
 
@@ -175,7 +176,7 @@ def root_app(env, resp):
 def cached_app(config=None, testing=False):
     global app
     if not app:
-        base_url = urlparse(configuration.get('webserver', 'base_url'))[2]
+        base_url = urlparse(configuration.conf.get('webserver', 'base_url'))[2]
         if not base_url or base_url == '/':
             base_url = ""
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 14144fe..ada8ce2 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -41,7 +41,7 @@ from airflow.utils.db import create_session
 from airflow.utils import timezone
 from airflow.utils.json import AirflowJsonEncoder
 
-AUTHENTICATE = configuration.getboolean('webserver', 'AUTHENTICATE')
+AUTHENTICATE = configuration.conf.getboolean('webserver', 'AUTHENTICATE')
 
 DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
     'password',
@@ -56,7 +56,7 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
 
 def should_hide_value_for_key(key_name):
     return any(s in key_name.lower() for s in 
DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
-           and configuration.getboolean('admin', 
'hide_sensitive_variable_fields')
+        and configuration.conf.getboolean('admin', 
'hide_sensitive_variable_fields')
 
 
 class LoginMixin(object):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/configuration.py
----------------------------------------------------------------------
diff --git a/tests/configuration.py b/tests/configuration.py
index 6080059..ca1d1d7 100644
--- a/tests/configuration.py
+++ b/tests/configuration.py
@@ -18,12 +18,15 @@
 # under the License.
 
 from __future__ import print_function
+from __future__ import unicode_literals
+
 import unittest
+from collections import OrderedDict
 
 import six
 
 from airflow import configuration
-from airflow.configuration import conf
+from airflow.configuration import conf, AirflowConfigParser, parameterized_config
 
 
 class ConfTest(unittest.TestCase):
@@ -60,6 +63,88 @@ class ConfTest(unittest.TestCase):
         self.assertEqual(
             cfg_dict['testsection']['testkey'], ('testvalue', 'env var'))
 
+    def test_command_config(self):
+        TEST_CONFIG = '''[test]
+key1 = hello
+key2_cmd = printf cmd_result
+key3 = airflow
+key4_cmd = printf key4_result
+'''
+        TEST_CONFIG_DEFAULT = '''[test]
+key1 = awesome
+key2 = airflow
+
+[another]
+key6 = value6
+'''
+
+        test_conf = AirflowConfigParser(
+            default_config=parameterized_config(TEST_CONFIG_DEFAULT))
+        test_conf.read_string(TEST_CONFIG)
+        test_conf.as_command_stdout = test_conf.as_command_stdout | {
+            ('test', 'key2'),
+            ('test', 'key4'),
+        }
+        self.assertEqual('hello', test_conf.get('test', 'key1'))
+        self.assertEqual('cmd_result', test_conf.get('test', 'key2'))
+        self.assertEqual('airflow', test_conf.get('test', 'key3'))
+        self.assertEqual('key4_result', test_conf.get('test', 'key4'))
+        self.assertEqual('value6', test_conf.get('another', 'key6'))
+
+        self.assertTrue(test_conf.has_option('test', 'key1'))
+        self.assertTrue(test_conf.has_option('test', 'key2'))
+        self.assertTrue(test_conf.has_option('test', 'key3'))
+        self.assertTrue(test_conf.has_option('test', 'key4'))
+        self.assertFalse(test_conf.has_option('test', 'key5'))
+        self.assertTrue(test_conf.has_option('another', 'key6'))
+
+    def test_remove_option(self):
+        TEST_CONFIG = '''[test]
+key1 = hello
+key2 = airflow
+'''
+        TEST_CONFIG_DEFAULT = '''[test]
+key1 = awesome
+key2 = airflow
+'''
+
+        test_conf = AirflowConfigParser(
+            default_config=parameterized_config(TEST_CONFIG_DEFAULT))
+        test_conf.read_string(TEST_CONFIG)
+
+        self.assertEqual('hello', test_conf.get('test', 'key1'))
+        test_conf.remove_option('test', 'key1', remove_default=False)
+        self.assertEqual('awesome', test_conf.get('test', 'key1'))
+
+        test_conf.remove_option('test', 'key2')
+        self.assertFalse(test_conf.has_option('test', 'key2'))
+
+    def test_getsection(self):
+        TEST_CONFIG = '''
+[test]
+key1 = hello
+'''
+        TEST_CONFIG_DEFAULT = '''
+[test]
+key1 = awesome
+key2 = airflow
+
+[another]
+key3 = value3
+'''
+        test_conf = AirflowConfigParser(
+            default_config=parameterized_config(TEST_CONFIG_DEFAULT))
+        test_conf.read_string(TEST_CONFIG)
+
+        self.assertEqual(
+            OrderedDict([('key1', 'hello'), ('key2', 'airflow')]),
+            test_conf.getsection('test')
+        )
+        self.assertEqual(
+            OrderedDict([('key3', 'value3')]),
+            test_conf.getsection('another')
+        )
+
     def test_broker_transport_options(self):
         section_dict = conf.getsection("celery_broker_transport_options")
         self.assertTrue(isinstance(section_dict['visibility_timeout'], int))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/contrib/operators/test_sftp_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sftp_operator.py 
b/tests/contrib/operators/test_sftp_operator.py
index 8a2fd21..01446a6 100644
--- a/tests/contrib/operators/test_sftp_operator.py
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -68,7 +68,7 @@ class SFTPOperatorTest(unittest.TestCase):
                                                      self.test_remote_filename)
 
     def test_pickle_file_transfer_put(self):
-        configuration.set("core", "enable_xcom_pickling", "True")
+        configuration.conf.set("core", "enable_xcom_pickling", "True")
         test_local_file_content = \
             b"This is local file content \n which is multiline " \
             b"continuing....with other character\nanother line here \n this is 
last line"
@@ -105,7 +105,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 test_local_file_content)
 
     def test_json_file_transfer_put(self):
-        configuration.set("core", "enable_xcom_pickling", "False")
+        configuration.conf.set("core", "enable_xcom_pickling", "False")
         test_local_file_content = \
             b"This is local file content \n which is multiline " \
             b"continuing....with other character\nanother line here \n this is 
last line"
@@ -143,7 +143,7 @@ class SFTPOperatorTest(unittest.TestCase):
 
 
     def test_pickle_file_transfer_get(self):
-        configuration.set("core", "enable_xcom_pickling", "True")
+        configuration.conf.set("core", "enable_xcom_pickling", "True")
         test_remote_file_content = \
             "This is remote file content \n which is also multiline " \
             "another line here \n this is last line. EOF"
@@ -181,7 +181,7 @@ class SFTPOperatorTest(unittest.TestCase):
         self.assertEqual(content_received.strip(), test_remote_file_content)
 
     def test_json_file_transfer_get(self):
-        configuration.set("core", "enable_xcom_pickling", "False")
+        configuration.conf.set("core", "enable_xcom_pickling", "False")
         test_remote_file_content = \
             "This is remote file content \n which is also multiline " \
             "another line here \n this is last line. EOF"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py 
b/tests/contrib/operators/test_ssh_operator.py
index b6f6315..b97ba84 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -59,7 +59,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.dag = dag
 
     def test_json_command_execution(self):
-        configuration.set("core", "enable_xcom_pickling", "False")
+        configuration.conf.set("core", "enable_xcom_pickling", "False")
         task = SSHOperator(
                 task_id="test",
                 ssh_hook=self.hook,
@@ -78,7 +78,7 @@ class SSHOperatorTest(unittest.TestCase):
                          b64encode(b'airflow').decode('utf-8'))
 
     def test_pickle_command_execution(self):
-        configuration.set("core", "enable_xcom_pickling", "True")
+        configuration.conf.set("core", "enable_xcom_pickling", "True")
         task = SSHOperator(
                 task_id="test",
                 ssh_hook=self.hook,
@@ -96,7 +96,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'airflow')
 
     def test_command_execution_with_env(self):
-        configuration.set("core", "enable_xcom_pickling", "True")
+        configuration.conf.set("core", "enable_xcom_pickling", "True")
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,
@@ -114,7 +114,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'airflow')
 
     def test_no_output_command(self):
-        configuration.set("core", "enable_xcom_pickling", "True")
+        configuration.conf.set("core", "enable_xcom_pickling", "True")
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 26ba23b..e4dffcf 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -45,7 +45,7 @@ from airflow import configuration
 from airflow.executors import SequentialExecutor
 from airflow.models import Variable
 
-configuration.load_test_config()
+configuration.conf.load_test_config()
 from airflow import jobs, models, DAG, utils, macros, settings, exceptions
 from airflow.models import BaseOperator
 from airflow.operators.bash_operator import BashOperator
@@ -116,7 +116,7 @@ class CoreTest(unittest.TestCase):
     default_scheduler_args = {"num_runs": 1}
 
     def setUp(self):
-        configuration.load_test_config()
+        configuration.conf.load_test_config()
         self.dagbag = models.DagBag(
             dag_folder=DEV_NULL, include_examples=True)
         self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
@@ -299,7 +299,7 @@ class CoreTest(unittest.TestCase):
         self.assertIsNone(additional_dag_run)
 
     def test_confirm_unittest_mod(self):
-        self.assertTrue(configuration.get('core', 'unit_test_mode'))
+        self.assertTrue(configuration.conf.get('core', 'unit_test_mode'))
 
     def test_pickling(self):
         dp = self.dag.pickle()
@@ -662,14 +662,14 @@ class CoreTest(unittest.TestCase):
         self.assertNotIn("{FERNET_KEY}", cfg)
 
     def test_config_use_original_when_original_and_fallback_are_present(self):
-        self.assertTrue(configuration.has_option("core", "FERNET_KEY"))
-        self.assertFalse(configuration.has_option("core", "FERNET_KEY_CMD"))
+        self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
+        self.assertFalse(configuration.conf.has_option("core", 
"FERNET_KEY_CMD"))
 
-        FERNET_KEY = configuration.get('core', 'FERNET_KEY')
+        FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
 
-        configuration.set("core", "FERNET_KEY_CMD", "printf HELLO")
+        configuration.conf.set("core", "FERNET_KEY_CMD", "printf HELLO")
 
-        FALLBACK_FERNET_KEY = configuration.get(
+        FALLBACK_FERNET_KEY = configuration.conf.get(
             "core",
             "FERNET_KEY"
         )
@@ -677,25 +677,25 @@ class CoreTest(unittest.TestCase):
         self.assertEqual(FERNET_KEY, FALLBACK_FERNET_KEY)
 
         # restore the conf back to the original state
-        configuration.remove_option("core", "FERNET_KEY_CMD")
+        configuration.conf.remove_option("core", "FERNET_KEY_CMD")
 
     def test_config_throw_error_when_original_and_fallback_is_absent(self):
-        self.assertTrue(configuration.has_option("core", "FERNET_KEY"))
-        self.assertFalse(configuration.has_option("core", "FERNET_KEY_CMD"))
+        self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
+        self.assertFalse(configuration.conf.has_option("core", 
"FERNET_KEY_CMD"))
 
-        FERNET_KEY = configuration.get("core", "FERNET_KEY")
-        configuration.remove_option("core", "FERNET_KEY")
+        FERNET_KEY = configuration.conf.get("core", "FERNET_KEY")
+        configuration.conf.remove_option("core", "FERNET_KEY")
 
         with self.assertRaises(AirflowConfigException) as cm:
-            configuration.get("core", "FERNET_KEY")
+            configuration.conf.get("core", "FERNET_KEY")
 
         exception = str(cm.exception)
         message = "section/key [core/fernet_key] not found in config"
         self.assertEqual(message, exception)
 
         # restore the conf back to the original state
-        configuration.set("core", "FERNET_KEY", FERNET_KEY)
-        self.assertTrue(configuration.has_option("core", "FERNET_KEY"))
+        configuration.conf.set("core", "FERNET_KEY", FERNET_KEY)
+        self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
 
     def test_config_override_original_when_non_empty_envvar_is_provided(self):
         key = "AIRFLOW__CORE__FERNET_KEY"
@@ -703,7 +703,7 @@ class CoreTest(unittest.TestCase):
         self.assertNotIn(key, os.environ)
 
         os.environ[key] = value
-        FERNET_KEY = configuration.get('core', 'FERNET_KEY')
+        FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
         self.assertEqual(value, FERNET_KEY)
 
         # restore the envvar back to the original state
@@ -715,7 +715,7 @@ class CoreTest(unittest.TestCase):
         self.assertNotIn(key, os.environ)
 
         os.environ[key] = value
-        FERNET_KEY = configuration.get('core', 'FERNET_KEY')
+        FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
         self.assertEqual(value, FERNET_KEY)
 
         # restore the envvar back to the original state
@@ -1815,7 +1815,7 @@ class SecureModeWebUiTests(unittest.TestCase):
         self.assertEqual(response.status_code, 404)
 
     def tearDown(self):
-        configuration.remove_option("core", "SECURE_MODE")
+        configuration.conf.remove_option("core", "SECURE_MODE")
 
 
 class WebPasswordAuthTest(unittest.TestCase):
@@ -1858,7 +1858,7 @@ class WebPasswordAuthTest(unittest.TestCase):
         return self.app.get('/admin/airflow/logout', follow_redirects=True)
 
     def test_login_logout_password_auth(self):
-        self.assertTrue(configuration.getboolean('webserver', 'authenticate'))
+        self.assertTrue(configuration.conf.getboolean('webserver', 
'authenticate'))
 
         response = self.login('user1', 'whatever')
         self.assertIn('Incorrect login details', response.data.decode('utf-8'))
@@ -1925,7 +1925,7 @@ class WebLdapAuthTest(unittest.TestCase):
         return self.app.get('/admin/airflow/logout', follow_redirects=True)
 
     def test_login_logout_ldap(self):
-        self.assertTrue(configuration.getboolean('webserver', 'authenticate'))
+        self.assertTrue(configuration.conf.getboolean('webserver', 
'authenticate'))
 
         response = self.login('user1', 'userx')
         self.assertIn('Incorrect login details', response.data.decode('utf-8'))
@@ -2370,7 +2370,7 @@ send_email_test = mock.Mock()
 
 class EmailTest(unittest.TestCase):
     def setUp(self):
-        configuration.remove_option('email', 'EMAIL_BACKEND')
+        configuration.conf.remove_option('email', 'EMAIL_BACKEND')
 
     @mock.patch('airflow.utils.email.send_email')
     def test_default_backend(self, mock_send_email):
@@ -2380,7 +2380,7 @@ class EmailTest(unittest.TestCase):
 
     @mock.patch('airflow.utils.email.send_email_smtp')
     def test_custom_backend(self, mock_send_email):
-        configuration.set('email', 'EMAIL_BACKEND', 
'tests.core.send_email_test')
+        configuration.conf.set('email', 'EMAIL_BACKEND', 
'tests.core.send_email_test')
         utils.email.send_email('to', 'subject', 'content')
         send_email_test.assert_called_with(
             'to', 'subject', 'content', files=None, dryrun=False,
@@ -2391,7 +2391,7 @@ class EmailTest(unittest.TestCase):
 
 class EmailSmtpTest(unittest.TestCase):
     def setUp(self):
-        configuration.set('smtp', 'SMTP_SSL', 'False')
+        configuration.conf.set('smtp', 'SMTP_SSL', 'False')
 
     @mock.patch('airflow.utils.email.send_MIME_email')
     def test_send_smtp(self, mock_send_mime):
@@ -2401,11 +2401,11 @@ class EmailSmtpTest(unittest.TestCase):
         utils.email.send_email_smtp('to', 'subject', 'content', 
files=[attachment.name])
         self.assertTrue(mock_send_mime.called)
         call_args = mock_send_mime.call_args[0]
-        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), 
call_args[0])
+        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), 
call_args[0])
         self.assertEqual(['to'], call_args[1])
         msg = call_args[2]
         self.assertEqual('subject', msg['Subject'])
-        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), 
msg['From'])
+        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), 
msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
         self.assertEqual(u'attachment; filename="' + 
os.path.basename(attachment.name) + '"',
                          msg.get_payload()[-1].get(u'Content-Disposition'))
@@ -2420,11 +2420,11 @@ class EmailSmtpTest(unittest.TestCase):
         utils.email.send_email_smtp('to', 'subject', 'content', 
files=[attachment.name], cc='cc', bcc='bcc')
         self.assertTrue(mock_send_mime.called)
         call_args = mock_send_mime.call_args[0]
-        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), 
call_args[0])
+        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), 
call_args[0])
         self.assertEqual(['to', 'cc', 'bcc'], call_args[1])
         msg = call_args[2]
         self.assertEqual('subject', msg['Subject'])
-        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), 
msg['From'])
+        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), 
msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
         self.assertEqual(u'attachment; filename="' + 
os.path.basename(attachment.name) + '"',
                          msg.get_payload()[-1].get(u'Content-Disposition'))
@@ -2439,13 +2439,13 @@ class EmailSmtpTest(unittest.TestCase):
         msg = MIMEMultipart()
         utils.email.send_MIME_email('from', 'to', msg, dryrun=False)
         mock_smtp.assert_called_with(
-            configuration.get('smtp', 'SMTP_HOST'),
-            configuration.getint('smtp', 'SMTP_PORT'),
+            configuration.conf.get('smtp', 'SMTP_HOST'),
+            configuration.conf.getint('smtp', 'SMTP_PORT'),
         )
         self.assertTrue(mock_smtp.return_value.starttls.called)
         mock_smtp.return_value.login.assert_called_with(
-            configuration.get('smtp', 'SMTP_USER'),
-            configuration.get('smtp', 'SMTP_PASSWORD'),
+            configuration.conf.get('smtp', 'SMTP_USER'),
+            configuration.conf.get('smtp', 'SMTP_PASSWORD'),
         )
         mock_smtp.return_value.sendmail.assert_called_with('from', 'to', 
msg.as_string())
         self.assertTrue(mock_smtp.return_value.quit.called)
@@ -2453,14 +2453,14 @@ class EmailSmtpTest(unittest.TestCase):
     @mock.patch('smtplib.SMTP_SSL')
     @mock.patch('smtplib.SMTP')
     def test_send_mime_ssl(self, mock_smtp, mock_smtp_ssl):
-        configuration.set('smtp', 'SMTP_SSL', 'True')
+        configuration.conf.set('smtp', 'SMTP_SSL', 'True')
         mock_smtp.return_value = mock.Mock()
         mock_smtp_ssl.return_value = mock.Mock()
         utils.email.send_MIME_email('from', 'to', MIMEMultipart(), 
dryrun=False)
         self.assertFalse(mock_smtp.called)
         mock_smtp_ssl.assert_called_with(
-            configuration.get('smtp', 'SMTP_HOST'),
-            configuration.getint('smtp', 'SMTP_PORT'),
+            configuration.conf.get('smtp', 'SMTP_HOST'),
+            configuration.conf.getint('smtp', 'SMTP_PORT'),
         )
 
     @mock.patch('smtplib.SMTP_SSL')
@@ -2473,8 +2473,8 @@ class EmailSmtpTest(unittest.TestCase):
         utils.email.send_MIME_email('from', 'to', MIMEMultipart(), 
dryrun=False)
         self.assertFalse(mock_smtp_ssl.called)
         mock_smtp.assert_called_with(
-            configuration.get('smtp', 'SMTP_HOST'),
-            configuration.getint('smtp', 'SMTP_PORT'),
+            configuration.conf.get('smtp', 'SMTP_HOST'),
+            configuration.conf.getint('smtp', 'SMTP_PORT'),
         )
         self.assertFalse(mock_smtp.login.called)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index eda7a86..40796bc 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -33,7 +33,7 @@ try:
 except ImportError:
     SKIP_DASK = True
 
-if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'):
+if 'sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'):
     SKIP_DASK = True
 
 # Always skip due to issues on python 3 issues

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index b3bcdb5..d6336f0 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -90,7 +90,7 @@ class BackfillJobTest(unittest.TestCase):
         self.parser = cli.CLIFactory.get_parser()
         self.dagbag = DagBag(include_examples=True)
 
-    @unittest.skipIf('sqlite' in configuration.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in configuration.conf.get('core', 
'sql_alchemy_conn'),
                      "concurrent access not supported in sqlite")
     def test_trigger_controller_dag(self):
         dag = self.dagbag.get_dag('example_trigger_controller_dag')
@@ -119,7 +119,7 @@ class BackfillJobTest(unittest.TestCase):
         target_dag.clear()
         dag.clear()
 
-    @unittest.skipIf('sqlite' in configuration.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in configuration.conf.get('core', 
'sql_alchemy_conn'),
                      "concurrent access not supported in sqlite")
     def test_backfill_multi_dates(self):
         dag = self.dagbag.get_dag('example_bash_operator')
@@ -147,7 +147,7 @@ class BackfillJobTest(unittest.TestCase):
         dag.clear()
         session.close()
 
-    @unittest.skipIf('sqlite' in configuration.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in configuration.conf.get('core', 
'sql_alchemy_conn'),
                      "concurrent access not supported in sqlite")
     def test_backfill_examples(self):
         """
@@ -2728,7 +2728,7 @@ class SchedulerJobTest(unittest.TestCase):
                   default_args=default_args
                   )
 
-        default_catchup = configuration.getboolean('scheduler', 
'catchup_by_default')
+        default_catchup = configuration.conf.getboolean('scheduler', 
'catchup_by_default')
         # Test configs have catchup by default ON
 
         self.assertEqual(default_catchup, True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 6250473..9cc0f1a 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -69,8 +69,10 @@ class HiveOperatorConfigTest(HiveEnvironmentTest):
             dag=self.dag)
 
         # just check that the correct default value in test_default.cfg is used
-        test_config_hive_mapred_queue = configuration.get('hive',
-                                                          
'default_hive_mapred_queue')
+        test_config_hive_mapred_queue = configuration.conf.get(
+            'hive',
+            'default_hive_mapred_queue'
+        )
         self.assertEqual(t.get_hook().mapred_queue, 
test_config_hive_mapred_queue)
 
     def test_hive_airflow_default_config_queue_override(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/utils.py b/tests/utils.py
index fd4f295..12d3ff0 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -78,21 +78,21 @@ class OperatorResourcesTest(unittest.TestCase):
         resources = Resources(cpus=0, disk=1)
         self.assertEqual(resources.cpus.qty, 0)
         self.assertEqual(resources.ram.qty,
-                         configuration.getint('operators', 'default_ram'))
+                         configuration.conf.getint('operators', 'default_ram'))
         self.assertEqual(resources.disk.qty, 1)
         self.assertEqual(resources.gpus.qty,
-                         configuration.getint('operators', 'default_gpus'))
+                         configuration.conf.getint('operators', 
'default_gpus'))
 
     def test_no_resources_specified(self):
         resources = Resources()
         self.assertEqual(resources.cpus.qty,
-                         configuration.getint('operators', 'default_cpus'))
+                         configuration.conf.getint('operators', 
'default_cpus'))
         self.assertEqual(resources.ram.qty,
-                         configuration.getint('operators', 'default_ram'))
+                         configuration.conf.getint('operators', 'default_ram'))
         self.assertEqual(resources.disk.qty,
-                         configuration.getint('operators', 'default_disk'))
+                         configuration.conf.getint('operators', 
'default_disk'))
         self.assertEqual(resources.gpus.qty,
-                         configuration.getint('operators', 'default_gpus'))
+                         configuration.conf.getint('operators', 
'default_gpus'))
 
     def test_negative_resource_qty(self):
         with self.assertRaises(AirflowException):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32c5f445/tests/www/api/experimental/test_password_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_password_endpoints.py 
b/tests/www/api/experimental/test_password_endpoints.py
index fc4e6a9..ecddff1 100644
--- a/tests/www/api/experimental/test_password_endpoints.py
+++ b/tests/www/api/experimental/test_password_endpoints.py
@@ -22,17 +22,14 @@ import unittest
 
 from datetime import datetime
 
+from backports.configparser import DuplicateSectionError
+
 from airflow import models
 from airflow import configuration
 from airflow.www import app as application
 from airflow.settings import Session
 from airflow.contrib.auth.backends.password_auth import PasswordUser
 
-try:
-    from ConfigParser import DuplicateSectionError
-except ImportError:
-    from configparser import DuplicateSectionError
-
 
 class ApiPasswordTests(unittest.TestCase):
     def setUp(self):


Reply via email to