Repository: incubator-airflow Updated Branches: refs/heads/master fff87b5cf -> e48b8e36a
[AIRFLOW-2487] Enhance druid ingestion hook Closes #3380 from feng-tao/aiflow-2487 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e48b8e36 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e48b8e36 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e48b8e36 Branch: refs/heads/master Commit: e48b8e36af3d47d52cdcb8de4d3a63f65e34c2de Parents: fff87b5 Author: Tao feng <[email protected]> Authored: Sat May 19 14:42:13 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Sat May 19 14:42:13 2018 +0100 ---------------------------------------------------------------------- airflow/hooks/druid_hook.py | 13 +++++---- tests/hooks/test_druid_hook.py | 56 ++++++++++++++++++++++++------------- 2 files changed, 45 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e48b8e36/airflow/hooks/druid_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index e8b61c0..ef4f233 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -57,16 +57,17 @@ class DruidHook(BaseHook): conn = self.get_connection(self.druid_ingest_conn_id) host = conn.host port = conn.port - schema = conn.extra_dejson.get('schema', 'http') + conn_type = 'http' if not conn.conn_type else conn.conn_type endpoint = conn.extra_dejson.get('endpoint', '') - return "http://{host}:{port}/{endpoint}".format(**locals()) + return "{conn_type}://{host}:{port}/{endpoint}".format(**locals()) def submit_indexing_job(self, json_index_spec): url = self.get_conn_url() req_index = requests.post(url, data=json_index_spec, headers=self.header) if (req_index.status_code != 200): - raise AirflowException("Did not get 200 when submitting the Druid job to {}".format(url)) + raise AirflowException('Did not get 200 when ' + 'submitting the Druid job to {}'.format(url)) req_json = req_index.json() # Wait until the job is completed @@ -85,7 +86,8 @@ class DruidHook(BaseHook): if self.max_ingestion_time and sec > self.max_ingestion_time: # ensure that the job gets killed if the max ingestion time is exceeded requests.post("{0}/{1}/shutdown".format(url, druid_task_id)) - raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time) + raise AirflowException('Druid ingestion took more than ' + '%s seconds', self.max_ingestion_time) time.sleep(self.timeout) @@ -95,7 +97,8 @@ class DruidHook(BaseHook): elif status == 'SUCCESS': running = False # Great success! elif status == 'FAILED': - raise AirflowException('Druid indexing job failed, check console for more info') + raise AirflowException('Druid indexing job failed, ' + 'check console for more info') else: raise AirflowException('Could not get status of the job, got %s', status) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e48b8e36/tests/hooks/test_druid_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py index 72c88d8..6fd7b3c 100644 --- a/tests/hooks/test_druid_hook.py +++ b/tests/hooks/test_druid_hook.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,7 +18,7 @@ # under the License. # -import mock +from mock import MagicMock, patch import requests import requests_mock import unittest @@ -31,98 +31,116 @@ class TestDruidHook(unittest.TestCase): def setUp(self): super(TestDruidHook, self).setUp() - session = requests.Session() adapter = requests_mock.Adapter() session.mount('mock', adapter) + class TestDRuidhook(DruidHook): + def get_conn_url(self): + return 'http://druid-overlord:8081/druid/indexer/v1/task' + self.db_hook = TestDRuidhook() + @requests_mock.mock() def test_submit_gone_wrong(self, m): - hook = DruidHook() task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) status_check = m.get( - 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', + 'http://druid-overlord:8081/druid/indexer/v1/task/' + '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "FAILED"}}' ) # The job failed for some reason with self.assertRaises(AirflowException): - hook.submit_indexing_job('Long json file') + self.db_hook.submit_indexing_job('Long json file') self.assertTrue(task_post.called_once) self.assertTrue(status_check.called_once) @requests_mock.mock() def test_submit_ok(self, m): - hook = DruidHook() task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) status_check = m.get( - 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', + 'http://druid-overlord:8081/druid/indexer/v1/task/' + '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "SUCCESS"}}' ) # Exists just as it should - hook.submit_indexing_job('Long json file') + self.db_hook.submit_indexing_job('Long json file') self.assertTrue(task_post.called_once) self.assertTrue(status_check.called_once) @requests_mock.mock() def test_submit_unknown_response(self, m): - hook = DruidHook() task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) status_check = m.get( - 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', + 'http://druid-overlord:8081/druid/indexer/v1/task/' + '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "UNKNOWN"}}' ) # An unknown error code with self.assertRaises(AirflowException): - hook.submit_indexing_job('Long json file') + self.db_hook.submit_indexing_job('Long json file') self.assertTrue(task_post.called_once) self.assertTrue(status_check.called_once) @requests_mock.mock() def test_submit_timeout(self, m): - hook = DruidHook(timeout=0, max_ingestion_time=5) + self.db_hook.timeout = 0 + self.db_hook.max_ingestion_time = 5 task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) status_check = m.get( - 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', + 'http://druid-overlord:8081/druid/indexer/v1/task/' + '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "RUNNING"}}' ) shutdown_post = m.post( - 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown', + 'http://druid-overlord:8081/druid/indexer/v1/task/' + '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) # Because the jobs keeps running with self.assertRaises(AirflowException): - hook.submit_indexing_job('Long json file') + self.db_hook.submit_indexing_job('Long json file') self.assertTrue(task_post.called_once) self.assertTrue(status_check.called) self.assertTrue(shutdown_post.called_once) + @patch('airflow.hooks.druid_hook.DruidHook.get_connection') + def test_get_conn_url(self, mock_get_connection): + get_conn_value = MagicMock() + get_conn_value.host = 'test_host' + get_conn_value.conn_type = 'https' + get_conn_value.port = '1' + get_conn_value.extra_dejson = {'endpoint': 'ingest'} + mock_get_connection.return_value = get_conn_value + hook = DruidHook(timeout=0, max_ingestion_time=5) + self.assertEquals(hook.get_conn_url(), 'https://test_host:1/ingest') + class TestDruidDbApiHook(unittest.TestCase): def setUp(self): super(TestDruidDbApiHook, self).setUp() - self.cur = mock.MagicMock() - self.conn = conn = mock.MagicMock() + self.cur = MagicMock() + self.conn = conn = MagicMock() self.conn.host = 'host' self.conn.port = '1000' self.conn.conn_type = 'druid'
