TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite
Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r324696928
##########
File path: airflow/gcp/hooks/pubsub.py
##########
@@ -259,29 +443,57 @@ def pull(
return if no messages are available. Otherwise, the request will
block for an undisclosed, but bounded period of time
:type return_immediately: bool
+ :param retry: (Optional) A retry object used to retry requests.
+ If None is specified, requests will not be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: (Optional) The amount of time, in seconds, to wait for
the request
+ to complete. Note that if retry is specified, the timeout applies
to each
+ individual attempt.
+ :type timeout: float
+ :param metadata: (Optional) Additional metadata that is provided to
the method.
+ :type metadata: Sequence[Tuple[str, str]]
:return: A list of Pub/Sub ReceivedMessage objects each containing
an ``ackId`` property and a ``message`` property, which includes
the base64-encoded message content. See
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#ReceivedMessage
"""
- service = self.get_conn()
- full_subscription = _format_subscription(project, subscription)
- body = {
- 'maxMessages': max_messages,
- 'returnImmediately': return_immediately
- }
+ subscriber = self.subscriber_client
+ subscription_path = SubscriberClient.subscription_path(project,
subscription) # noqa E501 # pylint: disable=no-member,line-too-long
+
+ self.log.info("Pulling mex %d messages from subscription (path) %s",
max_messages, subscription_path)
try:
- response = service.projects().subscriptions().pull( # pylint:
disable=no-member
- subscription=full_subscription,
body=body).execute(num_retries=self.num_retries)
- return response.get('receivedMessages', [])
+ # pylint: disable=no-member
+ response = subscriber.pull(
+ subscription=subscription_path,
+ max_messages=max_messages,
+ return_immediately=return_immediately,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ result = getattr(response, 'received_messages', [])
+ self.log.info("Pulled %d messages from subscription (path) %s",
len(result),
+ subscription_path)
+ return result
except HttpError as e:
raise PubSubException(
'Error pulling messages from subscription {}'.format(
- full_subscription), e)
+ subscription_path), e)
+ except GoogleAPICallError as e:
Review comment:
I've added a test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services