mik-laj commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python URL: https://github.com/apache/airflow/pull/6096#discussion_r324422317
########## File path: airflow/gcp/hooks/pubsub.py ########## @@ -259,29 +443,57 @@ def pull( return if no messages are available. Otherwise, the request will block for an undisclosed, but bounded period of time :type return_immediately: bool + :param retry: (Optional) A retry object used to retry requests. + If None is specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param timeout: (Optional) The amount of time, in seconds, to wait for the request + to complete. Note that if retry is specified, the timeout applies to each + individual attempt. + :type timeout: float + :param metadata: (Optional) Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]]] :return: A list of Pub/Sub ReceivedMessage objects each containing an ``ackId`` property and a ``message`` property, which includes the base64-encoded message content. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#ReceivedMessage """ - service = self.get_conn() - full_subscription = _format_subscription(project, subscription) - body = { - 'maxMessages': max_messages, - 'returnImmediately': return_immediately - } + subscriber = self.subscriber_client + subscription_path = SubscriberClient.subscription_path(project, subscription) # noqa E501 # pylint: disable=no-member,line-too-long + + self.log.info("Pulling mex %d messages from subscription (path) %s", max_messages, subscription_path) try: - response = service.projects().subscriptions().pull( # pylint: disable=no-member - subscription=full_subscription, body=body).execute(num_retries=self.num_retries) - return response.get('receivedMessages', []) + # pylint: disable=no-member + response = subscriber.pull( + subscription=subscription_path, + max_messages=max_messages, + return_immediately=return_immediately, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + result = getattr(response, 'received_messages', []) + self.log.info("Pulled %d messages from subscription (path) %s", len(result), + subscription_path) + return result except HttpError as e: raise PubSubException( 'Error pulling messages from subscription {}'.format( - full_subscription), e) + subscription_path), e) + except GoogleAPICallError as e: Review comment: This part of code is not covered by unit test. Can you complete missing unit tests? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services