Repository: incubator-airflow Updated Branches: refs/heads/master 644f5d43a -> d231dce37
[AIRFLOW-300] Add Google Pubsub hook and operator. Only publishing and topic creation are included. Topic consumption was explicitly not included in this feature request. Closes #2036 from wwlian/airflow-300 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d231dce3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d231dce3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d231dce3 Branch: refs/heads/master Commit: d231dce37d753ed196a26d9b244ddf376385de38 Parents: 644f5d4 Author: Wilson Lian <[email protected]> Authored: Wed Jul 5 16:20:17 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Jul 5 16:20:26 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/__init__.py | 3 +- airflow/contrib/hooks/gcp_pubsub_hook.py | 93 ++++++++++++ airflow/contrib/operators/pubsub_operator.py | 170 ++++++++++++++++++++++ tests/contrib/hooks/gcp_pubsub_hook.py | 102 +++++++++++++ tests/contrib/operators/pubsub_operator.py | 77 ++++++++++ 5 files changed, 444 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/hooks/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py index 4941314..977c2ce 100644 --- a/airflow/contrib/hooks/__init__.py +++ b/airflow/contrib/hooks/__init__.py @@ -46,7 +46,8 @@ _hooks = { 'spark_submit_operator': ['SparkSubmitOperator'], 'cloudant_hook': ['CloudantHook'], 'fs_hook': ['FSHook'], - 'wasb_hook': ['WasbHook'] + 'wasb_hook': ['WasbHook'], + 'gcp_pubsub_hook': ['PubSubHook'] } import os as _os http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/hooks/gcp_pubsub_hook.py 
---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_pubsub_hook.py b/airflow/contrib/hooks/gcp_pubsub_hook.py new file mode 100644 index 0000000..529d121 --- /dev/null +++ b/airflow/contrib/hooks/gcp_pubsub_hook.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from apiclient.discovery import build +from apiclient import errors + +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook + + +def _format_topic(project, topic): + return 'projects/%s/topics/%s' % (project, topic) + + +class PubSubHook(GoogleCloudBaseHook): + """Hook for accessing Google Pub/Sub. + + The GCP project against which actions are applied is determined by + the project embedded in the Connection referenced by gcp_conn_id. + """ + + def __init__(self, + gcp_conn_id='google_cloud_default', + delegate_to=None): + super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to) + + def get_conn(self): + """Returns a Pub/Sub service object. + + :rtype: apiclient.discovery.Resource + """ + http_authorized = self._authorize() + return build('pubsub', 'v1', http=http_authorized) + + def publish(self, project, topic, messages): + """Publishes messages to a Pub/Sub topic. + + :param project: the GCP project name or ID in which to publish + :type project: string + :param topic: the Pub/Sub topic to which to publish; do not + include the 'projects/{project}/topics/' prefix. 
+ :type topic: string + :param messages: messages to publish; if the data field in a + message is set, it should already be base64 encoded. + :type messages: list of PubSub messages; see + http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage + """ + body = {'messages': messages} + full_topic = _format_topic(project, topic) + request = self.get_conn().projects().topics().publish( + topic=full_topic, body=body) + try: + request.execute() + except errors.HttpError as e: + raise Exception('Error publishing to topic %s' % full_topic, e) + + def create_topic(self, project, topic, fail_if_exists=False): + """Creates a Pub/Sub topic, if it does not already exist. + + :param project: the GCP project name or ID in which to create + the topic + :type project: string + :param topic: the Pub/Sub topic name to create; do not + include the 'projects/{project}/topics/' prefix. + :type topic: string + :param fail_if_exists: if set, raise an exception if the topic + already exists + :type fail_if_exists: bool + """ + service = self.get_conn() + full_topic = _format_topic(project, topic) + try: + service.projects().topics().create( + name=full_topic, body={}).execute() + except errors.HttpError as e: + # Status code 409 indicates that the topic already exists. + if str(e.resp['status']) == '409': + if fail_if_exists: + raise Exception( + 'Error creating topic. 
Topic already exists: %s' + % full_topic) + else: + raise Exception('Error creating topic %s' % full_topic, e) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/operators/pubsub_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py new file mode 100644 index 0000000..f68c6a9 --- /dev/null +++ b/airflow/contrib/operators/pubsub_operator.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class PubSubTopicCreateOperator(BaseOperator): + """Create a PubSub topic. + + By default, if the topic already exists, this operator will + not cause the DAG to fail. + ``` + with DAG('successful DAG') as dag: + ( + dag + >> PubSubTopicCreateOperator(topic='my_new_topic') + >> PubSubTopicCreateOperator(topic='my_new_topic') + ) + ``` + + The operator can be configured to fail if the topic already exists. + ``` + with DAG('failing DAG') as dag: + ( + dag + >> PubSubTopicCreateOperator(topic='my_new_topic') + >> PubSubTopicCreateOperator(topic='my_new_topic', + fail_if_exists=True) + ) + ``` + + Both ``project`` and ``topic`` are templated so you can use + variables in them. 
+ """ + template_fields = ['project', 'topic'] + ui_color = '#0273d4' + + @apply_defaults + def __init__( + self, + project, + topic, + fail_if_exists=False, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + :param project: the GCP project name or ID in which to work + (templated) + :type project: string + :param topic: the topic to create. Do not include the + full topic path. In other words, instead of + ``projects/{project}/topics/{topic}``, provide only + ``{topic}``. (templated) + :type topic: string + :param gcp_conn_id: The connection ID to use connecting to + Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request + must have domain-wide delegation enabled. + :type delegate_to: string + """ + super(PubSubTopicCreateOperator, self).__init__(*args, **kwargs) + + self.project = project + self.topic = topic + self.fail_if_exists = fail_if_exists + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + + def execute(self, context): + hook = PubSubHook(gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + + hook.create_topic(self.project, self.topic, + fail_if_exists=self.fail_if_exists) + + +class PubSubPublishOperator(BaseOperator): + """Publish messages to a PubSub topic. + + Each Task publishes all provided messages to the same topic + in a single GCP project. If the topic does not exist, this + task will fail. + + ``` + from base64 import b64encode as b64e + + m1 = {'data': b64e('Hello, World!'), + 'attributes': {'type': 'greeting'} + } + m2 = {'data': b64e('Knock, knock')} + m3 = {'attributes': {'foo': ''}} + + t1 = PubSubPublishOperator( + topic='my_topic', + messages=[m1, m2, m3], + create_topic=True, + dag=dag) + ``` + Both ``project`` and ``topic`` are templated so you can use + variables in them. 
+ """ + template_fields = ['project', 'topic', 'messages'] + ui_color = '#0273d4' + + @apply_defaults + def __init__( + self, + project, + topic, + messages, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + :param project: the GCP project name or ID in which to work + (templated) + :type project: string + :param topic: the topic to which to publish. Do not include the + full topic path. In other words, instead of + ``projects/{project}/topics/{topic}``, provide only + ``{topic}``. (templated) + :type topic: string + :param messages: a list of messages to be published to the + topic. Each message is a dict with one or more of the + following key-value mappings: + * 'data': a base64-encoded string + * 'attributes': {'key1': 'value1', ...} + Each message must contain at least a non-empty 'data' value + or an attribute dict with at least one key. See + https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage + (templated) + :type messages: list + :param gcp_conn_id: The connection ID to use connecting to + Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request + must have domain-wide delegation enabled. 
+ :type delegate_to: string + """ + super(PubSubPublishOperator, self).__init__(*args, **kwargs) + + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.project = project + self.topic = topic + self.messages = messages + + def execute(self, context): + hook = PubSubHook(gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + hook.publish(self.project, self.topic, self.messages) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/tests/contrib/hooks/gcp_pubsub_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/gcp_pubsub_hook.py b/tests/contrib/hooks/gcp_pubsub_hook.py new file mode 100644 index 0000000..9572c33 --- /dev/null +++ b/tests/contrib/hooks/gcp_pubsub_hook.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from base64 import b64encode as b64e +import unittest + +from apiclient.errors import HttpError + +from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}' +PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}' + +TEST_PROJECT = 'test-project' +TEST_TOPIC = 'test-topic' +TEST_MESSAGES = [ + { + 'data': b64e('Hello, World!'), + 'attributes': {'type': 'greeting'} + }, + {'data': b64e('Knock, knock')}, + {'attributes': {'foo': ''}}] + +EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC) + + +def mock_init(self, gcp_conn_id, delegate_to=None): + pass + + +class PubSubHookTest(unittest.TestCase): + def setUp(self): + with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'), + new=mock_init): + self.pubsub_hook = PubSubHook(gcp_conn_id='test') + + @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) + def test_create_nonexistent_topic(self, mock_service): + self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC) + + create_method = (mock_service.return_value.projects.return_value.topics + .return_value.create) + create_method.assert_called_with(body={}, name=EXPANDED_TOPIC) + create_method.return_value.execute.assert_called_with() + + @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) + def test_create_preexisting_topic_failifexists(self, mock_service): + (mock_service.return_value.projects.return_value.topics.return_value + .create.return_value.execute.side_effect) = HttpError( + resp={'status': '409'}, content='') + + try: + self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC, + fail_if_exists=True) + except Exception: + pass # Expected. 
+ else: + self.fail('Topic creation should fail for existing topic when ' + 'fail_if_exists=True') + + @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) + def test_create_preexisting_topic_nofailifexists(self, mock_service): + (mock_service.return_value.projects.return_value.topics.return_value + .get.return_value.execute.side_effect) = HttpError( + resp={'status': '409'}, content='') + + try: + self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC, + fail_if_exists=False) + except Exception: + self.fail('Topic creation should not fail for existing topic when ' + 'fail_if_exists=False') + + @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) + def test_publish(self, mock_service): + self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES) + + publish_method = (mock_service.return_value.projects.return_value + .topics.return_value.publish) + publish_method.assert_called_with( + topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES}) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/tests/contrib/operators/pubsub_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/pubsub_operator.py b/tests/contrib/operators/pubsub_operator.py new file mode 100644 index 0000000..a52bbc6 --- /dev/null +++ b/tests/contrib/operators/pubsub_operator.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from base64 import b64encode as b64e +import unittest + +from airflow.contrib.operators.pubsub_operator import PubSubPublishOperator +from airflow.contrib.operators.pubsub_operator import PubSubTopicCreateOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +TASK_ID = 'test-task-id' +TEST_PROJECT = 'test-project' +TEST_TOPIC = 'test-topic' +TEST_MESSAGES = [ + { + 'data': b64e('Hello, World!'), + 'attributes': {'type': 'greeting'} + }, + {'data': b64e('Knock, knock')}, + {'attributes': {'foo': ''}}] + + +class PubSubTopicCreateOperatorTest(unittest.TestCase): + + @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook') + def test_failifexists(self, mock_hook): + operator = PubSubTopicCreateOperator(task_id=TASK_ID, + project=TEST_PROJECT, + topic=TEST_TOPIC, + fail_if_exists=True) + + operator.execute(None) + mock_hook.return_value.create_topic.assert_called_once_with( + TEST_PROJECT, TEST_TOPIC, fail_if_exists=True) + + @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook') + def test_succeedifexists(self, mock_hook): + operator = PubSubTopicCreateOperator(task_id=TASK_ID, + project=TEST_PROJECT, + topic=TEST_TOPIC, + fail_if_exists=False) + + operator.execute(None) + mock_hook.return_value.create_topic.assert_called_once_with( + TEST_PROJECT, TEST_TOPIC, fail_if_exists=False) + + +class PubSubPublishOperatorTest(unittest.TestCase): + + @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook') + def test_publish(self, mock_hook): + operator = PubSubPublishOperator(task_id=TASK_ID, + project=TEST_PROJECT, + topic=TEST_TOPIC, + messages=TEST_MESSAGES) + + operator.execute(None) + mock_hook.return_value.publish.assert_called_once_with( + TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
