Repository: incubator-airflow Updated Branches: refs/heads/master 721bc0927 -> 64206615a
[AIRFLOW-2217] Add Slack webhook operator Add the Slack webhook hook/operator pair. This allows posting messages to Slack in an easy, light-weight manner. Closes #3129 from danielvdende/AIRFLOW-2217-add- slack-webhook-operator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/64206615 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/64206615 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/64206615 Branch: refs/heads/master Commit: 64206615a790c90893d5836da8d2f7159bda23ac Parents: 721bc09 Author: Daniel van der Ende <[email protected]> Authored: Wed Mar 28 23:23:44 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Wed Mar 28 23:23:44 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/slack_webhook_hook.py | 124 +++++++++++++++++++ .../contrib/operators/slack_webhook_operator.py | 87 +++++++++++++ tests/contrib/hooks/test_slack_webhook_hook.py | 87 +++++++++++++ .../operators/test_slack_webhook_operator.py | 65 ++++++++++ 4 files changed, 363 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/airflow/contrib/hooks/slack_webhook_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/slack_webhook_hook.py b/airflow/contrib/hooks/slack_webhook_hook.py new file mode 100644 index 0000000..d946e51 --- /dev/null +++ b/airflow/contrib/hooks/slack_webhook_hook.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import json + +from airflow.hooks.http_hook import HttpHook +from airflow.exceptions import AirflowException + + +class SlackWebhookHook(HttpHook): + """ + This hook allows you to post messages to Slack using incoming webhooks. + Takes both Slack webhook token directly and connection that has Slack webhook token. + If both supplied, Slack webhook token will be used. + + Each Slack webhook token can be pre-configured to use a specific channel, username and + icon. You can override these defaults in this hook. + + :param http_conn_id: connection that has Slack webhook token in the extra field + :type http_conn_id: str + :param webhook_token: Slack webhook token + :type webhook_token: str + :param message: The message you want to send on Slack + :type message: str + :param channel: The channel the message should be posted to + :type channel: str + :param username: The username to post to slack with + :type username: str + :param icon_emoji: The emoji to use as icon for the user posting to Slack + :type icon_emoji: str + :param link_names: Whether or not to find and link channel and usernames in your + message + :type link_names: bool + :param proxy: Proxy to use to make the Slack webhook call + :type proxy: str + """ + def __init__(self, + http_conn_id=None, + webhook_token=None, + message="", + channel=None, + username=None, + icon_emoji=None, + link_names=False, + proxy=None, + *args, + **kwargs + ): + super(SlackWebhookHook, self).__init__(*args, **kwargs) + self.http_conn_id = http_conn_id + self.webhook_token = self._get_token(webhook_token, http_conn_id) + self.message = message + self.channel = channel + self.username = username + self.icon_emoji = icon_emoji + self.link_names = link_names + self.proxy = proxy + + def _get_token(self, token, http_conn_id): + """ + Given either a manually set token or a conn_id, return the webhook_token to use + :param token: The manually provided token + :param conn_id: The conn_id provided + :return: webhook_token (str) to use + """ + if token: + return token + elif http_conn_id: + conn = self.get_connection(http_conn_id) + extra = conn.extra_dejson + return extra.get('webhook_token', '') + else: + raise AirflowException('Cannot get token: No valid Slack ' + 'webhook token nor conn_id supplied') + + def _build_slack_message(self): + """ + Construct the Slack message. All relevant parameters are combined here to a valid + Slack json message + :return: Slack message (str) to send + """ + cmd = {} + + if self.channel: + cmd['channel'] = self.channel + if self.username: + cmd['username'] = self.username + if self.icon_emoji: + cmd['icon_emoji'] = self.icon_emoji + if self.link_names: + cmd['link_names'] = 1 + + # there should always be a message to post ;-) + cmd['text'] = self.message + return json.dumps(cmd) + + def execute(self): + """ + Remote Popen (actually execute the slack webhook call) + + :param cmd: command to remotely execute + :param kwargs: extra arguments to Popen (see subprocess.Popen) + """ + proxies = {} + if self.proxy: + # we only need https proxy for Slack, as the endpoint is https + proxies = {'https': self.proxy} + + slack_message = self._build_slack_message() + self.run(endpoint=self.webhook_token, + data=slack_message, + headers={'Content-type': 'application/json'}, + extra_options={'proxies': proxies}) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/airflow/contrib/operators/slack_webhook_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/slack_webhook_operator.py b/airflow/contrib/operators/slack_webhook_operator.py new file mode 100644 index 0000000..dff1c40 --- /dev/null +++ b/airflow/contrib/operators/slack_webhook_operator.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from airflow.operators.http_operator import SimpleHttpOperator +from airflow.utils.decorators import apply_defaults +from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook + + +class SlackWebhookOperator(SimpleHttpOperator): + """ + This operator allows you to post messages to Slack using incoming webhooks. + Takes both Slack webhook token directly and connection that has Slack webhook token. + If both supplied, Slack webhook token will be used. + + Each Slack webhook token can be pre-configured to use a specific channel, username and + icon. You can override these defaults in this hook. + + :param conn_id: connection that has Slack webhook token in the extra field + :type conn_id: str + :param webhook_token: Slack webhook token + :type webhook_token: str + :param message: The message you want to send on Slack + :type message: str + :param channel: The channel the message should be posted to + :type channel: str + :param username: The username to post to slack with + :type username: str + :param icon_emoji: The emoji to use as icon for the user posting to Slack + :type icon_emoji: str + :param link_names: Whether or not to find and link channel and usernames in your + message + :type link_names: bool + :param proxy: Proxy to use to make the Slack webhook call + :type proxy: str + """ + + @apply_defaults + def __init__(self, + http_conn_id=None, + webhook_token=None, + message="", + channel=None, + username=None, + icon_emoji=None, + link_names=False, + proxy=None, + *args, + **kwargs): + super(SlackWebhookOperator, self).__init__(endpoint=webhook_token, + *args, + **kwargs) + self.http_conn_id = http_conn_id + self.webhook_token = webhook_token + self.message = message + self.channel = channel + self.username = username + self.icon_emoji = icon_emoji + self.link_names = link_names + self.proxy = proxy + self.hook = None + + def execute(self, context): + """ + Call the SparkSqlHook to run the provided sql query + """ + self.hook = SlackWebhookHook( + self.http_conn_id, + self.webhook_token, + self.message, + self.channel, + self.username, + self.icon_emoji, + self.link_names, + self.proxy + ) + self.hook.execute() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/tests/contrib/hooks/test_slack_webhook_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_slack_webhook_hook.py b/tests/contrib/hooks/test_slack_webhook_hook.py new file mode 100644 index 0000000..7977ee6 --- /dev/null +++ b/tests/contrib/hooks/test_slack_webhook_hook.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import json +import unittest + +from airflow import configuration, models +from airflow.utils import db + +from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook + + +class TestSlackWebhookHook(unittest.TestCase): + + _config = { + 'http_conn_id': 'slack-webhook-default', + 'webhook_token': 'manual_token', + 'message': 'Awesome message to put on Slack', + 'channel': '#general', + 'username': 'SlackMcSlackFace', + 'icon_emoji': ':hankey:', + 'link_names': True, + 'proxy': 'https://my-horrible-proxy.proxyist.com:8080' + } + expected_message_dict = {'channel': _config['channel'], + 'username': _config['username'], + 'icon_emoji': _config['icon_emoji'], + 'link_names': 1, + 'text': _config['message'] + } + expected_message = json.dumps(expected_message_dict) + + def setUp(self): + configuration.load_test_config() + db.merge_conn( + models.Connection( + conn_id='slack-webhook-default', + extra='{"webhook_token": "your_token_here"}') + ) + + def test_get_token_manual_token(self): + # Given + manual_token = 'manual_token_here' + hook = SlackWebhookHook(webhook_token=manual_token) + + # When + webhook_token = hook._get_token(manual_token, None) + + # Then + self.assertEqual(webhook_token, manual_token) + + def test_get_token_conn_id(self): + # Given + conn_id = 'slack-webhook-default' + hook = SlackWebhookHook(http_conn_id=conn_id) + expected_webhook_token = 'your_token_here' + + # When + webhook_token = hook._get_token(None, conn_id) + + # Then + self.assertEqual(webhook_token, expected_webhook_token) + + def test_build_slack_message(self): + # Given + hook = SlackWebhookHook(**self._config) + + # When + message = hook._build_slack_message() + + # Then + self.assertEqual(self.expected_message, message) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64206615/tests/contrib/operators/test_slack_webhook_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_slack_webhook_operator.py b/tests/contrib/operators/test_slack_webhook_operator.py new file mode 100644 index 0000000..4ea0a60 --- /dev/null +++ b/tests/contrib/operators/test_slack_webhook_operator.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +from airflow import DAG, configuration + +from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) + + +class TestSlackWebhookOperator(unittest.TestCase): + _config = { + 'http_conn_id': 'slack-webhook-default', + 'webhook_token': 'manual_token', + 'message': 'your message here', + 'channel': '#general', + 'username': 'SlackMcSlackFace', + 'icon_emoji': ':hankey', + 'link_names': True, + 'proxy': 'https://my-horrible-proxy.proxyist.com:8080' + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_execute(self): + # Given / When + operator = SlackWebhookOperator( + task_id='slack_webhook_job', + dag=self.dag, + **self._config + ) + + self.assertEqual(self._config['http_conn_id'], operator.http_conn_id) + self.assertEqual(self._config['webhook_token'], operator.webhook_token) + self.assertEqual(self._config['message'], operator.message) + self.assertEqual(self._config['channel'], operator.channel) + self.assertEqual(self._config['username'], operator.username) + self.assertEqual(self._config['icon_emoji'], operator.icon_emoji) + self.assertEqual(self._config['link_names'], operator.link_names) + self.assertEqual(self._config['proxy'], operator.proxy) + + +if __name__ == '__main__': + unittest.main()
