Repository: incubator-airflow Updated Branches: refs/heads/master 485280a9f -> a097627d8
[AIRFLOW-826] Add Zendesk hook Closes #2066 from shreyasjoshis/add-zendesk-hook Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a097627d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a097627d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a097627d Branch: refs/heads/master Commit: a097627d8236d9244d39eee0b8cbfa22f726fe8f Parents: 485280a Author: Shreyas Joshi <shreyasjos...@github.com> Authored: Tue Feb 14 14:55:36 2017 -0800 Committer: Chris Riccomini <chr...@wepay.com> Committed: Tue Feb 14 14:55:36 2017 -0800 ---------------------------------------------------------------------- airflow/hooks/zendesk_hook.py | 102 +++++++++++++++++++++++++++++++ tests/contrib/hooks/zendesk_hook.py | 90 +++++++++++++++++++++++++++ 2 files changed, 192 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a097627d/airflow/hooks/zendesk_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py new file mode 100644 index 0000000..438597f --- /dev/null +++ b/airflow/hooks/zendesk_hook.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#


"""
A hook to talk to Zendesk through the ``zdesk`` client library.
"""

import logging
import time
from zdesk import Zendesk, RateLimitError, ZendeskError
from airflow.hooks.base_hook import BaseHook


class ZendeskHook(BaseHook):
    """
    Interact with the Zendesk API.

    :param zendesk_conn_id: ID of the Airflow connection to use. Its ``host``
        (without scheme) is prefixed with ``https://``; ``login`` and
        ``password`` are passed to the ``zdesk`` client.
    """

    def __init__(self, zendesk_conn_id):
        self.__zendesk_conn_id = zendesk_conn_id
        # Set by get_conn(); call() strips it from absolute next_page URLs.
        self.__url = None

    def get_conn(self):
        """
        Build a ``zdesk.Zendesk`` client from the configured connection and
        remember its ``https://<host>`` base URL.
        """
        conn = self.get_connection(self.__zendesk_conn_id)
        self.__url = "https://" + conn.host
        # NOTE(review): the positional True is presumably zdesk's
        # ``zdesk_token`` flag (password is an API token) -- confirm.
        return Zendesk(self.__url, conn.login, conn.password, True)

    def __handle_rate_limit_exception(self, rate_limit_exception):
        """
        Sleep for the time specified in the exception. If not specified, wait
        for 60 seconds.

        :param rate_limit_exception: the ``RateLimitError`` raised by zdesk;
            its ``response.headers['Retry-After']`` is used when present.
        """
        retry_after = int(
            rate_limit_exception.response.headers.get('Retry-After', 60))
        logging.info(
            "Hit Zendesk API rate limit. Pausing for %s seconds", retry_after)
        time.sleep(retry_after)

    def call(self, path, query=None, get_all_pages=True):
        """
        Call Zendesk API and return results

        :param path: The Zendesk API to call, e.g. ``/api/v2/users.json``.
            The last path segment (minus ``.json``) is the key under which
            the response holds its results.
        :param query: Query parameters
        :param get_all_pages: Accumulate results over all pages before
            returning. Due to strict rate limiting, this can often timeout.
            Waits for recommended period between tries after a timeout.
        :return: the value stored under the derived key, extended with the
            same key from every following page when ``get_all_pages`` is set.
        :raises ZendeskError: for any API error other than the "start_time
            older than 5 minutes" condition met while paging.
        """
        zendesk = self.get_conn()
        first_request_successful = False

        while not first_request_successful:
            try:
                results = zendesk.call(path, query)
                first_request_successful = True
            except RateLimitError as rle:
                self.__handle_rate_limit_exception(rle)

        # Find the key with the results
        key = path.split("/")[-1].split(".json")[0]
        # Responses without pagination have no "next_page" entry; treat them
        # as a single page instead of failing with KeyError.
        next_page = results.get('next_page')
        results = results[key]

        if get_all_pages:
            while next_page is not None:
                try:
                    # next_page is an absolute URL on the host recorded by
                    # get_conn(), but zdesk's call() wants only the path part.
                    next_url = next_page.split(self.__url)[1]
                    logging.info("Calling %s", next_url)
                    more_res = zendesk.call(next_url)
                    results.extend(more_res[key])
                    following_page = more_res.get('next_page')
                    if following_page == next_page:
                        # Unfortunately zdesk doesn't always throw ZendeskError
                        # when we are done getting all the data. Sometimes the
                        # next just refers to the current set of results.
                        # Hence, need to deal with this special case
                        break
                    next_page = following_page
                except RateLimitError as rle:
                    # next_page is unchanged, so the same page is retried.
                    self.__handle_rate_limit_exception(rle)
                except ZendeskError as ze:
                    # zdesk may hand the message over as bytes or as text;
                    # normalise so the check below cannot raise TypeError.
                    message = ze.msg
                    if isinstance(message, bytes):
                        message = message.decode('utf-8', 'replace')
                    if "Use a start_time older than 5 minutes" in message:
                        # We have pretty up to date data
                        break
                    raise

        return results
