Repository: incubator-airflow
Updated Branches:
  refs/heads/master fdefe1f82 -> 261b65670

[AIRFLOW-770] Refactor BaseHook so env vars are always read

The WebHDFS and HDFS hooks ignore connections set in environment
variables because they call `BaseHook.get_connections()` directly,
which fetches connections only from the DB. I moved that method's logic
to `_get_connections_from_db()` and made a new `get_connections()` that
checks environment variables first, falling back to the connections in
the DB. Also, because connection extras cannot be specified through
environment variables, I added an `autoconfig` arg to HDFSHook for
using Snakebite's AutoConfigClient, which can be initialized without
any connection info.

Closes #2056 from dhuang/AIRFLOW-770

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/261b6567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/261b6567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/261b6567

Branch: refs/heads/master
Commit: 261b65670a610d33a25f96b824207ba5771524f2
Parents: fdefe1f
Author: Daniel Huang <dxhu...@gmail.com>
Authored: Mon Mar 13 18:04:14 2017 -0400
Committer: Jeremiah Lowin <jlo...@apache.org>
Committed: Mon Mar 13 18:04:14 2017 -0400

----------------------------------------------------------------------
 airflow/hooks/base_hook.py | 18 +++++++++--
 airflow/hooks/hdfs_hook.py | 60 ++++++++++++++++++++++++------------
 tests/core.py              | 67 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 123 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index f5eeb6a..cef8c97 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -41,7 +41,7 @@ class BaseHook(object):
         pass
 
     @classmethod
-    def get_connections(cls, conn_id):
+    def _get_connections_from_db(cls, conn_id):
         session = settings.Session()
         db = (
             session.query(Connection)
@@ -56,13 +56,25 @@ class BaseHook(object):
         return db
 
     @classmethod
-    def get_connection(cls, conn_id):
+    def _get_connection_from_env(cls, conn_id):
         environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
         conn = None
         if environment_uri:
             conn = Connection(conn_id=conn_id, uri=environment_uri)
+        return conn
+
+    @classmethod
+    def get_connections(cls, conn_id):
+        conn = cls._get_connection_from_env(conn_id)
+        if conn:
+            conns = [conn]
         else:
-            conn = random.choice(cls.get_connections(conn_id))
+            conns = cls._get_connections_from_db(conn_id)
+        return conns
+
+    @classmethod
+    def get_connection(cls, conn_id):
+        conn = random.choice(cls.get_connections(conn_id))
         if conn.host:
             logging.info("Using connection to: " + conn.host)
         return conn
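
A minimal sketch of the new lookup order, assuming a configured Airflow
install; the conn_id 'my_postgres' and its URI are invented for
illustration:

    import os
    from airflow.hooks.base_hook import BaseHook

    # An env var named AIRFLOW_CONN_<CONN_ID> (conn_id upper-cased) now
    # takes precedence for every caller of get_connections().
    os.environ['AIRFLOW_CONN_MY_POSTGRES'] = (
        'postgres://user:pass@localhost:5432/mydb')

    conns = BaseHook.get_connections('my_postgres')
    assert len(conns) == 1  # built from the URI, not fetched from the DB
    # With the env var unset, the same call falls back to
    # _get_connections_from_db() and may return several rows.
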
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index 69dccd0..549b609 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -31,8 +31,16 @@ class HDFSHookException(AirflowException):
 class HDFSHook(BaseHook):
     """
     Interact with HDFS. This class is a wrapper around the snakebite library.
+
+    :param hdfs_conn_id: Connection id to fetch connection info
+    :type hdfs_conn_id: string
+    :param proxy_user: effective user for HDFS operations
+    :type proxy_user: string
+    :param autoconfig: use snakebite's automatically configured client
+    :type autoconfig: bool
     """
-    def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None):
+    def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None,
+                 autoconfig=False):
         if not snakebite_imported:
             raise ImportError(
                 'This HDFSHook implementation requires snakebite, but '
@@ -41,33 +49,47 @@ class HDFSHook(BaseHook):
                 'this hook -- or help by submitting a PR!')
         self.hdfs_conn_id = hdfs_conn_id
         self.proxy_user = proxy_user
+        self.autoconfig = autoconfig
 
     def get_conn(self):
         """
         Returns a snakebite HDFSClient object.
         """
-        connections = self.get_connections(self.hdfs_conn_id)
-        use_sasl = False
-        if configuration.get('core', 'security') == 'kerberos':
-            use_sasl = True
+        # When using HAClient, proxy_user must be the same, so it is ok to
+        # always take the first.
+        effective_user = self.proxy_user
+        autoconfig = self.autoconfig
+        use_sasl = configuration.get('core', 'security') == 'kerberos'
+
+        try:
+            connections = self.get_connections(self.hdfs_conn_id)
+
+            if not effective_user:
+                effective_user = connections[0].login
+            if not autoconfig:
+                autoconfig = connections[0].extra_dejson.get('autoconfig',
+                                                             False)
+            hdfs_namenode_principal = connections[0].extra_dejson.get(
+                'hdfs_namenode_principal')
+        except AirflowException:
+            if not autoconfig:
+                raise
 
-        # When using HAClient, proxy_user must be the same, so is ok to always take the first.
-        effective_user = self.proxy_user or connections[0].login
-        if len(connections) == 1:
-            autoconfig = connections[0].extra_dejson.get('autoconfig', False)
-            if autoconfig:
-                client = AutoConfigClient(effective_user=effective_user, use_sasl=use_sasl)
-            else:
-                hdfs_namenode_principal = connections[0].extra_dejson.get('hdfs_namenode_principal')
-                client = Client(connections[0].host, connections[0].port,
-                                effective_user=effective_user, use_sasl=use_sasl,
-                                hdfs_namenode_principal=hdfs_namenode_principal)
+        if autoconfig:
+            # will read config info from $HADOOP_HOME conf files
+            client = AutoConfigClient(effective_user=effective_user,
+                                      use_sasl=use_sasl)
+        elif len(connections) == 1:
+            client = Client(connections[0].host, connections[0].port,
+                            effective_user=effective_user, use_sasl=use_sasl,
+                            hdfs_namenode_principal=hdfs_namenode_principal)
         elif len(connections) > 1:
-            hdfs_namenode_principal = connections[0].extra_dejson.get('hdfs_namenode_principal')
             nn = [Namenode(conn.host, conn.port) for conn in connections]
-            client = HAClient(nn, effective_user=effective_user, use_sasl=use_sasl,
+            client = HAClient(nn, effective_user=effective_user,
+                              use_sasl=use_sasl,
                               hdfs_namenode_principal=hdfs_namenode_principal)
         else:
-            raise HDFSHookException("conn_id doesn't exist in the repository")
+            raise HDFSHookException("conn_id doesn't exist in the repository "
+                                    "and autoconfig is not specified")
 
         return client
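
A hedged usage sketch of the new autoconfig path; the conn_id
'hdfs_undefined' is invented and deliberately matches nothing. With no
connection row or AIRFLOW_CONN_* env var defined, get_conn() can now
return snakebite's AutoConfigClient, which reads namenode settings from
the local Hadoop client configuration (the $HADOOP_HOME conf files, per
the comment in the diff) instead of raising HDFSHookException:

    from airflow.hooks.hdfs_hook import HDFSHook

    # No connection info anywhere; autoconfig=True is the escape hatch.
    hook = HDFSHook(hdfs_conn_id='hdfs_undefined', autoconfig=True)
    client = hook.get_conn()  # a snakebite AutoConfigClient

The same behavior is still reachable through connection extras
(extra='{"autoconfig": true}') when a connection does exist.
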
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/261b6567/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 67400e1..e17920a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,6 +15,7 @@
 from __future__ import print_function
 
 import doctest
+import json
 import os
 import re
 import unittest
@@ -2045,6 +2046,22 @@ class ConnectionTest(unittest.TestCase):
         self.assertIsInstance(engine, sqlalchemy.engine.Engine)
         self.assertEqual('postgres://username:passw...@ec2.compute.com:5432/the_database', str(engine.url))
 
+    def test_get_connections_env_var(self):
+        conns = SqliteHook.get_connections(conn_id='test_uri')
+        assert len(conns) == 1
+        assert conns[0].host == 'ec2.compute.com'
+        assert conns[0].schema == 'the_database'
+        assert conns[0].login == 'username'
+        assert conns[0].password == 'password'
+        assert conns[0].port == 5432
+
+    def test_get_connections_db(self):
+        conns = BaseHook.get_connections(conn_id='airflow_db')
+        assert len(conns) == 1
+        assert conns[0].host == 'localhost'
+        assert conns[0].schema == 'airflow'
+        assert conns[0].login == 'root'
+
 
 class WebHDFSHookTest(unittest.TestCase):
     def setUp(self):
@@ -2062,6 +2079,56 @@ class WebHDFSHookTest(unittest.TestCase):
 
 
 try:
+    from airflow.hooks.hdfs_hook import HDFSHook
+    import snakebite
+except ImportError:
+    HDFSHook = None
+
+
+@unittest.skipIf(HDFSHook is None,
+                 "Skipping test because HDFSHook is not installed")
+class HDFSHookTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        os.environ['AIRFLOW_CONN_HDFS_DEFAULT'] = ('hdfs://localhost:8020')
+
+    def test_get_client(self):
+        client = HDFSHook(proxy_user='foo').get_conn()
+        self.assertIsInstance(client, snakebite.client.Client)
+        self.assertEqual('localhost', client.host)
+        self.assertEqual(8020, client.port)
+        self.assertEqual('foo', client.service.channel.effective_user)
+
+    @mock.patch('airflow.hooks.hdfs_hook.AutoConfigClient')
+    @mock.patch('airflow.hooks.hdfs_hook.HDFSHook.get_connections')
+    def test_get_autoconfig_client(self, mock_get_connections,
+                                   MockAutoConfigClient):
+        c = models.Connection(conn_id='hdfs', conn_type='hdfs',
+                              host='localhost', port=8020, login='foo',
+                              extra=json.dumps({'autoconfig': True}))
+        mock_get_connections.return_value = [c]
+        HDFSHook(hdfs_conn_id='hdfs').get_conn()
+        MockAutoConfigClient.assert_called_once_with(effective_user='foo',
+                                                     use_sasl=False)
+
+    @mock.patch('airflow.hooks.hdfs_hook.AutoConfigClient')
+    def test_get_autoconfig_client_no_conn(self, MockAutoConfigClient):
+        HDFSHook(hdfs_conn_id='hdfs_missing', autoconfig=True).get_conn()
+        MockAutoConfigClient.assert_called_once_with(effective_user=None,
+                                                     use_sasl=False)
+
+    @mock.patch('airflow.hooks.hdfs_hook.HDFSHook.get_connections')
+    def test_get_ha_client(self, mock_get_connections):
+        c1 = models.Connection(conn_id='hdfs_default', conn_type='hdfs',
+                               host='localhost', port=8020)
+        c2 = models.Connection(conn_id='hdfs_default', conn_type='hdfs',
+                               host='localhost2', port=8020)
+        mock_get_connections.return_value = [c1, c2]
+        client = HDFSHook().get_conn()
+        self.assertIsInstance(client, snakebite.client.HAClient)
+
+
+try:
     from airflow.hooks.S3_hook import S3Hook
 except ImportError:
     S3Hook = None
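
On the fixture format used in these tests: AIRFLOW_CONN_* values are
plain URIs parsed by Connection(uri=...), as the base_hook diff shows.
A tiny sketch whose values mirror the assertions in
test_get_connections_env_var:

    from airflow import models

    c = models.Connection(
        conn_id='test_uri',
        uri='postgres://username:password@ec2.compute.com:5432/the_database')
    assert c.host == 'ec2.compute.com'
    assert c.login == 'username'
    assert c.port == 5432
    assert c.schema == 'the_database'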