This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8faa1bdb02422bd62eb730da7d653164050a7dd9 Author: Kamil Breguła <[email protected]> AuthorDate: Tue Dec 22 13:02:59 2020 +0100 Support google-cloud-pubsub>=2.0.0 (#13127) (cherry picked from commit 8c00ec89b97aa6e725379d08c8ff29a01be47e73) --- airflow/providers/google/cloud/hooks/pubsub.py | 81 ++++---- airflow/providers/google/cloud/operators/pubsub.py | 3 +- airflow/providers/google/cloud/sensors/pubsub.py | 3 +- setup.py | 2 +- tests/providers/google/cloud/hooks/test_pubsub.py | 221 +++++++++++---------- .../google/cloud/operators/test_pubsub.py | 16 +- .../providers/google/cloud/sensors/test_pubsub.py | 16 +- 7 files changed, 177 insertions(+), 165 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/pubsub.py b/airflow/providers/google/cloud/hooks/pubsub.py index f2ae190..37240a2 100644 --- a/airflow/providers/google/cloud/hooks/pubsub.py +++ b/airflow/providers/google/cloud/hooks/pubsub.py @@ -111,7 +111,7 @@ class PubSubHook(GoogleBaseHook): self._validate_messages(messages) publisher = self.get_conn() - topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member + topic_path = f"projects/{project_id}/topics/{topic}" self.log.info("Publish %d messages to topic (path) %s", len(messages), topic_path) try: @@ -206,7 +206,7 @@ class PubSubHook(GoogleBaseHook): :type metadata: Sequence[Tuple[str, str]]] """ publisher = self.get_conn() - topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member + topic_path = f"projects/{project_id}/topics/{topic}" # Add airflow-version label to the topic labels = labels or {} @@ -216,13 +216,15 @@ class PubSubHook(GoogleBaseHook): try: # pylint: disable=no-member publisher.create_topic( - name=topic_path, - labels=labels, - message_storage_policy=message_storage_policy, - kms_key_name=kms_key_name, + request={ + "name": topic_path, + "labels": labels, + "message_storage_policy": message_storage_policy, + "kms_key_name": kms_key_name, + }, retry=retry, 
timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) except AlreadyExists: self.log.warning('Topic already exists: %s', topic) @@ -266,16 +268,13 @@ class PubSubHook(GoogleBaseHook): :type metadata: Sequence[Tuple[str, str]]] """ publisher = self.get_conn() - topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member + topic_path = f"projects/{project_id}/topics/{topic}" self.log.info("Deleting topic (path) %s", topic_path) try: # pylint: disable=no-member publisher.delete_topic( - topic=topic_path, - retry=retry, - timeout=timeout, - metadata=metadata, + request={"topic": topic_path}, retry=retry, timeout=timeout, metadata=metadata or () ) except NotFound: self.log.warning('Topic does not exist: %s', topic_path) @@ -401,27 +400,29 @@ class PubSubHook(GoogleBaseHook): labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-') # pylint: disable=no-member - subscription_path = SubscriberClient.subscription_path(subscription_project_id, subscription) - topic_path = SubscriberClient.topic_path(project_id, topic) + subscription_path = f"projects/{subscription_project_id}/subscriptions/{subscription}" + topic_path = f"projects/{project_id}/topics/{topic}" self.log.info("Creating subscription (path) %s for topic (path) %a", subscription_path, topic_path) try: subscriber.create_subscription( - name=subscription_path, - topic=topic_path, - push_config=push_config, - ack_deadline_seconds=ack_deadline_secs, - retain_acked_messages=retain_acked_messages, - message_retention_duration=message_retention_duration, - labels=labels, - enable_message_ordering=enable_message_ordering, - expiration_policy=expiration_policy, - filter_=filter_, - dead_letter_policy=dead_letter_policy, - retry_policy=retry_policy, + request={ + "name": subscription_path, + "topic": topic_path, + "push_config": push_config, + "ack_deadline_seconds": ack_deadline_secs, + "retain_acked_messages": retain_acked_messages, + 
"message_retention_duration": message_retention_duration, + "labels": labels, + "enable_message_ordering": enable_message_ordering, + "expiration_policy": expiration_policy, + "filter": filter_, + "dead_letter_policy": dead_letter_policy, + "retry_policy": retry_policy, + }, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) except AlreadyExists: self.log.warning('Subscription already exists: %s', subscription_path) @@ -466,13 +467,16 @@ class PubSubHook(GoogleBaseHook): """ subscriber = self.subscriber_client # noqa E501 # pylint: disable=no-member - subscription_path = SubscriberClient.subscription_path(project_id, subscription) + subscription_path = f"projects/{project_id}/subscriptions/{subscription}" self.log.info("Deleting subscription (path) %s", subscription_path) try: # pylint: disable=no-member subscriber.delete_subscription( - subscription=subscription_path, retry=retry, timeout=timeout, metadata=metadata + request={"subscription": subscription_path}, + retry=retry, + timeout=timeout, + metadata=metadata or (), ) except NotFound: @@ -527,18 +531,20 @@ class PubSubHook(GoogleBaseHook): """ subscriber = self.subscriber_client # noqa E501 # pylint: disable=no-member,line-too-long - subscription_path = SubscriberClient.subscription_path(project_id, subscription) + subscription_path = f"projects/{project_id}/subscriptions/{subscription}" self.log.info("Pulling max %d messages from subscription (path) %s", max_messages, subscription_path) try: # pylint: disable=no-member response = subscriber.pull( - subscription=subscription_path, - max_messages=max_messages, - return_immediately=return_immediately, + request={ + "subscription": subscription_path, + "max_messages": max_messages, + "return_immediately": return_immediately, + }, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) result = getattr(response, 'received_messages', []) self.log.info("Pulled %d messages from subscription (path) %s", 
len(result), subscription_path) @@ -591,17 +597,16 @@ class PubSubHook(GoogleBaseHook): subscriber = self.subscriber_client # noqa E501 # pylint: disable=no-member - subscription_path = SubscriberClient.subscription_path(project_id, subscription) + subscription_path = f"projects/{project_id}/subscriptions/{subscription}" self.log.info("Acknowledging %d ack_ids from subscription (path) %s", len(ack_ids), subscription_path) try: # pylint: disable=no-member subscriber.acknowledge( - subscription=subscription_path, - ack_ids=ack_ids, + request={"subscription": subscription_path, "ack_ids": ack_ids}, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) except (HttpError, GoogleAPICallError) as e: raise PubSubException( diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py index e8cf735..23b545f 100644 --- a/airflow/providers/google/cloud/operators/pubsub.py +++ b/airflow/providers/google/cloud/operators/pubsub.py @@ -29,7 +29,6 @@ from google.cloud.pubsub_v1.types import ( ReceivedMessage, RetryPolicy, ) -from google.protobuf.json_format import MessageToDict from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.pubsub import PubSubHook @@ -958,6 +957,6 @@ class PubSubPullOperator(BaseOperator): :param context: same as in `execute` :return: value to be saved to XCom. 
""" - messages_json = [MessageToDict(m) for m in pulled_messages] + messages_json = [ReceivedMessage.to_dict(m) for m in pulled_messages] return messages_json diff --git a/airflow/providers/google/cloud/sensors/pubsub.py b/airflow/providers/google/cloud/sensors/pubsub.py index d6e0be5..ff1f811 100644 --- a/airflow/providers/google/cloud/sensors/pubsub.py +++ b/airflow/providers/google/cloud/sensors/pubsub.py @@ -20,7 +20,6 @@ import warnings from typing import Any, Callable, Dict, List, Optional, Sequence, Union from google.cloud.pubsub_v1.types import ReceivedMessage -from google.protobuf.json_format import MessageToDict from airflow.providers.google.cloud.hooks.pubsub import PubSubHook from airflow.sensors.base import BaseSensorOperator @@ -200,6 +199,6 @@ class PubSubPullSensor(BaseSensorOperator): :param context: same as in `execute` :return: value to be saved to XCom. """ - messages_json = [MessageToDict(m) for m in pulled_messages] + messages_json = [ReceivedMessage.to_dict(m) for m in pulled_messages] return messages_json diff --git a/setup.py b/setup.py index 1ec4f5d..ff9fd71 100644 --- a/setup.py +++ b/setup.py @@ -296,7 +296,7 @@ google = [ 'google-cloud-memcache>=0.2.0', 'google-cloud-monitoring>=0.34.0,<2.0.0', 'google-cloud-os-login>=2.0.0,<3.0.0', - 'google-cloud-pubsub>=1.0.0,<2.0.0', + 'google-cloud-pubsub>=2.0.0,<3.0.0', 'google-cloud-redis>=0.3.0,<2.0.0', 'google-cloud-secret-manager>=0.2.0,<2.0.0', 'google-cloud-spanner>=1.10.0,<2.0.0', diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py b/tests/providers/google/cloud/hooks/test_pubsub.py index 4086526..628d619 100644 --- a/tests/providers/google/cloud/hooks/test_pubsub.py +++ b/tests/providers/google/cloud/hooks/test_pubsub.py @@ -25,7 +25,6 @@ import pytest from google.api_core.exceptions import AlreadyExists, GoogleAPICallError from google.cloud.exceptions import NotFound from google.cloud.pubsub_v1.types import ReceivedMessage -from google.protobuf.json_format import ParseDict from 
googleapiclient.errors import HttpError from parameterized import parameterized @@ -67,15 +66,12 @@ class TestPubSubHook(unittest.TestCase): def _generate_messages(self, count) -> List[ReceivedMessage]: return [ - ParseDict( - { - "ack_id": str(i), - "message": { - "data": f'Message {i}'.encode('utf8'), - "attributes": {"type": "generated message"}, - }, + ReceivedMessage( + ack_id=str(i), + message={ + "data": f'Message {i}'.encode('utf8'), + "attributes": {"type": "generated message"}, }, - ReceivedMessage(), ) for i in range(1, count + 1) ] @@ -112,20 +108,19 @@ class TestPubSubHook(unittest.TestCase): create_method = mock_service.return_value.create_topic self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC) create_method.assert_called_once_with( - name=EXPANDED_TOPIC, - labels=LABELS, - message_storage_policy=None, - kms_key_name=None, + request=dict(name=EXPANDED_TOPIC, labels=LABELS, message_storage_policy=None, kms_key_name=None), retry=None, timeout=None, - metadata=None, + metadata=(), ) @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_delete_topic(self, mock_service): delete_method = mock_service.return_value.delete_topic self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC) - delete_method.assert_called_once_with(topic=EXPANDED_TOPIC, retry=None, timeout=None, metadata=None) + delete_method.assert_called_once_with( + request=dict(topic=EXPANDED_TOPIC), retry=None, timeout=None, metadata=() + ) @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_delete_nonexisting_topic_failifnotexists(self, mock_service): @@ -177,21 +172,23 @@ class TestPubSubHook(unittest.TestCase): project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION ) create_method.assert_called_once_with( - name=EXPANDED_SUBSCRIPTION, - topic=EXPANDED_TOPIC, - push_config=None, - ack_deadline_seconds=10, - retain_acked_messages=None, - message_retention_duration=None, - labels=LABELS, - 
enable_message_ordering=False, - expiration_policy=None, - filter_=None, - dead_letter_policy=None, - retry_policy=None, + request=dict( + name=EXPANDED_SUBSCRIPTION, + topic=EXPANDED_TOPIC, + push_config=None, + ack_deadline_seconds=10, + retain_acked_messages=None, + message_retention_duration=None, + labels=LABELS, + enable_message_ordering=False, + expiration_policy=None, + filter=None, + dead_letter_policy=None, + retry_policy=None, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert TEST_SUBSCRIPTION == response @@ -208,21 +205,23 @@ class TestPubSubHook(unittest.TestCase): 'a-different-project', TEST_SUBSCRIPTION ) create_method.assert_called_once_with( - name=expected_subscription, - topic=EXPANDED_TOPIC, - push_config=None, - ack_deadline_seconds=10, - retain_acked_messages=None, - message_retention_duration=None, - labels=LABELS, - enable_message_ordering=False, - expiration_policy=None, - filter_=None, - dead_letter_policy=None, - retry_policy=None, + request=dict( + name=expected_subscription, + topic=EXPANDED_TOPIC, + push_config=None, + ack_deadline_seconds=10, + retain_acked_messages=None, + message_retention_duration=None, + labels=LABELS, + enable_message_ordering=False, + expiration_policy=None, + filter=None, + dead_letter_policy=None, + retry_policy=None, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert TEST_SUBSCRIPTION == response @@ -232,7 +231,7 @@ class TestPubSubHook(unittest.TestCase): self.pubsub_hook.delete_subscription(project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION) delete_method = mock_service.delete_subscription delete_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, retry=None, timeout=None, metadata=None + request=dict(subscription=EXPANDED_SUBSCRIPTION), retry=None, timeout=None, metadata=() ) @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) @@ -266,21 +265,23 @@ class TestPubSubHook(unittest.TestCase): response = 
self.pubsub_hook.create_subscription(project_id=TEST_PROJECT, topic=TEST_TOPIC) create_method.assert_called_once_with( - name=expected_name, - topic=EXPANDED_TOPIC, - push_config=None, - ack_deadline_seconds=10, - retain_acked_messages=None, - message_retention_duration=None, - labels=LABELS, - enable_message_ordering=False, - expiration_policy=None, - filter_=None, - dead_letter_policy=None, - retry_policy=None, + request=dict( + name=expected_name, + topic=EXPANDED_TOPIC, + push_config=None, + ack_deadline_seconds=10, + retain_acked_messages=None, + message_retention_duration=None, + labels=LABELS, + enable_message_ordering=False, + expiration_policy=None, + filter=None, + dead_letter_policy=None, + retry_policy=None, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert f'sub-{TEST_UUID}' == response @@ -292,21 +293,23 @@ class TestPubSubHook(unittest.TestCase): project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION, ack_deadline_secs=30 ) create_method.assert_called_once_with( - name=EXPANDED_SUBSCRIPTION, - topic=EXPANDED_TOPIC, - push_config=None, - ack_deadline_seconds=30, - retain_acked_messages=None, - message_retention_duration=None, - labels=LABELS, - enable_message_ordering=False, - expiration_policy=None, - filter_=None, - dead_letter_policy=None, - retry_policy=None, + request=dict( + name=EXPANDED_SUBSCRIPTION, + topic=EXPANDED_TOPIC, + push_config=None, + ack_deadline_seconds=30, + retain_acked_messages=None, + message_retention_duration=None, + labels=LABELS, + enable_message_ordering=False, + expiration_policy=None, + filter=None, + dead_letter_policy=None, + retry_policy=None, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert TEST_SUBSCRIPTION == response @@ -321,21 +324,23 @@ class TestPubSubHook(unittest.TestCase): filter_='attributes.domain="com"', ) create_method.assert_called_once_with( - name=EXPANDED_SUBSCRIPTION, - topic=EXPANDED_TOPIC, - push_config=None, - 
ack_deadline_seconds=10, - retain_acked_messages=None, - message_retention_duration=None, - labels=LABELS, - enable_message_ordering=False, - expiration_policy=None, - filter_='attributes.domain="com"', - dead_letter_policy=None, - retry_policy=None, + request=dict( + name=EXPANDED_SUBSCRIPTION, + topic=EXPANDED_TOPIC, + push_config=None, + ack_deadline_seconds=10, + retain_acked_messages=None, + message_retention_duration=None, + labels=LABELS, + enable_message_ordering=False, + expiration_policy=None, + filter='attributes.domain="com"', + dead_letter_policy=None, + retry_policy=None, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert TEST_SUBSCRIPTION == response @@ -401,12 +406,14 @@ class TestPubSubHook(unittest.TestCase): project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, max_messages=10 ) pull_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, - max_messages=10, - return_immediately=False, + request=dict( + subscription=EXPANDED_SUBSCRIPTION, + max_messages=10, + return_immediately=False, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert pulled_messages == response @@ -419,12 +426,14 @@ class TestPubSubHook(unittest.TestCase): project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, max_messages=10 ) pull_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, - max_messages=10, - return_immediately=False, + request=dict( + subscription=EXPANDED_SUBSCRIPTION, + max_messages=10, + return_immediately=False, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) assert [] == response @@ -445,12 +454,14 @@ class TestPubSubHook(unittest.TestCase): with pytest.raises(PubSubException): self.pubsub_hook.pull(project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, max_messages=10) pull_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, - max_messages=10, - return_immediately=False, + request=dict( + subscription=EXPANDED_SUBSCRIPTION, + max_messages=10, + 
return_immediately=False, + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) @@ -461,11 +472,13 @@ class TestPubSubHook(unittest.TestCase): project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, ack_ids=['1', '2', '3'] ) ack_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, - ack_ids=['1', '2', '3'], + request=dict( + subscription=EXPANDED_SUBSCRIPTION, + ack_ids=['1', '2', '3'], + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) @@ -478,11 +491,13 @@ class TestPubSubHook(unittest.TestCase): messages=self._generate_messages(3), ) ack_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, - ack_ids=['1', '2', '3'], + request=dict( + subscription=EXPANDED_SUBSCRIPTION, + ack_ids=['1', '2', '3'], + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) @parameterized.expand( @@ -504,11 +519,13 @@ class TestPubSubHook(unittest.TestCase): project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, ack_ids=['1', '2', '3'] ) ack_method.assert_called_once_with( - subscription=EXPANDED_SUBSCRIPTION, - ack_ids=['1', '2', '3'], + request=dict( + subscription=EXPANDED_SUBSCRIPTION, + ack_ids=['1', '2', '3'], + ), retry=None, timeout=None, - metadata=None, + metadata=(), ) @parameterized.expand( diff --git a/tests/providers/google/cloud/operators/test_pubsub.py b/tests/providers/google/cloud/operators/test_pubsub.py index 9ff71e6..6abfffa 100644 --- a/tests/providers/google/cloud/operators/test_pubsub.py +++ b/tests/providers/google/cloud/operators/test_pubsub.py @@ -21,7 +21,6 @@ from typing import Any, Dict, List from unittest import mock from google.cloud.pubsub_v1.types import ReceivedMessage -from google.protobuf.json_format import MessageToDict, ParseDict from airflow.providers.google.cloud.operators.pubsub import ( PubSubCreateSubscriptionOperator, @@ -230,21 +229,18 @@ 
class TestPubSubPublishOperator(unittest.TestCase): class TestPubSubPullOperator(unittest.TestCase): def _generate_messages(self, count): return [ - ParseDict( - { - "ack_id": "%s" % i, - "message": { - "data": f'Message {i}'.encode('utf8'), - "attributes": {"type": "generated message"}, - }, + ReceivedMessage( + ack_id="%s" % i, + message={ + "data": f'Message {i}'.encode('utf8'), + "attributes": {"type": "generated message"}, }, - ReceivedMessage(), ) for i in range(1, count + 1) ] def _generate_dicts(self, count): - return [MessageToDict(m) for m in self._generate_messages(count)] + return [ReceivedMessage.to_dict(m) for m in self._generate_messages(count)] @mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook') def test_execute_no_messages(self, mock_hook): diff --git a/tests/providers/google/cloud/sensors/test_pubsub.py b/tests/providers/google/cloud/sensors/test_pubsub.py index ba1aee9..795860b 100644 --- a/tests/providers/google/cloud/sensors/test_pubsub.py +++ b/tests/providers/google/cloud/sensors/test_pubsub.py @@ -22,7 +22,6 @@ from unittest import mock import pytest from google.cloud.pubsub_v1.types import ReceivedMessage -from google.protobuf.json_format import MessageToDict, ParseDict from airflow.exceptions import AirflowSensorTimeout from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor @@ -35,21 +34,18 @@ TEST_SUBSCRIPTION = 'test-subscription' class TestPubSubPullSensor(unittest.TestCase): def _generate_messages(self, count): return [ - ParseDict( - { - "ack_id": "%s" % i, - "message": { - "data": f'Message {i}'.encode('utf8'), - "attributes": {"type": "generated message"}, - }, + ReceivedMessage( + ack_id="%s" % i, + message={ + "data": f'Message {i}'.encode('utf8'), + "attributes": {"type": "generated message"}, }, - ReceivedMessage(), ) for i in range(1, count + 1) ] def _generate_dicts(self, count): - return [MessageToDict(m) for m in self._generate_messages(count)] + return [ReceivedMessage.to_dict(m) 
for m in self._generate_messages(count)] @mock.patch('airflow.providers.google.cloud.sensors.pubsub.PubSubHook') def test_poke_no_messages(self, mock_hook):