# http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a097627d/tests/contrib/hooks/zendesk_hook.py
# ----------------------------------------------------------------------
# diff --git a/tests/contrib/hooks/zendesk_hook.py b/tests/contrib/hooks/zendesk_hook.py
# new file mode 100644
# index 0000000..66b8e6b
# --- /dev/null
# +++ b/tests/contrib/hooks/zendesk_hook.py
# @@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from unittest.mock import Mock, patch
from airflow.hooks.zendesk_hook import ZendeskHook
from zdesk import RateLimitError
from pytest import raises

# Module that defines the hook under test. patch() must target names in this
# module so the hook picks up the mocks.
HOOK_MODULE = "airflow.hooks.zendesk_hook"


@patch(HOOK_MODULE + ".time")
@patch(HOOK_MODULE + ".Zendesk")
def test_sleeps_for_correct_interval(_, mocked_time):
    """A RateLimitError makes the hook sleep for the Retry-After value."""
    sleep_time = 10

    # To break out of the otherwise infinite tries
    mocked_time.sleep = Mock(side_effect=ValueError)
    conn_mock = Mock()
    mock_response = Mock()
    mock_response.headers.get.return_value = sleep_time
    conn_mock.call = Mock(
        side_effect=RateLimitError(msg="some message", code="some code",
                                   response=mock_response))

    zendesk_hook = ZendeskHook("conn_id")
    zendesk_hook.get_conn = Mock(return_value=conn_mock)

    with raises(ValueError):
        zendesk_hook.call("some_path", get_all_pages=False)
    mocked_time.sleep.assert_called_with(sleep_time)


def _hook_with_paged_call():
    """
    Return ``(hook, mock_call)`` where every API call yields an empty ``path``
    page whose ``next_page`` is always ``https://some_host/something``.

    The real ``get_conn`` runs once first so the hook records its
    ``https://some_host`` base URL. ``Zendesk`` must be patched by the caller.
    """
    zendesk_hook = ZendeskHook("conn_id")
    mock_connection = Mock()
    mock_connection.host = "some_host"
    zendesk_hook.get_connection = Mock(return_value=mock_connection)
    zendesk_hook.get_conn()

    mock_conn = Mock()
    mock_call = Mock(
        return_value={'next_page': 'https://some_host/something', 'path': []})
    mock_conn.call = mock_call
    zendesk_hook.get_conn = Mock(return_value=mock_conn)
    return zendesk_hook, mock_call


@patch(HOOK_MODULE + ".Zendesk")
def test_returns_single_page_if_get_all_pages_false(_):
    """With get_all_pages=False only the first page is requested."""
    zendesk_hook, mock_call = _hook_with_paged_call()
    result = zendesk_hook.call("path", get_all_pages=False)
    mock_call.assert_called_once_with("path", None)
    assert result == []


@patch(HOOK_MODULE + ".Zendesk")
def test_returns_multiple_pages_if_get_all_pages_true(_):
    """
    With get_all_pages=True the next page is fetched. Paging stops when a page
    repeats the same next_page link, so exactly two calls are made.
    """
    zendesk_hook, mock_call = _hook_with_paged_call()
    zendesk_hook.call("path", get_all_pages=True)
    assert mock_call.call_count == 2


@patch(HOOK_MODULE + ".Zendesk")
def test_zdesk_is_inited_correctly(mock_zendesk):
    """get_conn builds the client from the connection's host and credentials."""
    conn_mock = Mock()
    conn_mock.host = "conn_host"
    conn_mock.login = "conn_login"
    conn_mock.password = "conn_pass"

    zendesk_hook = ZendeskHook("conn_id")
    zendesk_hook.get_connection = Mock(return_value=conn_mock)
    zendesk_hook.get_conn()
    mock_zendesk.assert_called_with('https://conn_host', 'conn_login',
                                    'conn_pass', True)